Netty Replicator - improve the reconnection and keepalive mechanisms 40/90740/4
authorTibor Král <tibor.kral@pantheon.tech>
Tue, 30 Jun 2020 01:29:04 +0000 (03:29 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 9 Jul 2020 16:09:04 +0000 (16:09 +0000)
In some cases during a network partition the disconnected
channel got closed with delay after a new channel was already created.
This started reconnection process which closed the new channel and
created yet another one.

Also improve the keepalive mechanism since some types of network
partitions left one side unaware of the issue.  It is important both
the Sink and the Source are notified about any connection issue as
soon as possible. PING-PONG messages are exchanged between the two
sides when no deltas are sent for a period of time

Configuration options keepalive-interval-seconds and
max-missed-keepalives added to both configurations.

Change-Id: Iebde72963bddb748ab97617d07cfc77cd8614da4
Signed-off-by: Tibor Král <tibor.kral@pantheon.tech>
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit b94fbc60c0b41da2f6645ab51188fbcdfa74e4af)

14 files changed:
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractKeepaliveHandler.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveException.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkKeepaliveHandler.java [moved from replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java with 50% similarity]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceKeepaliveHandler.java [new file with mode: 0644]
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java
replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml
replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml
replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-source.xml
replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java

diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractKeepaliveHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractKeepaliveHandler.java
new file mode 100644 (file)
index 0000000..ced8884
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractKeepaliveHandler extends ChannelDuplexHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractKeepaliveHandler.class);
+
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+        LOG.warn("Closing channel {} due to an error", ctx.channel(), cause);
+        ctx.close();
+    }
+}
index e7f42eb19c1a2c31acd322f0dee6fd5401d891e8..8041640ebce19d8ab7c136a11c21c208455a5b3d 100644 (file)
@@ -36,6 +36,10 @@ final class Constants {
      * Verify the connection is alive.
      */
     static final int MSG_PING           = 5;
+    /**
+     * Verify the connection is alive.
+     */
+    static final int MSG_PONG           = 6;
 
     /**
      * Length of the length field in each transmitted frame.
@@ -54,6 +58,9 @@ final class Constants {
     static final ByteBuf PING = Unpooled.unreleasableBuffer(
         Unpooled.wrappedBuffer(new byte[] { MSG_PING }));
 
+    static final ByteBuf PONG = Unpooled.unreleasableBuffer(
+        Unpooled.wrappedBuffer(new byte[] { MSG_PONG }));
+
     private Constants() {
         // Hidden on purpose
     }
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveException.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveException.java
new file mode 100644 (file)
index 0000000..a4b4029
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+final class KeepaliveException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    KeepaliveException(final int missedKeepalives) {
+        super("Keepalive Exchange Failed - missed " + missedKeepalives + " pings");
+    }
+}
index 13f9c99eda4663c4f40942a234f703237d1604a2..14aed6437aa23fe9f5e7b4b8af71f80b6410f035 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.mdsal.replicate.netty;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Verify.verify;
 
 import java.net.InetAddress;
@@ -37,20 +38,22 @@ public final class NettyReplication {
     public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
             final ClusterSingletonServiceProvider singletonService, final boolean enabled,
             final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay,
-            final Duration keepaliveInterval) {
+        final Duration keepaliveInterval, final int maxMissedKeepalives) {
         LOG.debug("Sink {}", enabled ? "enabled" : "disabled");
+        checkArgument(maxMissedKeepalives > 0, "max-missed-keepalives %s must be greater than 0", maxMissedKeepalives);
         return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport,
-            dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay, keepaliveInterval))
-                : new Disabled();
+            dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay, keepaliveInterval,
+            maxMissedKeepalives)) : new Disabled();
     }
 
     public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
-            final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort) {
+            final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort,
+        final Duration keepaliveInterval, final int maxMissedKeepalives) {
         LOG.debug("Source {}", enabled ? "enabled" : "disabled");
         final DOMDataTreeChangeService dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
         verify(dtcs != null, "Missing DOMDataTreeChangeService in broker %s", dataBroker);
-
+        checkArgument(maxMissedKeepalives > 0, "max-missed-keepalives %s must be greater than 0", maxMissedKeepalives);
         return enabled ? singletonService.registerClusterSingletonService(new SourceSingletonService(bootstrapSupport,
-            dtcs, listenPort)) : new Disabled();
+            dtcs, listenPort, keepaliveInterval, maxMissedKeepalives)) : new Disabled();
     }
 }
similarity index 50%
rename from replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java
rename to replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkKeepaliveHandler.java
index 4803d3e59d57470ee4a29339260471ba431fd2f1..753bfe0c464b41f7a8b8bc5a30713982972c856b 100644 (file)
@@ -7,35 +7,21 @@
  */
 package org.opendaylight.mdsal.replicate.netty;
 
-import static java.util.Objects.requireNonNull;
-
-import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.timeout.IdleStateEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class KeepaliveHandler extends ChannelDuplexHandler {
-    private static final Logger LOG = LoggerFactory.getLogger(KeepaliveHandler.class);
-
-    private final Runnable reconnect;
-
-    KeepaliveHandler(final Runnable reconnectCallback) {
-        this.reconnect = requireNonNull(reconnectCallback, "Reconnect callback should not be null");
-    }
+final class SinkKeepaliveHandler extends AbstractKeepaliveHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkKeepaliveHandler.class);
 
     @Override
     public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
         if (evt instanceof IdleStateEvent) {
-            ctx.writeAndFlush(Constants.PING);
+            LOG.debug("IdleStateEvent received. Closing channel {}.", ctx.channel());
+            ctx.close();
         } else {
             ctx.fireUserEventTriggered(evt);
         }
     }
