import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
// TODO: allow different trees?
private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
YangInstanceIdentifier.empty());
+ private static long CHANNEL_CLOSE_TIMEOUT_S = 10;
private static final ByteBuf TREE_REQUEST;
static {
private final DOMDataBroker dataBroker;
private final InetSocketAddress sourceAddress;
private final Duration reconnectDelay;
+ private final int maxMissedKeepalives;
private final Duration keepaliveInterval;
@GuardedBy("this")
private ChannelFuture futureChannel;
+ private boolean closingInstance;
+ private Bootstrap bs;
SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
- final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) {
+ final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval,
+ final int maxMissedKeepalives) {
this.bootstrapSupport = requireNonNull(bootstrapSupport);
this.dataBroker = requireNonNull(dataBroker);
this.sourceAddress = requireNonNull(sourceAddress);
this.reconnectDelay = requireNonNull(reconnectDelay);
this.keepaliveInterval = requireNonNull(keepaliveInterval);
+ this.maxMissedKeepalives = maxMissedKeepalives;
LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
}
@Override
public synchronized void instantiateServiceInstance() {
LOG.info("Replication sink started with source {}", sourceAddress);
+ this.bs = bootstrapSupport.newBootstrap();
doConnect();
}
@Holding("this")
private void doConnect() {
LOG.info("Connecting to Source");
- final Bootstrap bs = bootstrapSupport.newBootstrap();
final ScheduledExecutorService group = bs.config().group();
futureChannel = bs
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(this)
.connect(sourceAddress, null);
-
- futureChannel.addListener(compl -> channelResolved(compl, group));
+ futureChannel.addListener((ChannelFutureListener) future -> channelResolved(future, group));
}
@Override
public synchronized ListenableFuture<?> closeServiceInstance() {
+ closingInstance = true;
if (futureChannel == null) {
return FluentFutures.immediateNullFluentFuture();
}
- // FIXME: this is not really immediate. We also should be closing the resulting channel
- return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true));
+ return FluentFutures.immediateBooleanFluentFuture(disconnect());
}
private synchronized void reconnect() {
+ disconnect();
doConnect();
}
+ private synchronized boolean disconnect() {
+ boolean shutdownSuccess = true;
+ final Channel channel = futureChannel.channel();
+ if (channel != null && channel.isActive()) {
+ try {
+ // close the resulting channel. Even when this triggers the closeFuture, it won't try to reconnect since
+ // the closingInstance flag is set
+ channel.close().await(CHANNEL_CLOSE_TIMEOUT_S, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.error("The channel didn't close properly within {} seconds", CHANNEL_CLOSE_TIMEOUT_S);
+ shutdownSuccess = false;
+ }
+ }
+ shutdownSuccess &= futureChannel.cancel(true);
+ futureChannel = null;
+ return shutdownSuccess;
+ }
+
@Override
protected void initChannel(final SocketChannel ch) {
ch.pipeline()
- .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS))
- .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect))
.addLast("frameDecoder", new MessageFrameDecoder())
+ .addLast("idleStateHandler", new IdleStateHandler(
+ keepaliveInterval.toNanos() * maxMissedKeepalives, 0, 0, TimeUnit.NANOSECONDS))
+ .addLast("keepaliveHandler", new SinkKeepaliveHandler())
.addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
new SinkTransactionChainListener(ch))))
.addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
}
- private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
- final Throwable cause = completedFuture.cause();
- if (cause != null) {
- LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
- group.schedule(() -> {
- reconnect();
- }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
- return;
+ private synchronized void channelResolved(final ChannelFuture completedFuture,
+ final ScheduledExecutorService group) {
+ if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
+ if (completedFuture.isSuccess()) {
+ final Channel ch = completedFuture.channel();
+ LOG.info("Channel {} established", ch);
+ ch.closeFuture().addListener((ChannelFutureListener) future -> channelClosed(future, group));
+ ch.writeAndFlush(TREE_REQUEST);
+ } else {
+ LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress,
+ reconnectDelay.getSeconds(), completedFuture.cause());
+ group.schedule(() -> {
+ reconnect();
+ }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
+ }
}
+ }
- final Channel ch = futureChannel.channel();
- LOG.info("Channel {} established", ch);
- ch.writeAndFlush(TREE_REQUEST);
+ private synchronized void channelClosed(final ChannelFuture completedFuture, final ScheduledExecutorService group) {
+ if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
+ if (!closingInstance) {
+ LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(),
+ sourceAddress, reconnectDelay.getSeconds());
+ group.schedule(() -> {
+ reconnect();
+ }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
+ }
+ }
}
private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {