Add a unit test for netty replication 62/90562/8
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 22 Jun 2020 12:44:30 +0000 (14:44 +0200)
committerRobert Varga <nite@hq.sk>
Mon, 22 Jun 2020 17:55:08 +0000 (17:55 +0000)
This is a simple unit test, which we can use to validate establishment
of source connection. This flushes out a few bugs, which are also
addressed.

Change-Id: Ie8e5ae1772ed5dc544f434d7db44b2b31b54443b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
replicate/mdsal-replicate-netty/pom.xml
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractBootstrapSupport.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/MessageFrameEncoder.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java
replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java [new file with mode: 0644]

index ab576718aa99376d16d8c32330423e043f3950b6..72c0b1b8dbeb1a692987a8bf032b7f9697c9d391 100644 (file)
             <optional>true</optional>
         </dependency>
 
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-binding-dom-adapter</artifactId>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-binding-test-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-eos-dom-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-singleton-dom-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
index 926a7761d34876a2e860331f4af19248ba8913a4..a77a78c99f63daf2e5a93c2b4b56ddc6aff860e2 100644 (file)
@@ -33,7 +33,7 @@ public abstract class AbstractBootstrapSupport implements AutoCloseable, Bootstr
         this.workerGroup = requireNonNull(workerGroup);
     }
 
