From 2cbcb2d8589805bcb053462176424dae2b53cd1a Mon Sep 17 00:00:00 2001 From: =?utf8?q?Tibor=20Kr=C3=A1l?= Date: Thu, 25 Jun 2020 18:17:55 +0200 Subject: [PATCH] Provide auto-reconnection for Sink MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit In case there is a network partition the Sink needs to be aware of it and schedule reconnect. This patch adds configuration knob keepalive-interval-seconds to tweak how soon after network failure the Sink gets notified. Change-Id: I1eb2880bb00d1101cd587e4a737ba2f8a485b7ed Signed-off-by: Tibor Král Signed-off-by: Robert Varga --- .../mdsal/replicate/netty/Constants.java | 7 ++++ .../replicate/netty/KeepaliveHandler.java | 41 +++++++++++++++++++ .../replicate/netty/NettyReplication.java | 6 ++- .../replicate/netty/SinkSingletonService.java | 33 +++++++++++---- .../blueprint/netty-replication-sink.xml | 6 +++ .../replicate/netty/IntegrationTest.java | 4 +- 6 files changed, 86 insertions(+), 11 deletions(-) create mode 100644 replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java index 1fff84f4e5..e7f42eb19c 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/Constants.java @@ -32,6 +32,10 @@ final class Constants { * End-of-DataTreeCandidate serialization stream. The payload is empty. */ static final byte MSG_DTC_APPLY = 4; + /** + * Keepalive message sent to verify the connection is alive. The payload is empty. + */ + static final byte MSG_PING = 5; /** * Length of the length field in each transmitted frame. 
@@ -47,6 +51,9 @@ final class Constants { static final ByteBuf DTC_APPLY = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY })); + static final ByteBuf PING = Unpooled.unreleasableBuffer( + Unpooled.wrappedBuffer(new byte[] { MSG_PING })); + private Constants() { // Hidden on purpose } diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java new file mode 100644 index 0000000000..4803d3e59d --- /dev/null +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/KeepaliveHandler.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.mdsal.replicate.netty; + +import static java.util.Objects.requireNonNull; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.IdleStateEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KeepaliveHandler extends ChannelDuplexHandler { + private static final Logger LOG = LoggerFactory.getLogger(KeepaliveHandler.class); + + private final Runnable reconnect; + + KeepaliveHandler(final Runnable reconnectCallback) { + this.reconnect = requireNonNull(reconnectCallback, "Reconnect callback should not be null"); + } + + @Override + public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) { + if (evt instanceof IdleStateEvent) { + ctx.writeAndFlush(Constants.PING); + } else { + ctx.fireUserEventTriggered(evt); + } + } + + @Override + public void exceptionCaught(final ChannelHandlerContext 
ctx, final Throwable cause) { + LOG.warn("There was an exception on the channel. Attempting to reconnect.", cause); + reconnect.run(); + } +} diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java index cc70ffad57..13f9c99eda 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/NettyReplication.java @@ -36,10 +36,12 @@ public final class NettyReplication { public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, final ClusterSingletonServiceProvider singletonService, final boolean enabled, - final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay) { + final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay, + final Duration keepaliveInterval) { LOG.debug("Sink {}", enabled ? "enabled" : "disabled"); return enabled ? 
singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport, - dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay)) : new Disabled(); + dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay, keepaliveInterval)) + : new Disabled(); } public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java index f71e5af1fc..43d93d7fb0 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java @@ -19,6 +19,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; import java.io.IOException; import java.net.InetSocketAddress; @@ -26,6 +27,7 @@ import java.time.Duration; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; @@ -59,16 +61,18 @@ final class SinkSingletonService extends ChannelInitializer imple private final DOMDataBroker dataBroker; private final InetSocketAddress sourceAddress; private final Duration reconnectDelay; + private final Duration keepaliveInterval; @GuardedBy("this") private ChannelFuture 
futureChannel; SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, - final InetSocketAddress sourceAddress, final Duration reconnectDelay) { + final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) { this.bootstrapSupport = requireNonNull(bootstrapSupport); this.dataBroker = requireNonNull(dataBroker); this.sourceAddress = requireNonNull(sourceAddress); this.reconnectDelay = requireNonNull(reconnectDelay); + this.keepaliveInterval = requireNonNull(keepaliveInterval); LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress); } @@ -80,27 +84,42 @@ final class SinkSingletonService extends ChannelInitializer imple @Override public synchronized void instantiateServiceInstance() { LOG.info("Replication sink started with source {}", sourceAddress); + doConnect(); + } + @Holding("this") + private void doConnect() { + LOG.info("Connecting to Source"); final Bootstrap bs = bootstrapSupport.newBootstrap(); final ScheduledExecutorService group = bs.config().group(); futureChannel = bs - .option(ChannelOption.SO_KEEPALIVE, true) - .handler(this) - .connect(sourceAddress, null); + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(this) + .connect(sourceAddress, null); futureChannel.addListener(compl -> channelResolved(compl, group)); } @Override public synchronized ListenableFuture closeServiceInstance() { - // FIXME: initiate orderly shutdown - return FluentFutures.immediateNullFluentFuture(); + if (futureChannel == null) { + return FluentFutures.immediateNullFluentFuture(); + } + + // FIXME: this is not really immediate. 
We also should be closing the resulting channel + return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true)); + } + + private synchronized void reconnect() { + doConnect(); } @Override protected void initChannel(final SocketChannel ch) { ch.pipeline() + .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS)) + .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect)) .addLast("frameDecoder", new MessageFrameDecoder()) .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain( new SinkTransactionChainListener(ch)))) @@ -112,7 +131,7 @@ final class SinkSingletonService extends ChannelInitializer imple if (cause != null) { LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause); group.schedule(() -> { - // FIXME: perform reconnect + reconnect(); }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS); return; } diff --git a/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml index a5bbbc0063..7aefce2e35 100644 --- a/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml +++ b/replicate/mdsal-replicate-netty/src/main/resources/OSGI-INF/blueprint/netty-replication-sink.xml @@ -8,6 +8,7 @@ + @@ -15,6 +16,10 @@ + + + + @@ -28,6 +33,7 @@ + diff --git a/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java index e688bd5e9b..08bc369ce1 100644 --- a/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java +++ b/replicate/mdsal-replicate-netty/src/test/java/org/opendaylight/mdsal/replicate/netty/IntegrationTest.java @@ -95,7 +95,7 
@@ public class IntegrationTest extends AbstractDataBrokerTest { // Kick of the sink ... final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true, - Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO); + Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO); // ... and sync on it starting up // verify the connection was established and MSG_EMPTY_DATA was transferred @@ -140,7 +140,7 @@ public class IntegrationTest extends AbstractDataBrokerTest { // Kick of the sink ... final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true, - Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO); + Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO); // ... and sync on it starting up // verify the connection was established and MSG_EMPTY_DATA was transferred -- 2.36.6