X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=replicate%2Fmdsal-replicate-netty%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Freplicate%2Fnetty%2FSinkSingletonService.java;h=43d93d7fb0606ad32dec030943c46292ba63d2cd;hb=2cbcb2d8589805bcb053462176424dae2b53cd1a;hp=f71e5af1fc82020ef3839f11d6257637489a8eb6;hpb=a2f2028f4d111e77ca542d2bc2110f38fafd82fb;p=mdsal.git diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java index f71e5af1fc..43d93d7fb0 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java @@ -19,6 +19,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; import java.io.IOException; import java.net.InetSocketAddress; @@ -26,6 +27,7 @@ import java.time.Duration; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; @@ -59,16 +61,18 @@ final class SinkSingletonService extends ChannelInitializer imple private final DOMDataBroker dataBroker; private final InetSocketAddress sourceAddress; private final Duration reconnectDelay; + private final Duration keepaliveInterval; @GuardedBy("this") private ChannelFuture futureChannel; SinkSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataBroker dataBroker, - final InetSocketAddress sourceAddress, final Duration reconnectDelay) { + final InetSocketAddress sourceAddress, final Duration reconnectDelay, final Duration keepaliveInterval) { this.bootstrapSupport = requireNonNull(bootstrapSupport); this.dataBroker = requireNonNull(dataBroker); this.sourceAddress = requireNonNull(sourceAddress); this.reconnectDelay = requireNonNull(reconnectDelay); + this.keepaliveInterval = requireNonNull(keepaliveInterval); LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress); } @@ -80,27 +84,42 @@ final class SinkSingletonService extends ChannelInitializer imple @Override public synchronized void instantiateServiceInstance() { LOG.info("Replication sink started with source {}", sourceAddress); + doConnect(); + } + @Holding("this") + private void doConnect() { + LOG.info("Connecting to Source"); final Bootstrap bs = bootstrapSupport.newBootstrap(); final ScheduledExecutorService group = bs.config().group(); futureChannel = bs - .option(ChannelOption.SO_KEEPALIVE, true) - .handler(this) - .connect(sourceAddress, null); + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(this) + .connect(sourceAddress, null); futureChannel.addListener(compl -> channelResolved(compl, group)); } @Override public synchronized ListenableFuture closeServiceInstance() { - // FIXME: initiate orderly shutdown - return FluentFutures.immediateNullFluentFuture(); + if (futureChannel == null) { + return FluentFutures.immediateNullFluentFuture(); + } + + // FIXME: this is not really immediate. We also should be closing the resulting channel + return FluentFutures.immediateBooleanFluentFuture(futureChannel.cancel(true)); + } + + private synchronized void reconnect() { + doConnect(); } @Override protected void initChannel(final SocketChannel ch) { ch.pipeline() + .addLast("idleStateHandler", new IdleStateHandler(keepaliveInterval.toNanos(), 0, 0, TimeUnit.NANOSECONDS)) + .addLast("keepaliveHandler", new KeepaliveHandler(this::reconnect)) .addLast("frameDecoder", new MessageFrameDecoder()) .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain( new SinkTransactionChainListener(ch)))) @@ -112,7 +131,7 @@ final class SinkSingletonService extends ChannelInitializer imple if (cause != null) { LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause); group.schedule(() -> { - // FIXME: perform reconnect + reconnect(); }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS); return; }