--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common base of the keepalive handlers installed in the replication channel pipelines. Any exception that reaches
+ * this handler is logged and the channel it was reported on is closed.
+ */
+abstract class AbstractKeepaliveHandler extends ChannelDuplexHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractKeepaliveHandler.class);
+
+ /**
+ * Log the failure at WARN level and close the affected channel.
+ *
+ * @param ctx context of the channel the exception was reported on
+ * @param cause the exception being reported
+ */
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.warn("Closing channel {} due to an error", ctx.channel(), cause);
+ ctx.close();
+ }
+}
* Verify the connection is alive.
*/
static final int MSG_PING = 5;
+ /**
+ * Response to {@link #MSG_PING}, confirming the connection is alive.
+ */
+ static final int MSG_PONG = 6;
/**
* Length of the length field in each transmitted frame.
static final ByteBuf PING = Unpooled.unreleasableBuffer(
Unpooled.wrappedBuffer(new byte[] { MSG_PING }));
+ static final ByteBuf PONG = Unpooled.unreleasableBuffer(
+ Unpooled.wrappedBuffer(new byte[] { MSG_PONG }));
+
private Constants() {
// Hidden on purpose
}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+/**
+ * Reports a failed keepalive exchange. The message carries the number of missed pings passed to the constructor.
+ */
+final class KeepaliveException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Create the exception.
+ *
+ * @param missedKeepalives number of missed pings to report in the message
+ */
+ KeepaliveException(final int missedKeepalives) {
+ super("Keepalive Exchange Failed - missed " + missedKeepalives + " pings");
+ }
+}
*/
package org.opendaylight.mdsal.replicate.netty;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import java.net.InetAddress;
public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
final ClusterSingletonServiceProvider singletonService, final boolean enabled,
final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay,
- final Duration keepaliveInterval) {
+ final Duration keepaliveInterval, final int maxMissedKeepalives) {
LOG.debug("Sink {}", enabled ? "enabled" : "disabled");
+ checkArgument(maxMissedKeepalives > 0, "max-missed-keepalives %s must be greater than 0", maxMissedKeepalives);
return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport,
- dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay, keepaliveInterval))
- : new Disabled();
+ dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay, keepaliveInterval,
+ maxMissedKeepalives)) : new Disabled();
}
public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
- final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort) {
+ final ClusterSingletonServiceProvider singletonService, final boolean enabled, final int listenPort,
+ final Duration keepaliveInterval, final int maxMissedKeepalives) {
LOG.debug("Source {}", enabled ? "enabled" : "disabled");
final DOMDataTreeChangeService dtcs = dataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
verify(dtcs != null, "Missing DOMDataTreeChangeService in broker %s", dataBroker);
-
+ checkArgument(maxMissedKeepalives > 0, "max-missed-keepalives %s must be greater than 0", maxMissedKeepalives);
return enabled ? singletonService.registerClusterSingletonService(new SourceSingletonService(bootstrapSupport,
- dtcs, listenPort)) : new Disabled();
+ dtcs, listenPort, keepaliveInterval, maxMissedKeepalives)) : new Disabled();
}
}
*/
package org.opendaylight.mdsal.replicate.netty;
-import static java.util.Objects.requireNonNull;
-
-import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class KeepaliveHandler extends ChannelDuplexHandler {
- private static final Logger LOG = LoggerFactory.getLogger(KeepaliveHandler.class);
-
- private final Runnable reconnect;
-
- KeepaliveHandler(final Runnable reconnectCallback) {
- this.reconnect = requireNonNull(reconnectCallback, "Reconnect callback should not be null");
- }
+final class SinkKeepaliveHandler extends AbstractKeepaliveHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(SinkKeepaliveHandler.class);
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
if (evt instanceof IdleStateEvent) {
- ctx.writeAndFlush(Constants.PING);
+ LOG.debug("IdleStateEvent received. Closing channel {}.", ctx.channel());
+ ctx.close();
} else {
ctx.fireUserEventTriggered(evt);
}
}
-
- @Override
- public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
- LOG.warn("There was an exception on the channel. Attempting to reconnect.", cause);
- reconnect.run();
- }
}
case Constants.MSG_DTC_APPLY:
handleDtcApply();
break;
+ case Constants.MSG_PING:
+ LOG.trace("Received PING from Source, sending PONG");
+ ctx.channel().writeAndFlush(Constants.PONG);
+ break;
default:
throw new IllegalStateException("Unexpected message type " + msgType);
}
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
// TODO: allow different trees?
private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.empty());
+    // Upper bound, in seconds, on how long disconnect() waits for a channel to close
+    private static final long CHANNEL_CLOSE_TIMEOUT_S = 10;
private static final ByteBuf TREE_REQUEST;
static {
private final DOMDataBroker dataBroker;
private final InetSocketAddress sourceAddress;
private final Duration reconnectDelay;
+ private final int maxMissedKeepalives;
private final Duration keepaliveInterval;
@GuardedBy("this")
private ChannelFuture futureChannel;
+ private boolean closingInstance;
+ private Bootstrap bs;
SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
- final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) {
+ final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval,
+ final int maxMissedKeepalives) {
this.bootstrapSupport = requireNonNull(bootstrapSupport);
this.dataBroker = requireNonNull(dataBroker);
this.sourceAddress = requireNonNull(sourceAddress);
this.reconnectDelay = requireNonNull(reconnectDelay);
this.keepaliveInterval = requireNonNull(keepaliveInterval);
+ this.maxMissedKeepalives = maxMissedKeepalives;
LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
}
+    /**
+     * Start the sink: create the bootstrap and initiate the connection to the source.
+     */
     @Override
     public synchronized void instantiateServiceInstance() {
         LOG.info("Replication sink started with source {}", sourceAddress);
+        // The instance may be started again after closeServiceInstance(); clear the flag so that a lost channel
+        // is reconnected again instead of being treated as part of a shutdown
+        closingInstance = false;
+        this.bs = bootstrapSupport.newBootstrap();
         doConnect();
     }