-    public static @NonNull BootstrapSupport create() {
+    public static @NonNull AbstractBootstrapSupport create() {
         if (Epoll.isAvailable()) {
             return new EpollBootstrapSupport();
         }
index eddf8d6dbaa341d3c21e4f1b672780ebc22e7df0..c9611bec6c991259f5ab9ecf034d442bd62e6342 100644 (file)
@@ -42,8 +42,9 @@ final class Constants {
      */
     static final int LENGTH_FIELD_MAX    = 1024 * 1024;
 
-    static final ByteBuf EMPTY_DATA = Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA });
-    static final ByteBuf DTC_APPLY = Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY });
+    static final ByteBuf EMPTY_DATA = Unpooled.unreleasableBuffer(
+        Unpooled.wrappedBuffer(new byte[] { MSG_EMPTY_DATA }));
+    static final ByteBuf DTC_APPLY = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY }));
 
     private Constants() {
         // Hidden on purpose
index f6d934781fc9a860a16659c50d0551e1f8fee050..da0f51574ae0202973d3b5a73f39c695f50ecb9e 100644 (file)
@@ -12,13 +12,9 @@ import io.netty.handler.codec.LengthFieldPrepender;
 
 @Sharable
 final class MessageFrameEncoder extends LengthFieldPrepender {
-    private static final MessageFrameEncoder INSTANCE = new MessageFrameEncoder();
+    static final MessageFrameEncoder INSTANCE = new MessageFrameEncoder();
 
     private MessageFrameEncoder() {
         super(Constants.LENGTH_FIELD_LENGTH);
     }
-
-    static MessageFrameEncoder instance() {
-        return INSTANCE;
-    }
 }
index 7ead9c26e251e643ac94e8cf18685133ce3e9b69..5a29573ade29a5b1a7ebe853f0451d0e17bc467f 100644 (file)
@@ -27,16 +27,21 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
 import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
 import org.opendaylight.mdsal.replicate.common.DataTreeCandidateUtils;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.DataTreeCandidateInputOutput;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataInput;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.ReusableImmutableNormalizedNodeStreamWriter;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class SinkRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
     private static final Logger LOG = LoggerFactory.getLogger(SinkRequestHandler.class);
+    private static final ContainerNode EMPTY_ROOT = ImmutableNodes.containerNode(SchemaContext.NAME);
 
     private final ReusableStreamReceiver receiver = ReusableImmutableNormalizedNodeStreamWriter.create();
     private final List<ByteBuf> chunks = new ArrayList<>();
@@ -72,7 +77,12 @@ final class SinkRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
     private void handleEmptyData() {
         final DOMDataTreeWriteTransaction tx = chain.newWriteOnlyTransaction();
-        tx.delete(tree.getDatastoreType(), tree.getRootIdentifier());
+
+        if (tree.getRootIdentifier().isEmpty()) {
+            tx.put(tree.getDatastoreType(), YangInstanceIdentifier.empty(), EMPTY_ROOT);
+        } else {
+            tx.delete(tree.getDatastoreType(), tree.getRootIdentifier());
+        }
         commit(tx);
     }
 
index c2b5f4a83822488efb4be54903adf09b90a3139c..f71e5af1fc82020ef3839f11d6257637489a8eb6 100644 (file)
@@ -16,25 +16,29 @@ import io.netty.buffer.ByteBufOutputStream;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.Future;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class SinkSingletonService implements ClusterSingletonService {
+final class SinkSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
     private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
     private static final ServiceGroupIdentifier SGID =
             ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
@@ -45,7 +49,7 @@ final class SinkSingletonService implements ClusterSingletonService {
 
     static {
         try {
-            TREE_REQUEST = requestTree(TREE);
+            TREE_REQUEST = Unpooled.unreleasableBuffer(requestTree(TREE));
         } catch (IOException e) {
             throw new ExceptionInInitializerError(e);
         }
@@ -65,6 +69,7 @@ final class SinkSingletonService implements ClusterSingletonService {
         this.dataBroker = requireNonNull(dataBroker);
         this.sourceAddress = requireNonNull(sourceAddress);
         this.reconnectDelay = requireNonNull(reconnectDelay);
+        LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
     }
 
     @Override
@@ -81,6 +86,7 @@ final class SinkSingletonService implements ClusterSingletonService {
 
         futureChannel = bs
                 .option(ChannelOption.SO_KEEPALIVE, true)
+                .handler(this)
                 .connect(sourceAddress, null);
 
         futureChannel.addListener(compl -> channelResolved(compl, group));
@@ -88,30 +94,39 @@ final class SinkSingletonService implements ClusterSingletonService {
 
     @Override
     public synchronized ListenableFuture<?> closeServiceInstance() {
-        // TODO Auto-generated method stub
-        return null;
+        // FIXME: initiate orderly shutdown
+        return FluentFutures.immediateNullFluentFuture();
+    }
+
+    @Override
+    protected void initChannel(final SocketChannel ch) {
+        ch.pipeline()
+            .addLast("frameDecoder", new MessageFrameDecoder())
+            .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
+                new SinkTransactionChainListener(ch))))
+            .addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
     }
 
     private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
-        if (completedFuture != futureChannel) {
-            // Future changed, this callback is irrelevant
+        final Throwable cause = completedFuture.cause();
+        if (cause != null) {
+            LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
+            group.schedule(() -> {
+                // FIXME: perform reconnect
+            }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
             return;
         }
 
-        final Channel channel = futureChannel.channel();
-        channel.pipeline()
-            .addLast("frameDecoder", new MessageFrameDecoder())
-            .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
-                new SinkTransactionChainListener(channel))))
-            .addLast("frameEncoder", MessageFrameEncoder.instance());
-
-        channel.writeAndFlush(TREE_REQUEST);
+        final Channel ch = futureChannel.channel();
+        LOG.info("Channel {} established", ch);
+        ch.writeAndFlush(TREE_REQUEST);
     }
 
     private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {
         final ByteBuf ret = Unpooled.buffer();
 
         try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) {
+            stream.writeByte(Constants.MSG_SUBSCRIBE_REQ);
             try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) {
                 tree.getDatastoreType().writeTo(output);
                 output.writeYangInstanceIdentifier(tree.getRootIdentifier());
index f302ca156de1f3820a36e95adcbca2e8f0e465b3..b7811f01845eca02166532d4bd18b1120a550ce3 100644 (file)
@@ -82,11 +82,13 @@ final class SourceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
         reg = dtcs.registerDataTreeChangeListener(dataTree, new ClusteredDOMDataTreeChangeListener() {
             @Override
             public void onInitialData() {
+                LOG.debug("Channel {} tree {} has empty data", channel, dataTree);
                 channel.writeAndFlush(AbstractSourceMessage.empty());
             }
 
             @Override
             public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
+                LOG.debug("Channel {} tree {} has {} changes", channel, dataTree, changes.size());
                 channel.writeAndFlush(AbstractSourceMessage.of(changes));
             }
         });
index 415637269b0277954f59210893b3ba7665b366c2..2d2b35ecfb851f5c08fb06511e9292063459fc52 100644 (file)
@@ -53,6 +53,7 @@ final class SourceSingletonService extends ChannelInitializer<SocketChannel> imp
         this.bootstrapSupport = requireNonNull(bootstrapSupport);
         this.dtcs = requireNonNull(dtcs);
         this.listenPort = listenPort;
+        LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
     }
 
     @Override
