Netty Replicator - improve the reconnection and keepalive mechanisms
[mdsal.git] / replicate / mdsal-replicate-netty / src / main / java / org / opendaylight / mdsal / replicate / netty / SinkSingletonService.java
index 43d93d7fb0606ad32dec030943c46292ba63d2cd..98e154e3d8f7a6f58983474d4552c7316c09f3d0 100644 (file)
@@ -16,11 +16,11 @@ import io.netty.buffer.ByteBufOutputStream;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
@@ -47,6 +47,7 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     // TODO: allow different trees?
     private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
         YangInstanceIdentifier.empty());
+    private static long CHANNEL_CLOSE_TIMEOUT_S = 10;
     private static final ByteBuf TREE_REQUEST;
 
     static {
@@ -61,18 +62,23 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     private final DOMDataBroker dataBroker;
     private final InetSocketAddress sourceAddress;
     private final Duration reconnectDelay;
+    private final int maxMissedKeepalives;
     private final Duration keepaliveInterval;
 
     @GuardedBy("this")
     private ChannelFuture futureChannel;
+    private boolean closingInstance;
+    private Bootstrap bs;
 
     SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
-            final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) {
+            final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval,
+            final int maxMissedKeepalives) {
         this.bootstrapSupport = requireNonNull(bootstrapSupport);
         this.dataBroker = requireNonNull(dataBroker);
         this.sourceAddress = requireNonNull(sourceAddress);
         this.reconnectDelay = requireNonNull(reconnectDelay);
         this.keepaliveInterval = requireNonNull(keepaliveInterval);
+        this.maxMissedKeepalives = maxMissedKeepalives;
         LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
     }
 
@@ -84,61 +90,95 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     @Override
     public synchronized void instantiateServiceInstance() {
         LOG.info("Replication sink started with source {}", sourceAddress);
+        this.bs = bootstrapSupport.newBootstrap();
         doConnect();
     }
 
     @Holding("this")
     private void doConnect() {
         LOG.info("Connecting to Source");
-        final Bootstrap bs = bootstrapSupport.newBootstrap();
         final ScheduledExecutorService group = bs.config().group();
 
         futureChannel = bs
             .option(ChannelOption.SO_KEEPALIVE, true)
             .handler(this)
             .connect(sourceAddress, null);
-
-        futureChannel.addListener(compl -> channelResolved(compl, group));
+        futureChannel.addListener((ChannelFutureListener) future -> channelResolved(future, group));
     }
 
     @Override
     public synchronized ListenableFuture<?> closeServiceInstance() {
+        closingInstance = true;
         if (futureChannel == null) {
             return FluentFutures.immediateNullFluentFuture();
         }
 
-        // FIXME: this is not really immediate. We also should be closing the resulting channel
-        return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true));
+        return FluentFutures.immediateBooleanFluentFuture(disconnect());
     }
 
     private synchronized void reconnect() {
+        disconnect();
         doConnect();
     }
 
+    private synchronized boolean disconnect() {
+        boolean shutdownSuccess = true;
+        final Channel channel = futureChannel.channel();
+        if (channel != null && channel.isActive()) {
+            try {
+                // close the resulting channel. Even when this triggers the closeFuture, it won't try to reconnect since
+                // the closingInstance flag is set
+                channel.close().await(CHANNEL_CLOSE_TIMEOUT_S, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                LOG.error("The channel didn't close properly within {} seconds", CHANNEL_CLOSE_TIMEOUT_S);
+                shutdownSuccess = false;
+            }
+        }
+        shutdownSuccess &= futureChannel.cancel(true);
+        futureChannel = null;
+        return shutdownSuccess;
+    }
+
     @Override
     protected void initChannel(final SocketChannel ch) {
         ch.pipeline()
-            .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS))
-            .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect))
             .addLast("frameDecoder", new MessageFrameDecoder())
+            .addLast("idleStateHandler", new IdleStateHandler(
+                keepaliveInterval.toNanos() * maxMissedKeepalives, 0, 0, TimeUnit.NANOSECONDS))
+            .addLast("keepaliveHandler", new SinkKeepaliveHandler())
             .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
                 new SinkTransactionChainListener(ch))))
             .addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
     }
 
-    private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
-        final Throwable cause = completedFuture.cause();
-        if (cause != null) {
-            LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
-            group.schedule(() -> {
-                reconnect();
-            }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
-            return;
+    private synchronized void channelResolved(final ChannelFuture completedFuture,
+        final ScheduledExecutorService group) {
+        if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
+            if (completedFuture.isSuccess()) {
+                final Channel ch = completedFuture.channel();
+                LOG.info("Channel {} established", ch);
+                ch.closeFuture().addListener((ChannelFutureListener) future -> channelClosed(future, group));
+                ch.writeAndFlush(TREE_REQUEST);
+            } else {
+                LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress,
+                    reconnectDelay.getSeconds(), completedFuture.cause());
+                group.schedule(() -> {
+                    reconnect();
+                }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
+            }
         }
+    }
 
-        final Channel ch = futureChannel.channel();
-        LOG.info("Channel {} established", ch);
-        ch.writeAndFlush(TREE_REQUEST);
+    private synchronized void channelClosed(final ChannelFuture completedFuture, final ScheduledExecutorService group) {
+        if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
+            if (!closingInstance) {
+                LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(),
+                    sourceAddress, reconnectDelay.getSeconds());
+                group.schedule(() -> {
+                    reconnect();
+                }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
+            }
+        }
     }
 
     private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {