import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.checkerframework.checker.lock.qual.Holding;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
private final DOMDataBroker dataBroker;
private final InetSocketAddress sourceAddress;
private final Duration reconnectDelay;
+ private final Duration keepaliveInterval;
@GuardedBy("this")
private ChannelFuture futureChannel;
SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker,
- final InetSocketAddress sourceAddress, final Duration reconnectDelay) {
+ final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) {
this.bootstrapSupport = requireNonNull(bootstrapSupport);
this.dataBroker = requireNonNull(dataBroker);
this.sourceAddress = requireNonNull(sourceAddress);
this.reconnectDelay = requireNonNull(reconnectDelay);
+ this.keepaliveInterval = requireNonNull(keepaliveInterval);
LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
}
@Override
public synchronized void instantiateServiceInstance() {
LOG.info("Replication sink started with source {}", sourceAddress);
+ doConnect();
+ }
+ @Holding("this")
+ private void doConnect() {
+ LOG.info("Connecting to Source");
final Bootstrap bs = bootstrapSupport.newBootstrap();
final ScheduledExecutorService group = bs.config().group();
futureChannel = bs
- .option(ChannelOption.SO_KEEPALIVE, true)
- .handler(this)
- .connect(sourceAddress, null);
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .handler(this)
+ .connect(sourceAddress, null);
futureChannel.addListener(compl -> channelResolved(compl, group));
}
@Override
public synchronized ListenableFuture<?> closeServiceInstance() {
- // FIXME: initiate orderly shutdown
- return FluentFutures.immediateNullFluentFuture();
+ if (futureChannel == null) {
+ return FluentFutures.immediateNullFluentFuture();
+ }
+
+ // FIXME: this is not really immediate. We also should be closing the resulting channel
+ return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true));
+ }
+
+ private synchronized void reconnect() {
+ doConnect();
}
@Override
protected void initChannel(final SocketChannel ch) {
ch.pipeline()
+ .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS))
+ .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect))
.addLast("frameDecoder", new MessageFrameDecoder())
.addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
new SinkTransactionChainListener(ch))))
if (cause != null) {
LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
group.schedule(() -> {
- // FIXME: perform reconnect
+ reconnect();
}, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
return;
}