-
-    @Override
-    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
-        LOG.warn("There was an exception on the channel. Attempting to reconnect.", cause);
-        reconnect.run();
-    }
 }
index 0b65bc16eeeb9a10ac18256eb2df7cbaa9e60707..24904eaaa982bd70138c82202b79e377ff3e0f1c 100644 (file)
@@ -70,6 +70,10 @@ final class SinkRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
             case Constants.MSG_DTC_APPLY:
                 handleDtcApply();
                 break;
+            case Constants.MSG_PING:
+                LOG.trace("Received PING from Source, sending PONG");
+                ctx.channel().writeAndFlush(Constants.PONG);
+                break;
             default:
                 throw new IllegalStateException("Unexpected message type " + msgType);
         }
index 43d93d7fb0606ad32dec030943c46292ba63d2cd..98e154e3d8f7a6f58983474d4552c7316c09f3d0 100644 (file)
@@ -16,11 +16,11 @@ import io.netty.buffer.ByteBufOutputStream;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.time.Duration;
@@ -47,6 +47,7 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     // TODO: allow different trees?
     private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
         YangInstanceIdentifier.empty());
+    private static long CHANNEL_CLOSE_TIMEOUT_S = 10;
     private static final ByteBuf TREE_REQUEST;
 
     static {
@@ -61,18 +62,23 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     private final DOMDataBroker dataBroker;
     private final InetSocketAddress sourceAddress;
     private final Duration reconnectDelay;
+    private final int maxMissedKeepalives;
     private final Duration keepaliveInterval;
 
     @GuardedBy("this")
     private ChannelFuture futureChannel;
+    private boolean closingInstance;
+    private Bootstrap bs;
 
     SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
-            final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) {
+            final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval,
+            final int maxMissedKeepalives) {
         this.bootstrapSupport = requireNonNull(bootstrapSupport);
         this.dataBroker = requireNonNull(dataBroker);
         this.sourceAddress = requireNonNull(sourceAddress);
         this.reconnectDelay = requireNonNull(reconnectDelay);
         this.keepaliveInterval = requireNonNull(keepaliveInterval);
+        this.maxMissedKeepalives = maxMissedKeepalives;
         LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
     }
 
