* End-of-DataTreeCandidate serialization stream. The payload is empty.
*/
static final byte MSG_DTC_APPLY = 4;
+ /**
+ * Verify the connection is alive.
+ */
+ static final int MSG_PING = 5;
/**
* Length of the length field in each transmitted frame.
static final ByteBuf DTC_APPLY = Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { MSG_DTC_APPLY }));
+ static final ByteBuf PING = Unpooled.unreleasableBuffer(
+ Unpooled.wrappedBuffer(new byte[] { MSG_PING }));
+
private Constants() {
// Hidden on purpose
}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.mdsal.replicate.netty;
+
+import static java.util.Objects.requireNonNull;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleStateEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KeepaliveHandler extends ChannelDuplexHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(KeepaliveHandler.class);
+
+ private final Runnable reconnect;
+
+ KeepaliveHandler(final Runnable reconnectCallback) {
+ this.reconnect = requireNonNull(reconnectCallback, "Reconnect callback should not be null");
+ }
+
+ @Override
+ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
+ if (evt instanceof IdleStateEvent) {
+ ctx.writeAndFlush(Constants.PING);
+ } else {
+ ctx.fireUserEventTriggered(evt);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+ LOG.warn("There was an exception on the channel. Attempting to reconnect.", cause);
+ reconnect.run();
+ }
+}
public static Registration createSink(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
final ClusterSingletonServiceProvider singletonService, final boolean enabled,
- final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay) {
+ final InetAddress sourceAddress, final int sourcePort, final Duration reconnectDelay,
+ final Duration keepaliveInterval) {
LOG.debug("Sink {}", enabled ? "enabled" : "disabled");
return enabled ? singletonService.registerClusterSingletonService(new SinkSingletonService(bootstrapSupport,
- dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay)) : new Disabled();
+ dataBroker, new InetSocketAddress(sourceAddress, sourcePort), reconnectDelay, keepaliveInterval))
+ : new Disabled();
}
public static Registration createSource(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
private final DOMDataBroker dataBroker;
private final InetSocketAddress sourceAddress;
private final Duration reconnectDelay;
+ private final Duration keepaliveInterval;
@GuardedBy("this")
private ChannelFuture futureChannel;
SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
- final InetSocketAddress sourceAddress, final Duration reconnectDelay) {
+ final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) {
this.bootstrapSupport = requireNonNull(bootstrapSupport);
this.dataBroker = requireNonNull(dataBroker);
this.sourceAddress = requireNonNull(sourceAddress);
this.reconnectDelay = requireNonNull(reconnectDelay);
+ this.keepaliveInterval = requireNonNull(keepaliveInterval);
LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
}
@Override
public synchronized void instantiateServiceInstance() {
LOG.info("Replication sink started with source {}", sourceAddress);
+ doConnect();
+ }
+ @Holding("this")
+ private void doConnect() {
+ LOG.info("Connecting to Source");
final Bootstrap bs = bootstrapSupport.newBootstrap();
final ScheduledExecutorService group = bs.config().group();
futureChannel = bs
- .option(ChannelOption.SO_KEEPALIVE, true)
- .handler(this)
- .connect(sourceAddress, null);
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .handler(this)
+ .connect(sourceAddress, null);
futureChannel.addListener(compl -> channelResolved(compl, group));
}
@Override
public synchronized ListenableFuture<?> closeServiceInstance() {
- // FIXME: initiate orderly shutdown
- return FluentFutures.immediateNullFluentFuture();
+ if (futureChannel == null) {
+ return FluentFutures.immediateNullFluentFuture();
+ }
+
+ // FIXME: this is not really immediate. We also should be closing the resulting channel
+ return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true));
+ }
+
+ private synchronized void reconnect() {
+ doConnect();
}
@Override
protected void initChannel(final SocketChannel ch) {
ch.pipeline()
+ .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS))
+ .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect))
.addLast("frameDecoder", new MessageFrameDecoder())
.addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
new SinkTransactionChainListener(ch))))
if (cause != null) {
LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
group.schedule(() -> {
- // FIXME: perform reconnect
+ reconnect();
}, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
return;
}
<cm:property name="source-host" value="127.0.0.1"/>
<cm:property name="source-port" value="9999"/>
<cm:property name="reconnect-delay-millis" value="3000"/>
+ <cm:property name="keepalive-interval-seconds" value="10"/>
</cm:default-properties>
</cm:property-placeholder>
<argument value="$(reconnect-delay-millis)"/>
</bean>
+ <bean id="keepaliveInterval" class="java.time.Duration" factory-method="ofSeconds">
+ <argument value="$(keepalive-interval-seconds)"/>
+ </bean>
+
<bean id="sourceAddress" class="java.net.InetAddress" factory-method="getByName">
<argument value="$(source-host)"/>
</bean>
<argument ref="sourceAddress"/>
<argument value="$(source-port)"/>
<argument ref="reconnectDelay"/>
+ <argument ref="keepaliveInterval"/>
</bean>
</blueprint>
// Kick of the sink ...
final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
- Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
+ Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO);
// ... and sync on it starting up
// verify the connection was established and MSG_EMPTY_DATA was transferred
// Kick of the sink ...
final Registration sink = NettyReplication.createSink(support, sinkBroker, css, true,
- Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO);
+ Inet4Address.getLoopbackAddress(), TEST_PORT, Duration.ZERO, Duration.ZERO);
// ... and sync on it starting up
// verify the connection was established and MSG_EMPTY_DATA was transferred