@@ -112,11 +113,12 @@ final class SourceSingletonService extends ChannelInitializer<SocketChannel> imp
         ch.pipeline()
             .addLast("frameDecoder", new MessageFrameDecoder())
             .addLast("requestHandler", new SourceRequestHandler(dtcs))
-            .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()))
-            .addLast("frameEncoder", MessageFrameEncoder.instance());
+            // Output, in reverse order
+            .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)
+            .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()));
         children.add(ch);
 
-        LOG.debug("Channel {} established", ch);
+        LOG.info("Channel {} established", ch);
     }
 
     private static ListenableFuture<Void> closeChannel(final Channel ch) {
diff --git a/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java
new file mode 100644 (file)
index 0000000..6261bd3
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.net.Inet4Address;
+import java.time.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.mdsal.binding.dom.adapter.test.AbstractDataBrokerTest;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.eos.dom.simple.SimpleDOMEntityOwnershipService;
+import org.opendaylight.mdsal.singleton.dom.impl.DOMClusterSingletonServiceProviderImpl;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class IntegrationTest extends AbstractDataBrokerTest {
+    private static final int TEST_PORT = 4000;
+
+    private AbstractBootstrapSupport support;
+    private DOMClusterSingletonServiceProviderImpl css;
+
+    @Before
+    public void before() {
+        support = AbstractBootstrapSupport.create();
+        css = new DOMClusterSingletonServiceProviderImpl(new SimpleDOMEntityOwnershipService());
+        css.initializeProvider();
+    }
+
+    @After
+    public void after() throws InterruptedException {
+        support.close();
+        css.close();
+    }
+
+    @Test
+    public void testSourceToSink() throws InterruptedException {
+        // Make sure to start source...
+        final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
+        // ... and give it some time start up and open up the port
+        Thread.sleep(1000);
+
+        // Mocking for sink...
+        final DOMTransactionChain sinkChain = mock(DOMTransactionChain.class);
+        final DOMDataTreeWriteTransaction sinkTx = mock(DOMDataTreeWriteTransaction.class);
+        doReturn(CommitInfo.emptyFluentFuture()).when(sinkTx).commit();
+        doReturn(sinkTx).when(sinkChain).newWriteOnlyTransaction();
+        final DOMDataBroker sinkBroker = mock(DOMDataBroker.class);
+        doReturn(sinkChain).when(sinkBroker).createMergingTransactionChain(any());
+
+        // Kick of the sink ...
+        final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
+            Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
+        // ... and sync on it starting up
+        verify(sinkBroker, timeout(1000)).createMergingTransactionChain(any());
+
+        // FIXME: add a few writes to the broker so we have multiple transactions and verify deltas
+
+        verify(sinkChain, timeout(2000)).newWriteOnlyTransaction();
+        verify(sinkTx, timeout(1000)).put(eq(LogicalDatastoreType.CONFIGURATION), eq(YangInstanceIdentifier.empty()),
+            any(ContainerNode.class));
+        verify(sinkTx, timeout(1000)).commit();
+
+        sink.close();
+        source.close();
+    }
+}