@@ -84,61 +90,95 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     @Override
     public synchronized void instantiateServiceInstance() {
         LOG.info("Replication sink started with source {}", sourceAddress);
+        this.bs = bootstrapSupport.newBootstrap();
         doConnect();
     }
 
     @Holding("this")
     private void doConnect() {
         LOG.info("Connecting to Source");
-        final Bootstrap bs = bootstrapSupport.newBootstrap();
         final ScheduledExecutorService group = bs.config().group();
 
         futureChannel = bs
             .option(ChannelOption.SO_KEEPALIVE, true)
             .handler(this)
             .connect(sourceAddress, null);
-
-        futureChannel.addListener(compl -> channelResolved(compl, group));
+        futureChannel.addListener((ChannelFutureListener) future -> channelResolved(future, group));
     }
 
     @Override
     public synchronized ListenableFuture<?> closeServiceInstance() {
+        closingInstance = true;
         if (futureChannel == null) {
             return FluentFutures.immediateNullFluentFuture();
         }
 
-        // FIXME: this is not really immediate. We also should be closing the resulting channel
-        return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true));
+        return FluentFutures.immediateBooleanFluentFuture(disconnect());
     }
 
     private synchronized void reconnect() {
+        disconnect();
         doConnect();
     }
 
+    private synchronized boolean disconnect() {
+        boolean shutdownSuccess = true;
+        final Channel channel = futureChannel.channel();
+        if (channel != null && channel.isActive()) {
+            try {
+                // close the resulting channel. Even when this triggers the closeFuture, it won't try to reconnect since
+                // the closingInstance flag is set
+                channel.close().await(CHANNEL_CLOSE_TIMEOUT_S, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                LOG.error("The channel didn't close properly within {} seconds", CHANNEL_CLOSE_TIMEOUT_S);
+                shutdownSuccess = false;
+            }
+        }
+        shutdownSuccess &= futureChannel.cancel(true);
+        futureChannel = null;
+        return shutdownSuccess;
+    }
+
     @Override
     protected void initChannel(final SocketChannel ch) {
         ch.pipeline()
-            .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS))
-            .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect))
             .addLast("frameDecoder", new MessageFrameDecoder())
+            .addLast("idleStateHandler", new IdleStateHandler(
+                keepaliveInterval.toNanos() * maxMissedKeepalives, 0, 0, TimeUnit.NANOSECONDS))
+            .addLast("keepaliveHandler", new SinkKeepaliveHandler())
             .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
                 new SinkTransactionChainListener(ch))))
             .addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
     }
 
