+
+ // generate some deltas
+ final int deltaCount = 5;
+ generateModification(getDataBroker(), deltaCount);
+
+ // verify that all the deltas were transferred and committed + 1 invocation from receiving MSG_EMPTY_DATA
+ verify(sinkChain, timeout(2000).times(deltaCount + 1)).newWriteOnlyTransaction();
+ verify(sinkTx, timeout(2000).times(deltaCount + 1)).commit();
+
+ sink.close();
+ source.close();
+ }
+
+ /**
+ * Add some data first and then start replication. Only 1 change is expected to return from the Source and it
+ * should contain the initial state of the Source's datastore.
+ */
+ @Test
+ public void testReplicateInitialState() throws InterruptedException, ExecutionException {
+ // add some data to datastore
+ final int deltaCount = 5;
+ generateModification(getDataBroker(), deltaCount);
+
+ // Make sure to start source...
+ final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
+ // ... and give it some time start up and open up the port
+ Thread.sleep(1000);
+
+ // Mocking for sink...
+ final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
+ final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
+ doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
+ doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
+ final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
+ doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any());
+
+ // Kick of the sink ...
+ final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
+ Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
+ // ... and sync on it starting up
+
+ // verify the connection was established and MSG_EMPTY_DATA was transferred
+ verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
+ verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction();
+
+ // verify that the initial data invoked onDataTreeChanged() and was transferred to sink
+ ArgumentCaptor<NormalizedNode<?, ?>> dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class);
+ verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture());
+ // verify that the initial state contains everything
+ NormalizedNode<?, ?> capturedInitialState = dataCaptor.getAllValues().iterator().next();
+ NormalizedNode<?, ?> expectedEntityState = generateNormalizedNodeForEntities(deltaCount);
+ assertEquals(expectedEntityState, capturedInitialState);
+
+ verify(sinkTx, timeout(2000).times(1)).commit();