From b94fbc60c0b41da2f6645ab51188fbcdfa74e4af Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tibor=20Kr=C3=A1l?= Date: Tue, 30 Jun 2020 03:29:04 +0200 Subject: [PATCH] Netty Replicator - improve the reconnection and keepalive mechanisms MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit In some cases during a network partition the disconnected channel got closed with delay after a new channel was already created. This started reconnection process which closed the new channel and created yet another one. Also improve the keepalive mechanism since some types of network partitions left one side unaware of the issue. It is important both the Sink and the Source are notified about any connection issue as soon as possible. PING-PONG messages are exchanged between the two sides when no deltas are sent for a period of time Configuration options keepalive-interval-seconds and max-missed-keepalives added to both configurations. Change-Id: Iebde72963bddb748ab97617d07cfc77cd8614da4 Signed-off-by: Tibor Král Signed-off-by: Robert Varga --- .../netty/AbstractKeepaliveHandler.java | 23 ++++++ .../mdsal/replicate/netty/Constants.java | 7 ++ .../replicate/netty/KeepaliveException.java | 16 ++++ .../replicate/netty/NettyReplication.java | 15 ++-- ...Handler.java => SinkKeepaliveHandler.java} | 22 +---- .../replicate/netty/SinkRequestHandler.java | 4 + .../replicate/netty/SinkSingletonService.java | 80 ++++++++++++++----- .../netty/SourceKeepaliveHandler.java | 52 ++++++++++++ .../replicate/netty/SourceRequestHandler.java | 10 ++- .../netty/SourceSingletonService.java | 11 ++- .../blueprint/netty-replication-common.xml | 11 ++- .../blueprint/netty-replication-sink.xml | 14 ++-- .../blueprint/netty-replication-source.xml | 8 ++ .../replicate/netty/IntegrationTest.java | 10 ++- 14 files changed, 225 insertions(+), 58 deletions(-) create mode 100644 replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractKeepaliveHandler.java create mode 100644 replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveException.java rename replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/{KeepaliveHandler.java => SinkKeepaliveHandler.java} (50%) create mode 100644 replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceKeepaliveHandler.java diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractKeepaliveHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractKeepaliveHandler.java new file mode 100644 index 0000000000..ced8884520 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/AbstractKeepaliveHandler.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +abstract class AbstractKeepaliveHandler extends ChannelDuplexHandler { + private static final Logger LOG = LoggerFactory.getLogger(AbstractKeepaliveHandler.class); + + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + LOG.warn("Closing channel {} due to an error", ctx.channel(), cause); + ctx.close(); + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java index e7f42eb19c..8041640ebc 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java @@ -36,6 +36,10 @@ final class Constants { * Verify the connection is alive. */ static final int MSG_PING = 5; + /** + * Verify the connection is alive. + */ + static final int MSG_PONG = 6; /** * Length of the length field in each transmitted frame. @@ -54,6 +58,9 @@ final class Constants { static final ByteBuf PING = Unpooled.unreleasableBuffer( Unpooled.wrappedBuffer(new byte[] { MSG_PING })); + static final ByteBuf PONG = Unpooled.unreleasableBuffer( + Unpooled.wrappedBuffer(new byte[] { MSG_PONG })); + private Constants() { // Hidden on purpose } diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveException.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveException.java new file mode 100644 index 0000000000..a4b402902f --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveException.java @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +final class KeepaliveException extends Exception { + private static final long serialVersionUID = 1L; + + KeepaliveException(final int missedKeepalives) { + super("Keepalive Exchange Failed - missed " + missedKeepalives + " pings"); + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java index 13f9c99eda..14aed6437a 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java @@ -7,6 +7,7 @@ */ package org.opendaylight.mdsal.replicate.netty; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; import java.net.InetAddress; @@ -37,20 +38,22 @@ public final class NettyReplication { public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, final ClusterSingletonServiceProvider singletonService, final boolean enabled, final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay, - final Duration keepaliveInterval) { + final Duration keepaliveInterval, final int maxMissedKeepalives) { LOG.debug("Sink {}", enabled ? "enabled" : "disabled"); + checkArgument(maxMissedKeepalives > 0, "max-missed-keepalives %s must be greater than 0", maxMissedKeepalives); return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport, - dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay, keepaliveInterval)) - : new Disabled(); + dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay, keepaliveInterval, + maxMissedKeepalives)) : new Disabled(); } public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, - final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort) { + final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort, + final Duration keepaliveInterval, final int maxMissedKeepalives) { LOG.debug("Source {}", enabled ? "enabled" : "disabled"); final DOMDataTreeChangeService dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class); verify(dtcs != null, "Missing DOMDataTreeChangeService in broker %s", dataBroker); - + checkArgument(maxMissedKeepalives > 0, "max-missed-keepalives %s must be greater than 0", maxMissedKeepalives); return enabled ? singletonService.registerClusterSingletonService(new SourceSingletonService(bootstrapSupport, - dtcs, listenPort)) : new Disabled(); + dtcs, listenPort, keepaliveInterval, maxMissedKeepalives)) : new Disabled(); } } diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkKeepaliveHandler.java similarity index 50% rename from replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java rename to replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkKeepaliveHandler.java index 4803d3e59d..753bfe0c46 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkKeepaliveHandler.java @@ -7,35 +7,21 @@ */ package org.opendaylight.mdsal.replicate.netty; -import static java.util.Objects.requireNonNull; - -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleStateEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class KeepaliveHandler extends ChannelDuplexHandler { - private static final Logger LOG = LoggerFactory.getLogger(KeepaliveHandler.class); - - private final Runnable reconnect; - - KeepaliveHandler(final Runnable reconnectCallback) { - this.reconnect = requireNonNull(reconnectCallback, "Reconnect callback should not be null"); - } +final class SinkKeepaliveHandler extends AbstractKeepaliveHandler { + private static final Logger LOG = LoggerFactory.getLogger(SinkKeepaliveHandler.class); @Override public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) { if (evt instanceof IdleStateEvent) { - ctx.writeAndFlush(Constants.PING); + LOG.debug("IdleStateEvent received. Closing channel {}.", ctx.channel()); + ctx.close(); } else { ctx.fireUserEventTriggered(evt); } } - - @Override - public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { - LOG.warn("There was an exception on the channel. Attempting to reconnect.", cause); - reconnect.run(); - } } diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java index 0b65bc16ee..24904eaaa9 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkRequestHandler.java @@ -70,6 +70,10 @@ final class SinkRequestHandler extends SimpleChannelInboundHandler { case Constants.MSG_DTC_APPLY: handleDtcApply(); break; + case Constants.MSG_PING: + LOG.trace("Received PING from Source, sending PONG"); + ctx.channel().writeAndFlush(Constants.PONG); + break; default: throw new IllegalStateException("Unexpected message type " + msgType); } diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java index 43d93d7fb0..98e154e3d8 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java @@ -16,11 +16,11 @@ import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.concurrent.Future; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Duration; @@ -47,6 +47,7 @@ final class SinkSingletonService extends ChannelInitializer imple // TODO: allow different trees? private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()); + private static long CHANNEL_CLOSE_TIMEOUT_S = 10; private static final ByteBuf TREE_REQUEST; static { @@ -61,18 +62,23 @@ final class SinkSingletonService extends ChannelInitializer imple private final DOMDataBroker dataBroker; private final InetSocketAddress sourceAddress; private final Duration reconnectDelay; + private final int maxMissedKeepalives; private final Duration keepaliveInterval; @GuardedBy("this") private ChannelFuture futureChannel; + private boolean closingInstance; + private Bootstrap bs; SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, - final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) { + final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval, + final int maxMissedKeepalives) { this.bootstrapSupport = requireNonNull(bootstrapSupport); this.dataBroker = requireNonNull(dataBroker); this.sourceAddress = requireNonNull(sourceAddress); this.reconnectDelay = requireNonNull(reconnectDelay); this.keepaliveInterval = requireNonNull(keepaliveInterval); + this.maxMissedKeepalives = maxMissedKeepalives; LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress); } @@ -84,61 +90,95 @@ final class SinkSingletonService extends ChannelInitializer imple @Override public synchronized void instantiateServiceInstance() { LOG.info("Replication sink started with source {}", sourceAddress); + this.bs = bootstrapSupport.newBootstrap(); doConnect(); } @Holding("this") private void doConnect() { LOG.info("Connecting to Source"); - final Bootstrap bs = bootstrapSupport.newBootstrap(); final ScheduledExecutorService group = bs.config().group(); futureChannel = bs .option(ChannelOption.SO_KEEPALIVE, true) .handler(this) .connect(sourceAddress, null); - - futureChannel.addListener(compl -> channelResolved(compl, group)); + futureChannel.addListener((ChannelFutureListener) future -> channelResolved(future, group)); } @Override public synchronized ListenableFuture closeServiceInstance() { + closingInstance = true; if (futureChannel == null) { return FluentFutures.immediateNullFluentFuture(); } - // FIXME: this is not really immediate. We also should be closing the resulting channel - return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true)); + return FluentFutures.immediateBooleanFluentFuture(disconnect()); } private synchronized void reconnect() { + disconnect(); doConnect(); } + private synchronized boolean disconnect() { + boolean shutdownSuccess = true; + final Channel channel = futureChannel.channel(); + if (channel != null && channel.isActive()) { + try { + // close the resulting channel. Even when this triggers the closeFuture, it won't try to reconnect since + // the closingInstance flag is set + channel.close().await(CHANNEL_CLOSE_TIMEOUT_S, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.error("The channel didn't close properly within {} seconds", CHANNEL_CLOSE_TIMEOUT_S); + shutdownSuccess = false; + } + } + shutdownSuccess &= futureChannel.cancel(true); + futureChannel = null; + return shutdownSuccess; + } + @Override protected void initChannel(final SocketChannel ch) { ch.pipeline() - .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS)) - .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect)) .addLast("frameDecoder", new MessageFrameDecoder()) + .addLast("idleStateHandler", new IdleStateHandler( + keepaliveInterval.toNanos() * maxMissedKeepalives, 0, 0, TimeUnit.NANOSECONDS)) + .addLast("keepaliveHandler", new SinkKeepaliveHandler()) .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain( new SinkTransactionChainListener(ch)))) .addLast("frameEncoder", MessageFrameEncoder.INSTANCE); } - private synchronized void channelResolved(final Future completedFuture, final ScheduledExecutorService group) { - final Throwable cause = completedFuture.cause(); - if (cause != null) { - LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause); - group.schedule(() -> { - reconnect(); - }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS); - return; + private synchronized void channelResolved(final ChannelFuture completedFuture, + final ScheduledExecutorService group) { + if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) { + if (completedFuture.isSuccess()) { + final Channel ch = completedFuture.channel(); + LOG.info("Channel {} established", ch); + ch.closeFuture().addListener((ChannelFutureListener) future -> channelClosed(future, group)); + ch.writeAndFlush(TREE_REQUEST); + } else { + LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, + reconnectDelay.getSeconds(), completedFuture.cause()); + group.schedule(() -> { + reconnect(); + }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS); + } } + } - final Channel ch = futureChannel.channel(); - LOG.info("Channel {} established", ch); - ch.writeAndFlush(TREE_REQUEST); + private synchronized void channelClosed(final ChannelFuture completedFuture, final ScheduledExecutorService group) { + if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) { + if (!closingInstance) { + LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(), + sourceAddress, reconnectDelay.getSeconds()); + group.schedule(() -> { + reconnect(); + }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS); + } + } } private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException { diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceKeepaliveHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceKeepaliveHandler.java new file mode 100644 index 0000000000..d98c23ad07 --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceKeepaliveHandler.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.IdleStateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class SourceKeepaliveHandler extends AbstractKeepaliveHandler { + private static final Logger LOG = LoggerFactory.getLogger(SourceKeepaliveHandler.class); + + private final int maxMissedKeepalives; + + private int pingsSinceLastContact; + + SourceKeepaliveHandler(final int maxMissedKeepalives) { + this.maxMissedKeepalives = maxMissedKeepalives; + } + + /** + * Intercept messages from the Sink. Reset the pingsSinceLastContact counter and forward the message. + */ + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + pingsSinceLastContact = 0; + ctx.fireChannelRead(msg); + } + + /** + * If the IdleStateEvent was fired, it means the Source has not written anything to the Sink for the duration + * specified by the keepalive-interval. PING will be sent and pingsSinceLastContact incremented. + * If pingsSinceLastContact reaches max-missed-keepalives a KeepaliveException will be raised and channel closed. + */ + @Override + public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) { + if (evt instanceof IdleStateEvent) { + LOG.trace("IdleStateEvent received. Sending PING to sink"); + if (pingsSinceLastContact > maxMissedKeepalives) { + ctx.fireExceptionCaught(new KeepaliveException(maxMissedKeepalives)); + } + ctx.channel().writeAndFlush(Constants.PING); + } else { + ctx.fireUserEventTriggered(evt); + } + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java index b7811f0184..2d2065d50d 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceRequestHandler.java @@ -43,7 +43,7 @@ final class SourceRequestHandler extends SimpleChannelInboundHandler { @Override public void channelInactive(final ChannelHandlerContext ctx) { - LOG.trace("Channel {} going inactive", ctx.channel()); + LOG.info("Channel {} going inactive", ctx.channel()); if (reg != null) { reg.close(); reg = null; @@ -62,11 +62,19 @@ final class SourceRequestHandler extends SimpleChannelInboundHandler { case Constants.MSG_SUBSCRIBE_REQ: subscribe(channel, msg); break; + case Constants.MSG_PONG: + break; default: throw new IllegalStateException("Unexpected message type " + msgType); } } + @Override + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + LOG.warn("Closing channel {} due to an error", ctx.channel(), cause); + ctx.close(); + } + private void subscribe(final Channel channel, final ByteBuf msg) throws IOException { verify(reg == null, "Unexpected subscription when already subscribed"); diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java index 2d2b35ecfb..d707fd1407 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java @@ -18,10 +18,13 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.IdleStateHandler; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.concurrent.TimeUnit; import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; @@ -45,14 +48,18 @@ final class SourceSingletonService extends ChannelInitializer imp @GuardedBy("this") private final Collection children = new HashSet<>(); + private final Duration keepaliveInterval; + private final int maxMissedKeepalives; @GuardedBy("this") private Channel serverChannel; SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs, - final int listenPort) { + final int listenPort, final Duration keepaliveInterval, final int maxMissedKeepalives) { this.bootstrapSupport = requireNonNull(bootstrapSupport); this.dtcs = requireNonNull(dtcs); this.listenPort = listenPort; + this.keepaliveInterval = requireNonNull(keepaliveInterval); + this.maxMissedKeepalives = maxMissedKeepalives; LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort); } @@ -112,6 +119,8 @@ final class SourceSingletonService extends ChannelInitializer imp ch.pipeline() .addLast("frameDecoder", new MessageFrameDecoder()) + .addLast("idleStateHandler", new IdleStateHandler(0, keepaliveInterval.toNanos(), 0, TimeUnit.NANOSECONDS)) + .addLast("keepaliveHandler", new SourceKeepaliveHandler(maxMissedKeepalives)) .addLast("requestHandler", new SourceRequestHandler(dtcs)) // Output, in reverse order .addLast("frameEncoder", MessageFrameEncoder.INSTANCE) diff --git a/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml index e394dfa253..7eb0bca2c6 100644 --- a/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml +++ b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-common.xml @@ -1,7 +1,14 @@ - + xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0" + xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0"> + + + + + + - + + @@ -16,14 +17,14 @@ - - - - + + + + @@ -33,7 +34,8 @@ - + + diff --git a/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-source.xml b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-source.xml index 41b4e106ad..48545feefb 100644 --- a/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-source.xml +++ b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-source.xml @@ -5,9 +5,15 @@ + + + + + + @@ -15,5 +21,7 @@ + + diff --git a/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java index 08bc369ce1..5f0a06b082 100644 --- a/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java +++ b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java @@ -81,7 +81,8 @@ public class IntegrationTest extends AbstractDataBrokerTest { @Test public void testSourceToSink() throws InterruptedException, ExecutionException { // Make sure to start source... - final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT); + final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT, + Duration.ZERO, 5); // ... and give it some time start up and open up the port Thread.sleep(1000); @@ -95,7 +96,7 @@ public class IntegrationTest extends AbstractDataBrokerTest { // Kick of the sink ... final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true, - Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO); + Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3); // ... and sync on it starting up // verify the connection was established and MSG_EMPTY_DATA was transferred @@ -126,7 +127,8 @@ public class IntegrationTest extends AbstractDataBrokerTest { generateModification(getDataBroker(), deltaCount); // Make sure to start source... - final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT); + final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT, + Duration.ZERO, 5); // ... and give it some time start up and open up the port Thread.sleep(1000); @@ -140,7 +142,7 @@ public class IntegrationTest extends AbstractDataBrokerTest { // Kick of the sink ... final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true, - Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO); + Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3); // ... and sync on it starting up // verify the connection was established and MSG_EMPTY_DATA was transferred -- 2.36.6