import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.yangtools.util.concurrent.FluentFutures;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class SinkSingletonService implements ClusterSingletonService {
+final class SinkSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
private static final ServiceGroupIdentifier SGID =
ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
static {
try {
- TREE_REQUEST = requestTree(TREE);
+ TREE_REQUEST = Unpooled.unreleasableBuffer(requestTree(TREE));
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
this.dataBroker = requireNonNull(dataBroker);
this.sourceAddress = requireNonNull(sourceAddress);
this.reconnectDelay = requireNonNull(reconnectDelay);
+ LOG.info("Replication sink from {} waiting for cluster-wide mastership", sourceAddress);
}
@Override
futureChannel = bs
.option(ChannelOption.SO_KEEPALIVE, true)
+ .handler(this)
.connect(sourceAddress, null);
futureChannel.addListener(compl -> channelResolved(compl, group));
@Override
public synchronized ListenableFuture<?> closeServiceInstance() {
- // TODO Auto-generated method stub
- return null;
+ // FIXME: initiate orderly shutdown
+ return FluentFutures.immediateNullFluentFuture();
+ }
+
+ @Override
+ protected void initChannel(final SocketChannel ch) {
+ ch.pipeline()
+ .addLast("frameDecoder", new MessageFrameDecoder())
+ .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
+ new SinkTransactionChainListener(ch))))
+ .addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
}
private synchronized void channelResolved(final Future<?> completedFuture, final ScheduledExecutorService group) {
- if (completedFuture != futureChannel) {
- // Future changed, this callback is irrelevant
+ final Throwable cause = completedFuture.cause();
+ if (cause != null) {
+ LOG.info("Failed to connect to source {}, reconnecting in {}", sourceAddress, reconnectDelay, cause);
+ group.schedule(() -> {
+ // FIXME: perform reconnect
+ }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
return;
}
- final Channel channel = futureChannel.channel();
- channel.pipeline()
- .addLast("frameDecoder", new MessageFrameDecoder())
- .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
- new SinkTransactionChainListener(channel))))
- .addLast("frameEncoder", MessageFrameEncoder.instance());
-
- channel.writeAndFlush(TREE_REQUEST);
+ final Channel ch = futureChannel.channel();
+ LOG.info("Channel {} established", ch);
+ ch.writeAndFlush(TREE_REQUEST);
}
private static ByteBuf requestTree(final DOMDataTreeIdentifier tree) throws IOException {
final ByteBuf ret = Unpooled.buffer();
try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) {
+ stream.writeByte(Constants.MSG_SUBSCRIBE_REQ);
try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) {
tree.getDatastoreType().writeTo(output);
output.writeYangInstanceIdentifier(tree.getRootIdentifier());