- final Registration source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT,
- Duration.ZERO, 5);
- // ... and give it some time start up and open up the port
- Thread.sleep(1000);
-
- // Mocking for sink...
- final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
- final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
- doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
- doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
- final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
- doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain();
-
- // Kick of the sink ...
- final Registration sink = NettyReplicationSink.createSink(support, sinkBroker, css, true,
- Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3);
- // ... and sync on it starting up
-
- // verify the connection was established and MSG_EMPTY_DATA was transferred
- verify(sinkBroker, timeout(1000)).createMergingTransactionChain();
- verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction();
-
- // verify that the initial data invoked onDataTreeChanged() and was transferred to sink
- ArgumentCaptor<NormalizedNode> dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class);
- verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture());
- // verify that the initial state contains everything
- NormalizedNode capturedInitialState = dataCaptor.getAllValues().iterator().next();
- NormalizedNode expectedEntityState = generateNormalizedNodeForEntities(deltaCount);
- assertEquals(expectedEntityState, capturedInitialState);
-
- verify(sinkTx, timeout(2000).times(1)).commit();
-
- sink.close();
- source.close();
+ try (var source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT,
+ Duration.ZERO, 5)) {
+ // ... and give it some time start up and open up the port
+ Thread.sleep(1000);
+
+ // Mocking for sink...
+ final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
+ final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
+ doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
+ doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
+ final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
+ doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain();
+
+ // Kick of the sink ...
+ try (var sink = NettyReplicationSink.createSink(support, sinkBroker, css, true,
+ Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3)) {
+ // ... and sync on it starting up
+
+ // verify the connection was established and MSG_EMPTY_DATA was transferred
+ verify(sinkBroker, timeout(1000)).createMergingTransactionChain();
+ verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction();
+
+ // verify that the initial data invoked onDataTreeChanged() and was transferred to sink
+ final var dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class);
+ verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture());
+ // verify that the initial state contains everything
+ NormalizedNode capturedInitialState = dataCaptor.getAllValues().iterator().next();
+ NormalizedNode expectedEntityState = generateNormalizedNodeForEntities(deltaCount);
+ assertEquals(expectedEntityState, capturedInitialState);
+
+ verify(sinkTx, timeout(2000).times(1)).commit();
+ }
+ }