Add a unit test for netty replication

diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java
index c2b5f4a83822488efb4be54903adf09b90a3139c..f71e5af1fc82020ef3839f11d6257637489a8eb6 100644
--- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java
+++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java
@@ -16,25 +16,29 @@ import io.netty.buffer.ByteBufOutputStream;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.Future;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class SinkSingletonService implements ClusterSingletonService {
+final class SinkSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
     private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
     private static final ServiceGroupIdentifier SGID =
             ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
@@ -45,7 +49,7 @@ final class SinkSingletonService implements ClusterSingletonService {
 
     static {
         try {
-            TREE_REQUEST = requestTree(TREE);
+            TREE_REQUEST = Unpooled.unreleasableBuffer(requestTree(TREE));
         } catch (IOException e) {
             throw new ExceptionInInitializerError(e);
         }
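
Wrapping the static TREE_REQUEST in Unpooled.unreleasableBuffer() keeps the shared buffer usable across connection attempts: an outbound write normally releases the buffer it is handed, which would leave a plain static buffer with a zero reference count after the first send. A minimal illustration of the release() difference (hypothetical, not part of this change):

    // Hypothetical illustration, not project code: a plain buffer is spent by
    // release(), while an unreleasable wrapper ignores it and stays reusable.
    ByteBuf plain = Unpooled.buffer().writeByte(1);
    plain.release();                 // refCnt drops to 0; further use would throw
    ByteBuf shared = Unpooled.unreleasableBuffer(Unpooled.buffer().writeByte(1));
    shared.release();                // no-op: refCnt stays at 1, safe to send again
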
@@ -65,6 +69,7 @@ final class SinkSingletonService implements ClusterSingletonService {
         this.dataBroker = requireNonNull(dataBroker);
         this.sourceAddress = requireNonNull(sourceAddress);
         this.reconnectDelay = requireNonNull(reconnectDelay);
+        LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
     }
 
     @Override
@@ -81,6 +86,7 @@ final class SinkSingletonService implements ClusterSingletonService {
 
         futureChannel = bs
                 .option(ChannelOption.SO_KEEPALIVE, true)
+                .handler(this)
                 .connect(sourceAddress, null);
 
         futureChannel.addListener(compl -> channelResolved(compl, group));
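
Because the service is now itself the ChannelInitializer handed to the Bootstrap via .handler(this), the pipeline wiring can be exercised without a real TCP connection. A rough sketch using Netty's EmbeddedChannel and a mocked DOMTransactionChain (the mocking framework, the test placement and the locally built DOMDataTreeIdentifier are assumptions, not part of this commit):

    // Sketch only: builds the same handler chain as initChannel() below.
    // Assumes Mockito on the test classpath and a test in the same package,
    // so MessageFrameDecoder/MessageFrameEncoder and SinkRequestHandler are visible.
    final DOMDataTreeIdentifier tree = new DOMDataTreeIdentifier(
        LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.empty());
    final EmbeddedChannel channel = new EmbeddedChannel(
        new MessageFrameDecoder(),
        new SinkRequestHandler(tree, mock(DOMTransactionChain.class)),
        MessageFrameEncoder.INSTANCE);
    assertTrue(channel.isActive());
    channel.finish();
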
@@ -88,30 +94,39 @@ final class SinkSingletonService implements ClusterSingletonService {
 
     @Override
     public synchronized ListenableFuture<?> closeServiceInstance() {
-        // TODO Auto-generated method stub
-        return null;
+        // FIXME: initiate orderly shutdown
+        return FluentFutures.immediateNullFluentFuture();
+    }
+
+    @Override
+    protected void initChannel(final SocketChannel ch) {
+        ch.pipeline()
+            .addLast("frameDecoder", new MessageFrameDecoder())
+            .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
+                new SinkTransactionChainListener(ch))))
+            .addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
     }
 
     private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
-        if (completedFuture != futureChannel) {
-            // Future changed, this callback is irrelevant
+        final Throwable cause = completedFuture.cause();
+        if (cause != null) {
+            LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
+            group.schedule(() -> {
+                // FIXME: perform reconnect
+            }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
             return;
         }
 
-        final Channel channel = futureChannel.channel();
-        channel.pipeline()
-            .addLast("frameDecoder", new MessageFrameDecoder())
-            .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
-                new SinkTransactionChainListener(channel))))
-            .addLast("frameEncoder", MessageFrameEncoder.instance());
-
-        channel.writeAndFlush(TREE_REQUEST);
+        final Channel ch = futureChannel.channel();
+        LOG.info("Channel {} established", ch);
+        ch.writeAndFlush(TREE_REQUEST);
     }
 
     private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {
         final ByteBuf ret = Unpooled.buffer();
 
         try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) {
+            stream.writeByte(Constants.MSG_SUBSCRIBE_REQ);
             try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) {
                 tree.getDatastoreType().writeTo(output);
                 output.writeYangInstanceIdentifier(tree.getRootIdentifier());
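
The commit title mentions a unit test for netty replication; the test itself is not shown in this diff. One possible shape for covering the subscribe-request encoding above is sketched here under stated assumptions (JUnit 4, a test class placed in org.opendaylight.mdsal.replicate.netty so the package-private Constants is visible, and MSG_SUBSCRIBE_REQ being a single message-type byte); it is not the test added by the commit:

    // Sketch of a possible test, not the one added by this commit.
    package org.opendaylight.mdsal.replicate.netty;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertTrue;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufOutputStream;
    import io.netty.buffer.Unpooled;
    import java.io.IOException;
    import org.junit.Test;
    import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
    import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
    import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
    import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;

    public class SubscribeRequestTest {
        @Test
        public void subscribeRequestStartsWithRequestByte() throws IOException {
            // Encode a request the same way requestTree() does: message-type byte,
            // then datastore type and root identifier in binfmt form.
            final ByteBuf buf = Unpooled.buffer();
            try (ByteBufOutputStream stream = new ByteBufOutputStream(buf)) {
                stream.writeByte(Constants.MSG_SUBSCRIBE_REQ);
                try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) {
                    LogicalDatastoreType.OPERATIONAL.writeTo(output);
                    output.writeYangInstanceIdentifier(YangInstanceIdentifier.empty());
                }
            }

            // The first byte on the wire must identify the message type,
            // followed by the encoded data tree identifier.
            assertEquals(Constants.MSG_SUBSCRIBE_REQ, buf.readByte());
            assertTrue(buf.readableBytes() > 0);
        }
    }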