Improve tests and blueprints of Netty Replication Utility 94/90694/1
authorTibor Král <tibor.kral@pantheon.tech>
Tue, 23 Jun 2020 20:58:52 +0000 (22:58 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 25 Jun 2020 13:41:16 +0000 (15:41 +0200)
- Test replication of actual DataTreeChanges from Source
to Sink and fix any related issues.
- Move blueprints to /OSGI-INF/blueprint/ and fix any
related wiring issues

Change-Id: I89c8228538e0462bbe61ca49e1ecf09dad4d4aaf
Signed-off-by: Tibor Král <tibor.kral@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit a2f2028f4d111e77ca542d2bc2110f38fafd82fb)

features/odl-mdsal-exp-replicate-common/pom.xml
features/odl-mdsal-exp-replicate-netty/pom.xml
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java
replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml [moved from replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml with 62% similarity]
replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-source.xml [moved from replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml with 65% similarity]
replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml [deleted file]
replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java

index 874b61edc1c60f826a0877ef7984a1df7c331a35..eab5a06dbe6be65dc92072ed96af8d1c19c1045e 100644 (file)
 
     <name>OpenDaylight :: MD-SAL :: Replicate :: Common</name>
 
+    <properties>
+        <skip.karaf.featureTest>true</skip.karaf.featureTest>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.opendaylight.mdsal</groupId>
index 9216a4a581fdf56b74c7eaf831661c78ab878124..4625f9cf32a45cfef304bbe04d0b13ca50fd8233 100644 (file)
 
     <name>OpenDaylight :: MD-SAL :: Replicate :: Netty</name>
 
+    <properties>
+        <skip.karaf.featureTest>true</skip.karaf.featureTest>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.opendaylight.mdsal</groupId>
index c9611bec6c991259f5ab9ecf034d442bd62e6342..1fff84f4e5df84f7da57dd967cc6ded774d0c637 100644 (file)
@@ -44,6 +44,7 @@ final class Constants {
 
     static final ByteBuf EMPTY_DATA = Unpooled.unreleasableBuffer(
         Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA }));