-    private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
-        final Throwable cause = completedFuture.cause();
-        if (cause != null) {
-            LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
-            group.schedule(() -> {
-                reconnect();
-            }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
-            return;
+    private synchronized void channelResolved(final ChannelFuture completedFuture,
+        final ScheduledExecutorService group) {
+        if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
+            if (completedFuture.isSuccess()) {
+                final Channel ch = completedFuture.channel();
+                LOG.info("Channel {} established", ch);
+                ch.closeFuture().addListener((ChannelFutureListener) future -> channelClosed(future, group));
+                ch.writeAndFlush(TREE_REQUEST);
+            } else {
+                LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress,
+                    reconnectDelay.getSeconds(), completedFuture.cause());
+                group.schedule(() -> {
+                    reconnect();
+                }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
+            }
         }
+    }
 
-        final Channel ch = futureChannel.channel();
-        LOG.info("Channel {} established", ch);
-        ch.writeAndFlush(TREE_REQUEST);
+    private synchronized void channelClosed(final ChannelFuture completedFuture, final ScheduledExecutorService group) {
+        if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
+            if (!closingInstance) {
+                LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(),
+                    sourceAddress, reconnectDelay.getSeconds());
+                group.schedule(() -> {
+                    reconnect();
+                }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
+            }
+        }
     }
 
     private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {
diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceKeepaliveHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceKeepaliveHandler.java
new file mode 100644 (file)
index 0000000..d98c23a
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleStateEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SourceKeepaliveHandler extends AbstractKeepaliveHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(SourceKeepaliveHandler.class);
+
+    private final int maxMissedKeepalives;
+
+    private int pingsSinceLastContact;
+
+    SourceKeepaliveHandler(final int maxMissedKeepalives) {
+        this.maxMissedKeepalives = maxMissedKeepalives;
+    }
+
+    /**
+     * Intercept messages from the Sink. Reset the pingsSinceLastContact counter and forward the message.
+     */
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+        pingsSinceLastContact = 0;
+        ctx.fireChannelRead(msg);
+    }
+
+    /**
+     * If the IdleStateEvent was fired, it means the Source has not written anything to the Sink for the duration
+     * specified by the keepalive-interval. PING will be sent and pingsSinceLastContact incremented.
+     * If pingsSinceLastContact reaches max-missed-keepalives a KeepaliveException will be raised and channel closed.
+     */
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+        if (evt instanceof IdleStateEvent) {
+            LOG.trace("IdleStateEvent received. Sending PING to sink");
+            if (pingsSinceLastContact > maxMissedKeepalives) {
+                ctx.fireExceptionCaught(new KeepaliveException(maxMissedKeepalives));
+            }
+            ctx.channel().writeAndFlush(Constants.PING);
+        } else {
+            ctx.fireUserEventTriggered(evt);
+        }
+    }
+}
index b7811f01845eca02166532d4bd18b1120a550ce3..2d2065d50d4657aee58ccef5e6b38bc73080bf3a 100644 (file)
@@ -43,7 +43,7 @@ final class SourceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) {
-        LOG.trace("Channel {} going inactive", ctx.channel());
+        LOG.info("Channel {} going inactive", ctx.channel());
         if (reg != null) {
             reg.close();
             reg = null;
@@ -62,11 +62,19 @@ final class SourceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> {
             case Constants.MSG_SUBSCRIBE_REQ:
                 subscribe(channel, msg);
                 break;
+            case Constants.MSG_PONG:
+                break;
             default:
                 throw new IllegalStateException("Unexpected message type " + msgType);
         }
     }
 
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+        LOG.warn("Closing channel {} due to an error", ctx.channel(), cause);
+        ctx.close();
+    }
+
     private void subscribe(final Channel channel, final ByteBuf msg) throws IOException {
         verify(reg == null, "Unexpected subscription when already subscribed");
 
index 2d2b35ecfb851f5c08fb06511e9292063459fc52..d707fd1407e42f21f8791cefaf77d89c7d9c79cf 100644 (file)
@@ -18,10 +18,13 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
@@ -45,14 +48,18 @@ final class SourceSingletonService extends ChannelInitializer<SocketChannel> imp
 
     @GuardedBy("this")
     private final Collection<SocketChannel> children = new HashSet<>();
+    private final Duration keepaliveInterval;
+    private final int maxMissedKeepalives;
     @GuardedBy("this")
     private Channel serverChannel;
 
     SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
-            final int listenPort) {
+            final int listenPort, final Duration keepaliveInterval, final int maxMissedKeepalives) {
         this.bootstrapSupport = requireNonNull(bootstrapSupport);
         this.dtcs = requireNonNull(dtcs);
         this.listenPort = listenPort;
+        this.keepaliveInterval = requireNonNull(keepaliveInterval);
+        this.maxMissedKeepalives = maxMissedKeepalives;
         LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
     }
 
@@ -112,6 +119,8 @@ final class SourceSingletonService extends ChannelInitializer<SocketChannel> imp
 
         ch.pipeline()
             .addLast("frameDecoder", new MessageFrameDecoder())
+            .addLast("idleStateHandler", new IdleStateHandler(0, keepaliveInterval.toNanos(), 0, TimeUnit.NANOSECONDS))
+            .addLast("keepaliveHandler", new SourceKeepaliveHandler(maxMissedKeepalives))
             .addLast("requestHandler", new SourceRequestHandler(dtcs))
             // Output, in reverse order
             .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)
index e394dfa253e209ba19201e103aaf5d67fd148abd..7eb0bca2c64f39a1acac8bd4b601d7ceb9886df1 100644 (file)
@@ -1,7 +1,14 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
-           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0">
-
+           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
+    <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.common" update-strategy="none"
+                             placeholder-prefix="$common(" placeholder-suffix=")">
+        <cm:default-properties>
+            <cm:property name="keepalive-interval-seconds" value="10"/>
+            <cm:property name="max-missed-keepalives" value="5"/>
+        </cm:default-properties>
+    </cm:property-placeholder>
     <reference id="dataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker" odl:type="default"/>
 
     <reference id="singletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"
index 7aefce2e3538a4a14f447cb2065790a2eec9267c..c1ed8ef0b71f5052eb6ac1a1f96b3c87ccf1c611 100644 (file)
@@ -8,7 +8,8 @@
       <cm:property name="source-host" value="127.0.0.1"/>
       <cm:property name="source-port" value="9999"/>
       <cm:property name="reconnect-delay-millis" value="3000"/>
