From a2f2028f4d111e77ca542d2bc2110f38fafd82fb Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tibor=20Kr=C3=A1l?= Date: Tue, 23 Jun 2020 22:58:52 +0200 Subject: [PATCH] Improve tests and blueprints of Netty Replication Utility MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit - Test replication of actual DataTreeChanges from Source to Sink and fix any related issues. - Move blueprints to /OSGI-INF/blueprint/ and fix any related wiring issues Signed-off-by: Tibor Král Change-Id: I89c8228538e0462bbe61ca49e1ecf09dad4d4aaf Signed-off-by: Robert Varga --- .../odl-mdsal-exp-replicate-common/pom.xml | 4 + .../odl-mdsal-exp-replicate-netty/pom.xml | 4 + .../mdsal/replicate/netty/Constants.java | 1 + .../replicate/netty/NettyReplication.java | 6 + .../replicate/netty/SinkRequestHandler.java | 2 +- .../blueprint/netty-replication-common.xml | 13 ++ .../blueprint/netty-replication-sink.xml | 17 +-- .../blueprint/netty-replication-source.xml | 6 - .../blueprint/netty-replication-common.xml | 6 - .../replicate/netty/IntegrationTest.java | 112 +++++++++++++++++- 10 files changed, 141 insertions(+), 30 deletions(-) create mode 100644 replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml rename replicate/mdsal-replicate-netty/src/main/resources/{org/opendaylight => OSGI-INF}/blueprint/netty-replication-sink.xml (62%) rename replicate/mdsal-replicate-netty/src/main/resources/{org/opendaylight => OSGI-INF}/blueprint/netty-replication-source.xml (65%) delete mode 100644 replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml diff --git a/features/odl-mdsal-exp-replicate-common/pom.xml b/features/odl-mdsal-exp-replicate-common/pom.xml index 1875f4fa39..dbf277b79c 100644 --- a/features/odl-mdsal-exp-replicate-common/pom.xml +++ b/features/odl-mdsal-exp-replicate-common/pom.xml @@ -21,6 +21,10 @@ OpenDaylight :: MD-SAL :: Replicate :: Common + + true + + org.opendaylight.mdsal diff --git a/features/odl-mdsal-exp-replicate-netty/pom.xml b/features/odl-mdsal-exp-replicate-netty/pom.xml index 339c80e620..09efd74853 100644 --- a/features/odl-mdsal-exp-replicate-netty/pom.xml +++ b/features/odl-mdsal-exp-replicate-netty/pom.xml @@ -21,6 +21,10 @@ OpenDaylight :: MD-SAL :: Replicate :: Netty + + true + + org.opendaylight.mdsal diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java index c9611bec6c..1fff84f4e5 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java @@ -44,6 +44,7 @@ final class Constants { static final ByteBuf EMPTY_DATA = Unpooled.unreleasableBuffer( Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA })); + static final ByteBuf DTC_APPLY = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY })); private Constants() { diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java index 7e396e8ba4..cc70ffad57 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java @@ -17,8 +17,12 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.yangtools.concepts.AbstractRegistration; import org.opendaylight.yangtools.concepts.Registration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class NettyReplication { + private static final Logger LOG = LoggerFactory.getLogger(NettyReplication.class); + private static final class Disabled extends AbstractRegistration { @Override protected void removeRegistration() { @@ -33,12 +37,14 @@ public final class NettyReplication { public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, final ClusterSingletonServiceProvider singletonService, final boolean enabled, final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay) { + LOG.debug("Sink {}", enabled ? "enabled" : "disabled"); return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport, dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay)) : new Disabled(); } public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort) { + LOG.debug("Source {}", enabled ? "enabled" : "disabled"); final DOMDataTreeChangeService dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class); verify(dtcs != null, "Missing DOMDataTreeChangeService in broker %s", dataBroker); diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java index 5a29573ade..0b65bc16ee 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java @@ -65,7 +65,7 @@ final class SinkRequestHandler extends SimpleChannelInboundHandler { handleEmptyData(); break; case Constants.MSG_DTC_CHUNK: - chunks.add(msg); + chunks.add(msg.retain()); break; case Constants.MSG_DTC_APPLY: handleDtcApply(); diff --git a/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml new file mode 100644 index 0000000000..e394dfa253 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml @@ -0,0 +1,13 @@ + + + + + + + + + + diff --git a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml similarity index 62% rename from replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml rename to replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml index 9a76c3d546..a5bbbc0063 100644 --- a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml +++ b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml @@ -1,8 +1,8 @@ - + @@ -11,17 +11,12 @@ - - - - - + - + - + - + diff --git a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-source.xml similarity index 65% rename from replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml rename to replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-source.xml index 14a2d4c436..41b4e106ad 100644 --- a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml +++ b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-source.xml @@ -1,6 +1,5 @@ @@ -9,11 +8,6 @@ - - - - diff --git a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml deleted file mode 100644 index 00dc59516c..0000000000 --- a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - diff --git a/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java index 6261bd3f9f..e688bd5e9b 100644 --- a/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java +++ b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java @@ -7,6 +7,7 @@ */ package org.opendaylight.mdsal.replicate.netty; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; @@ -16,11 +17,15 @@ import static org.mockito.Mockito.verify; import java.net.Inet4Address; import java.time.Duration; +import java.util.concurrent.ExecutionException; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.junit.MockitoJUnitRunner; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.WriteTransaction; import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTest; import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; @@ -29,14 +34,30 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; import org.opendaylight.mdsal.dom.api.DOMTransactionChain; import org.opendaylight.mdsal.eos.dom.simple.SimpleDOMEntityOwnershipService; import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.Entity; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityKey; import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class IntegrationTest extends AbstractDataBrokerTest { private static final int TEST_PORT = 4000; + private static final QName ENTITY_QNAME = Entity.QNAME; + private static final QName ENTITY_NAME_QNAME = QName.create(ENTITY_QNAME, "name"); + private AbstractBootstrapSupport support; private DOMClusterSingletonServiceProviderImpl css; @@ -53,8 +74,12 @@ public class IntegrationTest extends AbstractDataBrokerTest { css.close(); } + /** + * Connect the Source and Sink. Verify that the initial state returned from Source was empty. After that make some + * changes to the Source's datastore and verify that they were all replicated to the Sink + */ @Test - public void testSourceToSink() throws InterruptedException { + public void testSourceToSink() throws InterruptedException, ExecutionException { // Make sure to start source... final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT); // ... and give it some time start up and open up the port @@ -72,16 +97,91 @@ public class IntegrationTest extends AbstractDataBrokerTest { final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true, Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO); // ... and sync on it starting up - verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any()); - - // FIXME: add a few writes to the broker so we have multiple transactions and verify deltas - verify(sinkChain, timeout(2000)).newWriteOnlyTransaction(); + // verify the connection was established and MSG_EMPTY_DATA was transferred + verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any()); verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), eq(YangInstanceIdentifier.empty()), any(ContainerNode.class)); - verify(sinkTx, timeout(1000)).commit(); + + // generate some deltas + final int deltaCount = 5; + generateModification(getDataBroker(), deltaCount); + + // verify that all the deltas were transferred and committed + 1 invocation from receiving MSG_EMPTY_DATA + verify(sinkChain, timeout(2000).times(deltaCount + 1)).newWriteOnlyTransaction(); + verify(sinkTx, timeout(2000).times(deltaCount + 1)).commit(); + + sink.close(); + source.close(); + } + + /** + * Add some data first and then start replication. Only 1 change is expected to return from the Source and it + * should contain the initial state of the Source's datastore. + */ + @Test + public void testReplicateInitialState() throws InterruptedException, ExecutionException { + // add some data to datastore + final int deltaCount = 5; + generateModification(getDataBroker(), deltaCount); + + // Make sure to start source... + final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT); + // ... and give it some time start up and open up the port + Thread.sleep(1000); + + // Mocking for sink... + final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class); + final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class); + doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit(); + doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction(); + final DOMDataBroker sinkBroker = mock(DOMDataBroker.class); + doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any()); + + // Kick of the sink ... + final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true, + Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO); + // ... and sync on it starting up + + // verify the connection was established and MSG_EMPTY_DATA was transferred + verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any()); + verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction(); + + // verify that the initial data invoked onDataTreeChanged() and was transferred to sink + ArgumentCaptor> dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class); + verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture()); + // verify that the initial state contains everything + NormalizedNode capturedInitialState = dataCaptor.getAllValues().iterator().next(); + NormalizedNode expectedEntityState = generateNormalizedNodeForEntities(deltaCount); + assertEquals(expectedEntityState, capturedInitialState); + + verify(sinkTx, timeout(2000).times(1)).commit(); sink.close(); source.close(); } + + private static ContainerNode generateNormalizedNodeForEntities(final int amount) { + final CollectionNodeBuilder builder = ImmutableNodes.mapNodeBuilder(ENTITY_QNAME); + for (int i = 0; i < amount; i++) { + builder.withChild(ImmutableNodes.mapEntry(ENTITY_QNAME, ENTITY_NAME_QNAME, "testEntity" + i)); + } + + return Builders.containerBuilder() + .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME)) + .withChild(builder.build()) + .build(); + } + + private static void generateModification(final DataBroker broker, final int amount) + throws InterruptedException, ExecutionException { + for (int i = 0; i < amount; i++) { + final WriteTransaction writeTransaction = broker.newWriteOnlyTransaction(); + final EntityKey key = new EntityKey("testEntity" + i); + + writeTransaction.put(LogicalDatastoreType.CONFIGURATION, + InstanceIdentifier.builder(Entity.class, key).build(), new EntityBuilder().withKey(key).build()); + writeTransaction.commit().get(); + } + } } -- 2.36.6