X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=replicate%2Fmdsal-replicate-netty%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Freplicate%2Fnetty%2FSinkSingletonService.java;h=f71e5af1fc82020ef3839f11d6257637489a8eb6;hb=refs%2Fchanges%2F62%2F90562%2F8;hp=c2b5f4a83822488efb4be54903adf09b90a3139c;hpb=54eeeeaa0eb9c98933c20f30336f82fb15501566;p=mdsal.git diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java index c2b5f4a838..f71e5af1fc 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SinkSingletonService.java @@ -16,25 +16,29 @@ import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; +import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.Future; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Duration; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataBroker; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.yangtools.util.concurrent.FluentFutures; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput; import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -final class SinkSingletonService implements ClusterSingletonService { +final class SinkSingletonService extends ChannelInitializer implements ClusterSingletonService { private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class); private static final ServiceGroupIdentifier SGID = ServiceGroupIdentifier.create(SinkSingletonService.class.getName()); @@ -45,7 +49,7 @@ final class SinkSingletonService implements ClusterSingletonService { static { try { - TREE_REQUEST = requestTree(TREE); + TREE_REQUEST = Unpooled.unreleasableBuffer(requestTree(TREE)); } catch (IOException e) { throw new ExceptionInInitializerError(e); } @@ -65,6 +69,7 @@ final class SinkSingletonService implements ClusterSingletonService { this.dataBroker = requireNonNull(dataBroker); this.sourceAddress = requireNonNull(sourceAddress); this.reconnectDelay = requireNonNull(reconnectDelay); + LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress); } @Override @@ -81,6 +86,7 @@ final class SinkSingletonService implements ClusterSingletonService { futureChannel = bs .option(ChannelOption.SO_KEEPALIVE, true) + .handler(this) .connect(sourceAddress, null); futureChannel.addListener(compl -> channelResolved(compl, group)); @@ -88,30 +94,39 @@ final class SinkSingletonService implements ClusterSingletonService { @Override public synchronized ListenableFuture closeServiceInstance() { - // TODO Auto-generated method stub - return null; + // FIXME: initiate orderly shutdown + return FluentFutures.immediateNullFluentFuture(); + } + + @Override + protected void initChannel(final SocketChannel ch) { + ch.pipeline() + .addLast("frameDecoder", new MessageFrameDecoder()) + .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain( + new SinkTransactionChainListener(ch)))) + .addLast("frameEncoder", MessageFrameEncoder.INSTANCE); } private synchronized void channelResolved(final Future completedFuture, final ScheduledExecutorService group) { - if (completedFuture != futureChannel) { - // Future changed, this callback is irrelevant + final Throwable cause = completedFuture.cause(); + if (cause != null) { + LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause); + group.schedule(() -> { + // FIXME: perform reconnect + }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS); return; } - final Channel channel = futureChannel.channel(); - channel.pipeline() - .addLast("frameDecoder", new MessageFrameDecoder()) - .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain( - new SinkTransactionChainListener(channel)))) - .addLast("frameEncoder", MessageFrameEncoder.instance()); - - channel.writeAndFlush(TREE_REQUEST); + final Channel ch = futureChannel.channel(); + LOG.info("Channel {} established", ch); + ch.writeAndFlush(TREE_REQUEST); } private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException { final ByteBuf ret = Unpooled.buffer(); try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) { + stream.writeByte(Constants.MSG_SUBSCRIBE_REQ); try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) { tree.getDatastoreType().writeTo(output); output.writeYangInstanceIdentifier(tree.getRootIdentifier());