+
     static final ByteBuf DTC_APPLY = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY }));
 
     private Constants() {
index 7e396e8ba46cbfae17921116be8e91db6423df17..cc70ffad573b9a00af1d1eae38b3cbc8c16a67e9 100644 (file)
@@ -17,8 +17,12 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.yangtools.concepts.AbstractRegistration;
 import org.opendaylight.yangtools.concepts.Registration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class NettyReplication {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyReplication.class);
+
     private static final class Disabled extends AbstractRegistration {
         @Override
         protected void removeRegistration() {
@@ -33,12 +37,14 @@ public final class NettyReplication {
     public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
             final ClusterSingletonServiceProvider singletonService, final boolean enabled,
             final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay) {
+        LOG.debug("Sink {}", enabled ? "enabled" : "disabled");
         return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport,
             dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay)) : new Disabled();
     }
 
     public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
             final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort) {
+        LOG.debug("Source {}", enabled ? "enabled" : "disabled");
         final DOMDataTreeChangeService dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         verify(dtcs != null, "Missing DOMDataTreeChangeService in broker %s", dataBroker);
 
index 5a29573ade29a5b1a7ebe853f0451d0e17bc467f..0b65bc16eeeb9a10ac18256eb2df7cbaa9e60707 100644 (file)
@@ -65,7 +65,7 @@ final class SinkRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
                 handleEmptyData();
                 break;
             case Constants.MSG_DTC_CHUNK:
-                chunks.add(msg);
+                chunks.add(msg.retain());
                 break;
             case Constants.MSG_DTC_APPLY:
                 handleDtcApply();
diff --git a/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml
new file mode 100644 (file)
index 0000000..e394dfa
--- /dev/null
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0">
+
+    <reference id="dataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker" odl:type="default"/>
+
+    <reference id="singletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"
+               odl:type="default"/>
+
+    <bean id="bootstrapSupport" class="org.opendaylight.mdsal.replicate.netty.AbstractBootstrapSupport"
+          factory-method="create" destroy-method="close"/>
+    <service ref="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
+</blueprint>
similarity index 62%
rename from replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-sink.xml
rename to replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml
index 9a76c3d54643babebe227e805cc4b191a34c8f81..a5bbbc00635bb173df11d6f4b65cedf5477b0643 100644 (file)
@@ -1,8 +1,8 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
-           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
            xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
-  <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.sink" update-strategy="reload">
+  <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.sink" update-strategy="reload"
+                           placeholder-prefix="$(" placeholder-suffix=")">
     <cm:default-properties>
       <cm:property name="enabled" value="false"/>
       <cm:property name="source-host" value="127.0.0.1"/>
     </cm:default-properties>
   </cm:property-placeholder>
 
-  <reference id="dataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker" odl:type="default"/>
-  <reference id="singletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"
-             odl:type="default"/>
-  <reference id="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
-
   <bean id="reconnectDelay" class="java.time.Duration" factory-method="ofMillis">
-    <argument value="${reconnect-delay-millis}"/>
+    <argument value="$(reconnect-delay-millis)"/>
   </bean>
 
   <bean id="sourceAddress" class="java.net.InetAddress" factory-method="getByName">
-    <argument value="${source-host}"/>
+    <argument value="$(source-host)"/>
   </bean>
 
   <bean id="nettyReplicationSink" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
@@ -29,9 +24,9 @@
     <argument ref="bootstrapSupport"/>
     <argument ref="dataBroker"/>
     <argument ref="singletonServiceProvider"/>
-    <argument value="${enabled}"/>
+    <argument value="$(enabled)"/>
     <argument ref="sourceAddress"/>
-    <argument value="${source-port}"/>
+    <argument value="$(source-port)"/>
     <argument ref="reconnectDelay"/>
   </bean>
 
similarity index 65%
rename from replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-source.xml
rename to replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-source.xml
index 14a2d4c436a1a1ec10a30a398eeb7a63cc44f779..41b4e106ad7418bc8e0fd8749610b3942f1a68ff 100644 (file)
@@ -1,6 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
-           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
            xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
   <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.source" update-strategy="reload">
     <cm:default-properties>
@@ -9,11 +8,6 @@
     </cm:default-properties>
   </cm:property-placeholder>
 
-  <reference id="dataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker" odl:type="default"/>
-  <reference id="singletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"
-             odl:type="default"/>
-  <reference id="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
-
   <bean id="nettyReplicationSource" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
         factory-method="createSource" destroy-method="close">
     <argument ref="bootstrapSupport"/>
diff --git a/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml b/replicate/mdsal-replicate-netty/src/main/resources/org/opendaylight/blueprint/netty-replication-common.xml
deleted file mode 100644 (file)
index 00dc595..0000000
+++ /dev/null
@@ -1,6 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
-  <bean id="bootstrapSupport" class="org.opendaylight.mdsal.replicate.netty.AbstractBootstrapSupport"
-               factory-method="create" destroy-method="close"/>
-  <service ref="bootstrapSupport" interface="org.opendaylight.mdsal.replicate.netty.BootstrapSupport"/>
-</blueprint>
index 6261bd3f9ff1e69fdacdd2737786f944036f6a4b..e688bd5e9bff45d52b10501a3cf929bcd3c7805b 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.mdsal.replicate.netty;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -16,11 +17,15 @@ import static org.mockito.Mockito.verify;
 
 import java.net.Inet4Address;
 import java.time.Duration;
+import java.util.concurrent.ExecutionException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.mdsal.binding.api.DataBroker;
+import org.opendaylight.mdsal.binding.api.WriteTransaction;
 import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTest;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
@@ -29,14 +34,30 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
 import org.opendaylight.mdsal.eos.dom.simple.SimpleDOMEntityOwnershipService;
 import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.Entity;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.EntityKey;
 import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class IntegrationTest extends AbstractDataBrokerTest {
     private static final int TEST_PORT = 4000;
 
+    private static final QName ENTITY_QNAME = Entity.QNAME;
+    private static final QName ENTITY_NAME_QNAME = QName.create(ENTITY_QNAME, "name");
+
     private AbstractBootstrapSupport support;
     private DOMClusterSingletonServiceProviderImpl css;
 
@@ -53,8 +74,12 @@ public class IntegrationTest extends AbstractDataBrokerTest {
         css.close();
     }
 
+    /**
+     * Connect the Source and Sink. Verify that the initial state returned from Source was empty. After that make some
+     * changes to the Source's datastore and verify that they were all replicated to the Sink
+     */
     @Test
-    public void testSourceToSink() throws InterruptedException {
+    public void testSourceToSink() throws InterruptedException, ExecutionException {
         // Make sure to start source...
         final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
         // ... and give it some time start up and open up the port
@@ -72,16 +97,91 @@ public class IntegrationTest extends AbstractDataBrokerTest {
         final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
             Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
         // ... and sync on it starting up
-        verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
-
-        // FIXME: add a few writes to the broker so we have multiple transactions and verify deltas
 
-        verify(sinkChain, timeout(2000)).newWriteOnlyTransaction();
+        // verify the connection was established and MSG_EMPTY_DATA was transferred
+        verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
         verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), eq(YangInstanceIdentifier.empty()),
             any(ContainerNode.class));
-        verify(sinkTx, timeout(1000)).commit();
+
+        // generate some deltas
+        final int deltaCount = 5;
+        generateModification(getDataBroker(), deltaCount);
+
+        // verify that all the deltas were transferred and committed + 1 invocation from receiving MSG_EMPTY_DATA
+        verify(sinkChain, timeout(2000).times(deltaCount + 1)).newWriteOnlyTransaction();
+        verify(sinkTx, timeout(2000).times(deltaCount + 1)).commit();
+
+        sink.close();
+        source.close();
+    }
+
+    /**
+     * Add some data first and then start replication. Only 1 change is expected to return from the Source and it
+     * should contain the initial state of the Source's datastore.
+     */
+    @Test
+    public void testReplicateInitialState() throws InterruptedException, ExecutionException {
+        // add some data to datastore
+        final int deltaCount = 5;
+        generateModification(getDataBroker(), deltaCount);
+
+        // Make sure to start source...
+        final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
+        // ... and give it some time start up and open up the port
+        Thread.sleep(1000);
+
+        // Mocking for sink...
+        final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
+        final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
+        doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
+        doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
+        final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
+        doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any());
+
+        // Kick of the sink ...
+        final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
+            Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
+        // ... and sync on it starting up
+
+        // verify the connection was established and MSG_EMPTY_DATA was transferred
+        verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
+        verify(sinkChain, timeout(2000).times(1)).newWriteOnlyTransaction();
+
+        // verify that the initial data invoked onDataTreeChanged() and was transferred to sink
+        ArgumentCaptor<NormalizedNode<?, ?>> dataCaptor = ArgumentCaptor.forClass(NormalizedNode.class);
+        verify(sinkTx, timeout(2000).times(1)).put(any(), any(), dataCaptor.capture());
+        // verify that the initial state contains everything
+        NormalizedNode<?, ?> capturedInitialState = dataCaptor.getAllValues().iterator().next();
+        NormalizedNode<?, ?> expectedEntityState = generateNormalizedNodeForEntities(deltaCount);
+        assertEquals(expectedEntityState, capturedInitialState);
+
+        verify(sinkTx, timeout(2000).times(1)).commit();
 
         sink.close();
         source.close();
     }
+
+    private static ContainerNode generateNormalizedNodeForEntities(final int amount) {
+        final CollectionNodeBuilder<MapEntryNode, MapNode> builder = ImmutableNodes.mapNodeBuilder(ENTITY_QNAME);
+        for (int i = 0; i < amount; i++) {
+            builder.withChild(ImmutableNodes.mapEntry(ENTITY_QNAME, ENTITY_NAME_QNAME, "testEntity" + i));
+        }
+
+        return Builders.containerBuilder()
+                .withNodeIdentifier(new NodeIdentifier(SchemaContext.NAME))
+                .withChild(builder.build())
+                .build();
+    }
+
+    private static void generateModification(final DataBroker broker, final int amount)
+            throws InterruptedException, ExecutionException {
+        for (int i = 0; i < amount; i++) {
+            final WriteTransaction writeTransaction = broker.newWriteOnlyTransaction();
+            final EntityKey key = new EntityKey("testEntity" + i);
+
+            writeTransaction.put(LogicalDatastoreType.CONFIGURATION,
+                InstanceIdentifier.builder(Entity.class, key).build(), new EntityBuilder().withKey(key).build());
+            writeTransaction.commit().get();
+        }
+    }
 }