@Holding("this")
private void doConnect() {
LOG.info("Connecting to Source");
- final Bootstrap bs = bootstrapSupport.newBootstrap();
final ScheduledExecutorService group = bs.config().group();
futureChannel = bs
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(this)
.connect(sourceAddress, null);
-
- futureChannel.addListener(compl -> channelResolved(compl, group));
+ futureChannel.addListener((ChannelFutureListener) future -> channelResolved(future, group));
}
@Override
public synchronized ListenableFuture<?> closeServiceInstance() {
+ closingInstance = true;
if (futureChannel == null) {
return FluentFutures.immediateNullFluentFuture();
}
- // FIXME: this is not really immediate. We also should be closing the resulting channel
- return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true));
+ return FluentFutures.immediateBooleanFluentFuture(disconnect());
}
private synchronized void reconnect() {
+ disconnect();
doConnect();
}
+    /**
+     * Close the current channel, if there is one, and forget the connect future.
+     *
+     * @return {@code true} if there was nothing to tear down or the teardown completed cleanly, {@code false} if
+     *         the channel did not close within {@code CHANNEL_CLOSE_TIMEOUT_S} seconds, the wait was interrupted,
+     *         or a still pending connect attempt could not be cancelled
+     */
+    private synchronized boolean disconnect() {
+        final ChannelFuture connectFuture = futureChannel;
+        if (connectFuture == null) {
+            // Already disconnected, for example when a scheduled reconnect runs after closeServiceInstance()
+            return true;
+        }
+
+        boolean shutdownSuccess = true;
+        final Channel channel = connectFuture.channel();
+        if (channel != null && channel.isActive()) {
+            try {
+                // Close the established channel and wait for it, bounded by the timeout. await() returns false
+                // when the timeout elapses first.
+                if (!channel.close().await(CHANNEL_CLOSE_TIMEOUT_S, TimeUnit.SECONDS)) {
+                    LOG.error("The channel didn't close properly within {} seconds", CHANNEL_CLOSE_TIMEOUT_S);
+                    shutdownSuccess = false;
+                }
+            } catch (InterruptedException e) {
+                // Preserve the interrupt for the caller instead of swallowing it
+                Thread.currentThread().interrupt();
+                LOG.error("Interrupted while waiting for channel {} to close", channel, e);
+                shutdownSuccess = false;
+            }
+        }
+        if (!connectFuture.isDone()) {
+            // Only a pending connect needs cancelling; cancel() reports false for an already completed future,
+            // which must not be mistaken for a failed shutdown
+            shutdownSuccess &= connectFuture.cancel(true);
+        }
+        futureChannel = null;
+        return shutdownSuccess;
+    }
+
@Override
protected void initChannel(final SocketChannel ch) {
ch.pipeline()
- .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS))
- .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect))
.addLast("frameDecoder", new MessageFrameDecoder())
+ .addLast("idleStateHandler", new IdleStateHandler(
+ keepaliveInterval.toNanos() * maxMissedKeepalives, 0, 0, TimeUnit.NANOSECONDS))
+ .addLast("keepaliveHandler", new SinkKeepaliveHandler())
.addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
new SinkTransactionChainListener(ch))))
.addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
}
- private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
- final Throwable cause = completedFuture.cause();
- if (cause != null) {
- LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
- group.schedule(() -> {
- reconnect();
- }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
- return;
+    /**
+     * Invoked when a connect attempt completes. Completions of futures other than the current
+     * {@code futureChannel} are ignored. On success a close listener is registered and the tree request is sent;
+     * on failure a reconnect is scheduled after {@code reconnectDelay}.
+     *
+     * @param completedFuture the completed connect future
+     * @param group executor used to schedule the reconnect
+     */
+    private synchronized void channelResolved(final ChannelFuture completedFuture,
+            final ScheduledExecutorService group) {
+        if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
+            if (completedFuture.isSuccess()) {
+                final Channel ch = completedFuture.channel();
+                LOG.info("Channel {} established", ch);
+                ch.closeFuture().addListener((ChannelFutureListener) future -> channelClosed(future, group));
+                ch.writeAndFlush(TREE_REQUEST);
+            } else {
+                // Log the Duration itself: the delay is configured in milliseconds, so getSeconds() would
+                // truncate sub-second values to 0 and print a bare number without a unit
+                LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay,
+                    completedFuture.cause());
+                group.schedule(() -> {
+                    reconnect();
+                }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
+            }
         }
+    }
- final Channel ch = futureChannel.channel();
- LOG.info("Channel {} established", ch);
- ch.writeAndFlush(TREE_REQUEST);
+    /**
+     * Invoked when an established channel is closed. Unless the service instance is closing, or the closed channel
+     * is no longer the current one, a reconnect is scheduled after {@code reconnectDelay}.
+     *
+     * @param completedFuture the close future of the channel
+     * @param group executor used to schedule the reconnect
+     */
+    private synchronized void channelClosed(final ChannelFuture completedFuture, final ScheduledExecutorService group) {
+        if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
+            if (!closingInstance) {
+                // Log the Duration itself: getSeconds() would truncate sub-second delays to 0 and drop the unit
+                LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(),
+                    sourceAddress, reconnectDelay);
+                group.schedule(() -> {
+                    reconnect();
+                }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
+            }
+        }
     }
