Provide auto-reconnection for Sink
[mdsal.git] / replicate / mdsal-replicate-netty / src / main / java / org / opendaylight / mdsal / replicate / netty / SinkSingletonService.java
index f71e5af1fc82020ef3839f11d6257637489a8eb6..43d93d7fb0606ad32dec030943c46292ba63d2cd 100644 (file)
@@ -19,6 +19,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -26,6 +27,7 @@ import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
@@ -59,16 +61,18 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     private final DOMDataBroker dataBroker;
     private final InetSocketAddress sourceAddress;
     private final Duration reconnectDelay;
+    private final Duration keepaliveInterval;
 
     @GuardedBy("this")
     private ChannelFuture futureChannel;
 
     SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
-            final InetSocketAddress sourceAddress, final Duration reconnectDelay) {
+            final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) {
         this.bootstrapSupport = requireNonNull(bootstrapSupport);
         this.dataBroker = requireNonNull(dataBroker);
         this.sourceAddress = requireNonNull(sourceAddress);
         this.reconnectDelay = requireNonNull(reconnectDelay);
+        this.keepaliveInterval = requireNonNull(keepaliveInterval);
         LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
     }
 
@@ -80,27 +84,42 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     @Override
     public synchronized void instantiateServiceInstance() {
         LOG.info("Replication sink started with source {}", sourceAddress);
+        doConnect();
+    }
 
+    @Holding("this")
+    private void doConnect() {
+        LOG.info("Connecting to Source");
         final Bootstrap bs = bootstrapSupport.newBootstrap();
         final ScheduledExecutorService group = bs.config().group();
 
         futureChannel = bs
-                .option(ChannelOption.SO_KEEPALIVE, true)
-                .handler(this)
-                .connect(sourceAddress, null);
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .handler(this)
+            .connect(sourceAddress, null);
 
         futureChannel.addListener(compl -> channelResolved(compl, group));
     }
 
     @Override
     public synchronized ListenableFuture<?> closeServiceInstance() {
-        // FIXME: initiate orderly shutdown
-        return FluentFutures.immediateNullFluentFuture();
+        if (futureChannel == null) {
+            return FluentFutures.immediateNullFluentFuture();
+        }
+
+        // FIXME: this is not really immediate. We also should be closing the resulting channel
+        return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true));
+    }
+
+    private synchronized void reconnect() {
+        doConnect();
     }
 
     @Override
     protected void initChannel(final SocketChannel ch) {
         ch.pipeline()
+            .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS))
+            .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect))
             .addLast("frameDecoder", new MessageFrameDecoder())
             .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
                 new SinkTransactionChainListener(ch))))
@@ -112,7 +131,7 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
         if (cause != null) {
             LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
             group.schedule(() -> {
-                // FIXME: perform reconnect
+                reconnect();
             }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
             return;
         }