X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=mdsal.git;a=blobdiff_plain;f=replicate%2Fmdsal-replicate-netty%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Freplicate%2Fnetty%2FIntegrationTest.java;h=a2f1920e8bc05696d850ef3923aaf3d80b56b77c;hp=4ad437c730dcbd8219657b7637dca76936a05873;hb=8980139ca24fe883ed2168cc4bbac8ee3c760158;hpb=94fb90ab450470ee1b3225d737cd394f034ea932 diff --git a/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java index 4ad437c730..a2f1920e8b 100644 --- a/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java +++ b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java @@ -37,18 +37,14 @@ import org.opendaylight.mdsal.singleton.dom.impl.EOSClusterSingletonServiceProvi import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.Entity; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityKey; -import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode; -import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.Builders; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @RunWith(MockitoJUnitRunner.StrictStubs.class) @@ -80,39 +76,39 @@ public class IntegrationTest extends AbstractDataBrokerTest { @Test public void testSourceToSink() throws InterruptedException, ExecutionException { // Make sure to start source... - final Registration source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT, - Duration.ZERO, 5); - // ... and give it some time start up and open up the port - Thread.sleep(1000); - - // Mocking for sink... - final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class); - final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class); - doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit(); - doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction(); - final DOMDataBroker sinkBroker = mock(DOMDataBroker.class); - doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(); - - // Kick of the sink ... - final Registration sink = NettyReplicationSink.createSink(support, sinkBroker, css, true, - Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3); - // ... and sync on it starting up - - // verify the connection was established and MSG_EMPTY_DATA was transferred - verify(sinkBroker, timeout(1000)).createMergingTransactionChain(); - verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), eq(YangInstanceIdentifier.of()), - any(ContainerNode.class)); - - // generate some deltas - final int deltaCount = 5; - generateModification(getDataBroker(), deltaCount); - - // verify that all the deltas were transferred and committed + 1 invocation from receiving MSG_EMPTY_DATA - verify(sinkChain, timeout(2000).times(deltaCount + 1)).newWriteOnlyTransaction(); - verify(sinkTx, timeout(2000).times(deltaCount + 1)).commit(); - - sink.close(); - source.close(); + try (var source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT, + Duration.ZERO, 5)) { + // ... and give it some time start up and open up the port + Thread.sleep(1000); + + // Mocking for sink... + final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class); + final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class); + doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit(); + doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction(); + final DOMDataBroker sinkBroker = mock(DOMDataBroker.class); + doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(); + + // Kick of the sink ... + try (var sink = NettyReplicationSink.createSink(support, sinkBroker, css, true, + Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3)) { + // ... and sync on it starting up + + // verify the connection was established and MSG_EMPTY_DATA was transferred + verify(sinkBroker, timeout(1000)).createMergingTransactionChain(); + verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), + eq(YangInstanceIdentifier.of()), any(ContainerNode.class)); + + // generate some deltas + final int deltaCount = 5; + generateModification(getDataBroker(), deltaCount); + + // verify that all the deltas were transferred and committed + 1 invocation from receiving + // MSG_EMPTY_DATA + verify(sinkChain, timeout(2000).times(deltaCount + 1)).newWriteOnlyTransaction(); + verify(sinkTx, timeout(2000).times(deltaCount + 1)).commit(); + } + } } /** @@ -126,49 +122,51 @@ public class IntegrationTest extends AbstractDataBrokerTest { generateModification(getDataBroker(), deltaCount); // Make sure to start source... - final Registration source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT, - Duration.ZERO, 5); - // ... and give it some time start up and open up the port - Thread.sleep(1000); - - // Mocking for sink... - final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class); - final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class); - doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit(); - doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction(); - final DOMDataBroker sinkBroker = mock(DOMDataBroker.class); - doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(); - - // Kick of the sink ... - final Registration sink = NettyReplicationSink.createSink(support, sinkBroker, css, true, - Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3); - // ... and sync on it starting up - - // verify the connection was established and MSG_EMPTY_DATA was transferred - verify(sinkBroker, timeout(1000)).createMergingTransactionChain(); - verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction(); - - // verify that the initial data invoked onDataTreeChanged() and was transferred to sink - ArgumentCaptor dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class); - verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture()); - // verify that the initial state contains everything - NormalizedNode capturedInitialState = dataCaptor.getAllValues().iterator().next(); - NormalizedNode expectedEntityState = generateNormalizedNodeForEntities(deltaCount); - assertEquals(expectedEntityState, capturedInitialState); - - verify(sinkTx, timeout(2000).times(1)).commit(); - - sink.close(); - source.close(); + try (var source = NettyReplicationSource.createSource(support, getDomBroker(), css, true, TEST_PORT, + Duration.ZERO, 5)) { + // ... and give it some time start up and open up the port + Thread.sleep(1000); + + // Mocking for sink... + final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class); + final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class); + doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit(); + doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction(); + final DOMDataBroker sinkBroker = mock(DOMDataBroker.class); + doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(); + + // Kick of the sink ... + try (var sink = NettyReplicationSink.createSink(support, sinkBroker, css, true, + Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3)) { + // ... and sync on it starting up + + // verify the connection was established and MSG_EMPTY_DATA was transferred + verify(sinkBroker, timeout(1000)).createMergingTransactionChain(); + verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction(); + + // verify that the initial data invoked onDataTreeChanged() and was transferred to sink + final var dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class); + verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture()); + // verify that the initial state contains everything + NormalizedNode capturedInitialState = dataCaptor.getAllValues().iterator().next(); + NormalizedNode expectedEntityState = generateNormalizedNodeForEntities(deltaCount); + assertEquals(expectedEntityState, capturedInitialState); + + verify(sinkTx, timeout(2000).times(1)).commit(); + } + } } private static ContainerNode generateNormalizedNodeForEntities(final int amount) { - final CollectionNodeBuilder builder = ImmutableNodes.mapNodeBuilder(ENTITY_QNAME); + final var builder = ImmutableNodes.newSystemMapBuilder().withNodeIdentifier(new NodeIdentifier(ENTITY_QNAME)); for (int i = 0; i < amount; i++) { - builder.withChild(ImmutableNodes.mapEntry(ENTITY_QNAME, ENTITY_NAME_QNAME, "testEntity" + i)); + final var name = "testEntity" + i; + builder.withChild(ImmutableNodes.newMapEntryBuilder() + .withNodeIdentifier(NodeIdentifierWithPredicates.of(ENTITY_QNAME, ENTITY_NAME_QNAME, name)) + .build()); } - return Builders.containerBuilder() + return ImmutableNodes.newContainerBuilder() .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME)) .withChild(builder.build()) .build();