private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleStateEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Source-side keepalive handler. Sends a PING to the Sink whenever an {@link IdleStateEvent} is received and gives
+ * up on the connection once too many consecutive PINGs have gone without any message from the Sink.
+ *
+ * <p>
+ * The handler keeps per-channel state and therefore must not be shared between channels.
+ */
+final class SourceKeepaliveHandler extends AbstractKeepaliveHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(SourceKeepaliveHandler.class);
+
+    private final int maxMissedKeepalives;
+
+    // Number of PINGs sent since the last message was received from the Sink
+    private int pingsSinceLastContact;
+
+    SourceKeepaliveHandler(final int maxMissedKeepalives) {
+        this.maxMissedKeepalives = maxMissedKeepalives;
+    }
+
+    /**
+     * Intercept messages from the Sink. Reset the pingsSinceLastContact counter and forward the message.
+     */
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
+        pingsSinceLastContact = 0;
+        ctx.fireChannelRead(msg);
+    }
+
+    /**
+     * On an {@link IdleStateEvent}, send a PING and increment pingsSinceLastContact. If pingsSinceLastContact has
+     * already reached max-missed-keepalives, a {@link KeepaliveException} is fired to the next handler in the
+     * pipeline instead and no further PING is sent. Any other event is forwarded unchanged.
+     */
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+        if (!(evt instanceof IdleStateEvent)) {
+            ctx.fireUserEventTriggered(evt);
+            return;
+        }
+
+        if (pingsSinceLastContact >= maxMissedKeepalives) {
+            // The peer stayed silent for the allowed number of PINGs: report the failure and stop pinging
+            ctx.fireExceptionCaught(new KeepaliveException(maxMissedKeepalives));
+            return;
+        }
+
+        LOG.trace("IdleStateEvent received. Sending PING to sink");
+        ctx.channel().writeAndFlush(Constants.PING);
+        // Count the PING as unanswered until channelRead() sees a message from the Sink
+        pingsSinceLastContact++;
+    }
+}
@Override
public void channelInactive(final ChannelHandlerContext ctx) {
- LOG.trace("Channel {} going inactive", ctx.channel());
+ LOG.info("Channel {} going inactive", ctx.channel());
if (reg != null) {
reg.close();
reg = null;
case Constants.MSG_SUBSCRIBE_REQ:
subscribe(channel, msg);
break;
+ case Constants.MSG_PONG:
+ break;
default:
throw new IllegalStateException("Unexpected message type " + msgType);
}
}
+ /**
+ * Log the failure at WARN level and close the affected channel.
+ *
+ * @param ctx context of the channel the exception was reported on
+ * @param cause the exception being reported
+ */
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.warn("Closing channel {} due to an error", ctx.channel(), cause);
+ ctx.close();
+ }
+
private void subscribe(final Channel channel, final ByteBuf msg) throws IOException {
verify(reg == null, "Unexpected subscription when already subscribed");
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
@GuardedBy("this")
private final Collection<SocketChannel> children = new HashSet<>();
+ private final Duration keepaliveInterval;
+ private final int maxMissedKeepalives;
@GuardedBy("this")
private Channel serverChannel;
SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
- final int listenPort) {
+ final int listenPort, final Duration keepaliveInterval, final int maxMissedKeepalives) {
this.bootstrapSupport = requireNonNull(bootstrapSupport);
this.dtcs = requireNonNull(dtcs);
this.listenPort = listenPort;
+ this.keepaliveInterval = requireNonNull(keepaliveInterval);
+ this.maxMissedKeepalives = maxMissedKeepalives;
LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
}
ch.pipeline()
.addLast("frameDecoder", new MessageFrameDecoder())
+ .addLast("idleStateHandler", new IdleStateHandler(0, keepaliveInterval.toNanos(), 0, TimeUnit.NANOSECONDS))
+ .addLast("keepaliveHandler", new SourceKeepaliveHandler(maxMissedKeepalives))
.addLast("requestHandler", new SourceRequestHandler(dtcs))
// Output, in reverse order
.addLast("frameEncoder", MessageFrameEncoder.INSTANCE)
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
- xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0">
-
+ xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
+ xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
+ <cm:property-placeholder persistent-id="org.opendaylight.mdsal.replicate.netty.common" update-strategy="none"
+ placeholder-prefix="$common(" placeholder-suffix=")">
+ <cm:default-properties>
+ <cm:property name="keepalive-interval-seconds" value="10"/>
+ <cm:property name="max-missed-keepalives" value="5"/>
+ </cm:default-properties>
+ </cm:property-placeholder>
<reference id="dataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker" odl:type="default"/>
<reference id="singletonServiceProvider" interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"
<cm:property name="source-host" value="127.0.0.1"/>
<cm:property name="source-port" value="9999"/>
<cm:property name="reconnect-delay-millis" value="3000"/>
- <cm:property name="keepalive-interval-seconds" value="10"/>
+ <cm:property name="keepalive-interval-seconds" value="$common(keepalive-interval-seconds)"/>
+ <cm:property name="max-missed-keepalives" value="$common(max-missed-keepalives)"/>
</cm:default-properties>
</cm:property-placeholder>
<argument value="$(reconnect-delay-millis)"/>
</bean>
- <bean id="keepaliveInterval" class="java.time.Duration" factory-method="ofSeconds">
- <argument value="$(keepalive-interval-seconds)"/>
- </bean>
-
<bean id="sourceAddress" class="java.net.InetAddress" factory-method="getByName">
<argument value="$(source-host)"/>
</bean>
+ <bean id="keepaliveInt" class="java.time.Duration" factory-method="ofSeconds">
+ <argument value="$(keepalive-interval-seconds)"/>
+ </bean>
+
<bean id="nettyReplicationSink" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
factory-method="createSink" destroy-method="close">
<argument ref="bootstrapSupport"/>
<argument ref="sourceAddress"/>
<argument value="$(source-port)"/>
<argument ref="reconnectDelay"/>
- <argument ref="keepaliveInterval"/>
+ <argument ref="keepaliveInt"/>
+ <argument value="$(max-missed-keepalives)"/>
</bean>
</blueprint>
<cm:default-properties>
<cm:property name="enabled" value="false"/>
<cm:property name="listen-port" value="9999"/>
+ <cm:property name="keepalive-interval-seconds" value="$common(keepalive-interval-seconds)"/>
+ <cm:property name="max-missed-keepalives" value="$common(max-missed-keepalives)"/>
</cm:default-properties>
</cm:property-placeholder>
+ <bean id="keepaliveInterval" class="java.time.Duration" factory-method="ofSeconds">
+ <argument value="${keepalive-interval-seconds}"/>
+ </bean>
+
<bean id="nettyReplicationSource" class="org.opendaylight.mdsal.replicate.netty.NettyReplication"
factory-method="createSource" destroy-method="close">
<argument ref="bootstrapSupport"/>
<argument ref="singletonServiceProvider"/>
<argument value="${enabled}"/>
<argument value="${listen-port}"/>
+ <argument ref="keepaliveInterval"/>
+ <argument value="${max-missed-keepalives}"/>
</bean>
</blueprint>
@Test
public void testSourceToSink() throws InterruptedException, ExecutionException {
// Make sure to start source...
- final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
+ final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT,
+ Duration.ZERO, 5);
// ... and give it some time start up and open up the port
Thread.sleep(1000);
// Kick of the sink ...
final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
- Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO);
+ Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3);
// ... and sync on it starting up
// verify the connection was established and MSG_EMPTY_DATA was transferred
generateModification(getDataBroker(), deltaCount);
// Make sure to start source...
- final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT);
+ final Registration source = NettyReplication.createSource(support, getDomBroker(), css, true, TEST_PORT,
+ Duration.ZERO, 5);
// ... and give it some time start up and open up the port
Thread.sleep(1000);
// Kick of the sink ...
final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
- Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO);
+ Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO, 3);
// ... and sync on it starting up
// verify the connection was established and MSG_EMPTY_DATA was transferred