-      <cm:property name="keepalive-interval-seconds" value="10"/>
+      <cm:property name="keepalive-interval-seconds" value="$common(keepalive-interval-seconds)"/>
+      <cm:property name="max-missed-keepalives" value="$common(max-missed-keepalives)"/>
     </cm:default-properties>
   </cm:property-placeholder>
 
     <argument value="$(reconnect-delay-millis)"/>
   </bean>
 
-  <bean id="keepaliveInterval" class="java.time.Duration" factory-method="ofSeconds">
-    <argument value="$(keepalive-interval-seconds)"/>
-  </bean>
-
   <bean id="sourceAddress" class="java.net.InetAddress" factory-method="getByName">
     <argument value="$(source-host)"/>
   </bean>
 
+  <bean id="keepaliveInt" class="java.time.Duration" factory-method="ofSeconds">
+    <argument value="$(keepalive-interval-seconds)"/>
+  </bean>
+
   <bean id="nettyReplicationSink" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
         factory-method="createSink" destroy-method="close">
     <argument ref="bootstrapSupport"/>
@@ -33,7 +34,8 @@
     <argument ref="sourceAddress"/>
     <argument value="$(source-port)"/>
     <argument ref="reconnectDelay"/>
-    <argument ref="keepaliveInterval"/>
+    <argument ref="keepaliveInt"/>
+    <argument value="$(max-missed-keepalives)"/>
   </bean>
 
 </blueprint>
index 41b4e106ad7418bc8e0fd8749610b3942f1a68ff..48545feefbf0bea95e6129557a780dc640256734 100644 (file)
@@ -5,9 +5,15 @@
     <cm:default-properties>
       <cm:property name="enabled" value="false"/>
       <cm:property name="listen-port" value="9999"/>
+      <cm:property name="keepalive-interval-seconds" value="$common(keepalive-interval-seconds)"/>
+      <cm:property name="max-missed-keepalives" value="$common(max-missed-keepalives)"/>
     </cm:default-properties>
   </cm:property-placeholder>
 
+  <bean id="keepaliveInterval" class="java.time.Duration" factory-method="ofSeconds">
+    <argument value="${keepalive-interval-seconds}"/>
+  </bean>
+
   <bean id="nettyReplicationSource" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
         factory-method="createSource" destroy-method="close">
     <argument ref="bootstrapSupport"/>
@@ -15,5 +21,7 @@
     <argument ref="singletonServiceProvider"/>
     <argument value="${enabled}"/>
     <argument value="${listen-port}"/>
+    <argument ref="keepaliveInterval"/>
+    <argument value="${max-missed-keepalives}"/>
   </bean>
 </blueprint>
index 08bc369ce16841956b28dbdedfe9ca3cc1419cd6..5f0a06b082f5751faadc781d75f20cc82beffd4f 100644 (file)
@@ -81,7 +81,8 @@ public class IntegrationTest extends AbstractDataBrokerTest {
     @Test
     public void testSourceToSink() throws InterruptedException, ExecutionException {
         // Make sure to start source...
-        final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
+        final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT,
+            Duration.ZERO, 5);
         // ... and give it some time start up and open up the port
         Thread.sleep(1000);
 
@@ -95,7 +96,7 @@ public class IntegrationTest extends AbstractDataBrokerTest {
 
         // Kick of the sink ...
         final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
-            Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO);
+            Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3);
         // ... and sync on it starting up
 
         // verify the connection was established and MSG_EMPTY_DATA was transferred
@@ -126,7 +127,8 @@ public class IntegrationTest extends AbstractDataBrokerTest {
         generateModification(getDataBroker(), deltaCount);
 
         // Make sure to start source...
-        final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
+        final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT,
+            Duration.ZERO, 5);
         // ... and give it some time start up and open up the port
         Thread.sleep(1000);
 
@@ -140,7 +142,7 @@ public class IntegrationTest extends AbstractDataBrokerTest {
 
         // Kick of the sink ...
         final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
-            Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO);
+            Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3);
         // ... and sync on it starting up
 
         // verify the connection was established and MSG_EMPTY_DATA was transferred