Provide auto-reconnection for Sink 37/90737/1
author Tibor Král <tibor.kral@pantheon.tech>
Thu, 25 Jun 2020 16:17:55 +0000 (18:17 +0200)
committer Robert Varga <nite@hq.sk>
Mon, 29 Jun 2020 11:31:35 +0000 (11:31 +0000)
In case there is a network partition, the Sink needs to
be aware of it and schedule a reconnect. This patch adds
a configuration knob, keepalive-interval-seconds, to tweak
how soon after a network failure the Sink gets notified.

Change-Id: I1eb2880bb00d1101cd587e4a737ba2f8a485b7ed
Signed-off-by: Tibor Král <tibor.kral@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 2cbcb2d8589805bcb053462176424dae2b53cd1a)

replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java
replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml
replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java

index 1fff84f4e5df84f7da57dd967cc6ded774d0c637..e7f42eb19c1a2c31acd322f0dee6fd5401d891e8 100644 (file)
@@ -32,6 +32,10 @@ final class Constants {
      * End-of-DataTreeCandidate serialization stream. The payload is empty.
      */
     static final byte MSG_DTC_APPLY     = 4;
+    /**
+     * Verify the connection is alive.
+     */
+    static final int MSG_PING           = 5;
 
     /**
      * Length of the length field in each transmitted frame.
@@ -47,6 +51,9 @@ final class Constants {
 
     static final ByteBuf DTC_APPLY = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY }));
 
+    static final ByteBuf PING = Unpooled.unreleasableBuffer(
+        Unpooled.wrappedBuffer(new byte[] { MSG_PING }));
+
     private Constants() {
         // Hidden on purpose
     }
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java
new file mode 100644 (file)
index 0000000..4803d3e
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleStateEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KeepaliveHandler extends ChannelDuplexHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(KeepaliveHandler.class);
+
+    private final Runnable reconnect;
+
+    KeepaliveHandler(final Runnable reconnectCallback) {
+        this.reconnect = requireNonNull(reconnectCallback, "Reconnect callback should not be null");
+    }
+
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+        if (evt instanceof IdleStateEvent) {
+            ctx.writeAndFlush(Constants.PING);
+        } else {
+            ctx.fireUserEventTriggered(evt);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+        LOG.warn("There was an exception on the channel. Attempting to reconnect.", cause);
+        reconnect.run();
+    }
+}
index cc70ffad573b9a00af1d1eae38b3cbc8c16a67e9..13f9c99eda4663c4f40942a234f703237d1604a2 100644 (file)
@@ -36,10 +36,12 @@ public final class NettyReplication {
 
     public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
             final ClusterSingletonServiceProvider singletonService, final boolean enabled,
-            final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay) {
+            final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay,
+            final Duration keepaliveInterval) {
         LOG.debug("Sink {}", enabled ? "enabled" : "disabled");
         return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport,
-            dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay)) : new Disabled();
+            dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay, keepaliveInterval))
+                : new Disabled();
     }
 
     public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
index f71e5af1fc82020ef3839f11d6257637489a8eb6..43d93d7fb0606ad32dec030943c46292ba63d2cd 100644 (file)
@@ -19,6 +19,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -26,6 +27,7 @@ import java.time.Duration;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
@@ -59,16 +61,18 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     private final DOMDataBroker dataBroker;
     private final InetSocketAddress sourceAddress;
     private final Duration reconnectDelay;
+    private final Duration keepaliveInterval;
 
     @GuardedBy("this")
     private ChannelFuture futureChannel;
 
     SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
-            final InetSocketAddress sourceAddress, final Duration reconnectDelay) {
+            final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) {
         this.bootstrapSupport = requireNonNull(bootstrapSupport);
         this.dataBroker = requireNonNull(dataBroker);
         this.sourceAddress = requireNonNull(sourceAddress);
         this.reconnectDelay = requireNonNull(reconnectDelay);
+        this.keepaliveInterval = requireNonNull(keepaliveInterval);
         LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
     }
 
@@ -80,27 +84,42 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     @Override
     public synchronized void instantiateServiceInstance() {
         LOG.info("Replication sink started with source {}", sourceAddress);
+        doConnect();
+    }
 
+    @Holding("this")
+    private void doConnect() {
+        LOG.info("Connecting to Source");
         final Bootstrap bs = bootstrapSupport.newBootstrap();
         final ScheduledExecutorService group = bs.config().group();
 
         futureChannel = bs
-                .option(ChannelOption.SO_KEEPALIVE, true)
-                .handler(this)
-                .connect(sourceAddress, null);
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .handler(this)
+            .connect(sourceAddress, null);
 
         futureChannel.addListener(compl -> channelResolved(compl, group));
     }
 
     @Override
     public synchronized ListenableFuture<?> closeServiceInstance() {
-        // FIXME: initiate orderly shutdown
-        return FluentFutures.immediateNullFluentFuture();
+        if (futureChannel == null) {
+            return FluentFutures.immediateNullFluentFuture();
+        }
+
+        // FIXME: this is not really immediate. We also should be closing the resulting channel
+        return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true));
+    }
+
+    private synchronized void reconnect() {
+        doConnect();
     }
 
     @Override
     protected void initChannel(final SocketChannel ch) {
         ch.pipeline()
+            .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS))
+            .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect))
             .addLast("frameDecoder", new MessageFrameDecoder())
             .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
                 new SinkTransactionChainListener(ch))))
@@ -112,7 +131,7 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
         if (cause != null) {
             LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
             group.schedule(() -> {
-                // FIXME: perform reconnect
+                reconnect();
             }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
             return;
         }
index a5bbbc00635bb173df11d6f4b65cedf5477b0643..7aefce2e3538a4a14f447cb2065790a2eec9267c 100644 (file)
@@ -8,6 +8,7 @@
       <cm:property name="source-host" value="127.0.0.1"/>
       <cm:property name="source-port" value="9999"/>
       <cm:property name="reconnect-delay-millis" value="3000"/>
+      <cm:property name="keepalive-interval-seconds" value="10"/>
     </cm:default-properties>
   </cm:property-placeholder>
 
     <argument value="$(reconnect-delay-millis)"/>
   </bean>
 
+  <bean id="keepaliveInterval" class="java.time.Duration" factory-method="ofSeconds">
+    <argument value="$(keepalive-interval-seconds)"/>
+  </bean>
+
   <bean id="sourceAddress" class="java.net.InetAddress" factory-method="getByName">
     <argument value="$(source-host)"/>
   </bean>
@@ -28,6 +33,7 @@
     <argument ref="sourceAddress"/>
     <argument value="$(source-port)"/>
     <argument ref="reconnectDelay"/>
+    <argument ref="keepaliveInterval"/>
   </bean>
 
 </blueprint>
index e688bd5e9bff45d52b10501a3cf929bcd3c7805b..08bc369ce16841956b28dbdedfe9ca3cc1419cd6 100644 (file)
@@ -95,7 +95,7 @@ public class IntegrationTest extends AbstractDataBrokerTest {
 
         // Kick of the sink ...
         final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
-            Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
+            Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO);
         // ... and sync on it starting up
 
         // verify the connection was established and MSG_EMPTY_DATA was transferred
@@ -140,7 +140,7 @@ public class IntegrationTest extends AbstractDataBrokerTest {
 
         // Kick of the sink ...
         final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
-            Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
+            Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO);
         // ... and sync on it starting up
 
         // verify the connection was established and MSG_EMPTY_DATA was transferred