import static java.util.Objects.requireNonNull;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.mdsal.singleton.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.api.ServiceGroupIdentifier;
import org.opendaylight.yangtools.util.concurrent.FluentFutures;
+import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
final class SinkSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
- private static final ServiceGroupIdentifier SGID =
- ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
+ private static final ServiceGroupIdentifier SGID = new ServiceGroupIdentifier(SinkSingletonService.class.getName());
// TODO: allow different trees?
- private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.empty());
+ private static final DOMDataTreeIdentifier TREE = DOMDataTreeIdentifier.of(LogicalDatastoreType.CONFIGURATION,
+ YangInstanceIdentifier.of());
private static long CHANNEL_CLOSE_TIMEOUT_S = 10;
private static final ByteBuf TREE_REQUEST;
@Override
public synchronized void instantiateServiceInstance() {
LOG.info("Replication sink started with source {}", sourceAddress);
- this.bs = bootstrapSupport.newBootstrap();
+ bs = bootstrapSupport.newBootstrap();
doConnect();
}
@Override
protected void initChannel(final SocketChannel ch) {
+ final var txChain = dataBroker.createMergingTransactionChain();
+
ch.pipeline()
.addLast("frameDecoder", new MessageFrameDecoder())
.addLast("idleStateHandler", new IdleStateHandler(
keepaliveInterval.toNanos() * maxMissedKeepalives, 0, 0, TimeUnit.NANOSECONDS))
.addLast("keepaliveHandler", new SinkKeepaliveHandler())
- .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
- new SinkTransactionChainListener(ch))))
+ .addLast("requestHandler", new SinkRequestHandler(TREE, txChain))
.addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
+
+ txChain.addCallback(new FutureCallback<>() {
+ @Override
+ public void onSuccess(final Empty result) {
+ LOG.info("Transaction chain for channel {} completed", ch);
+ }
+
+ @Override
+ public void onFailure(final Throwable cause) {
+ LOG.error("Transaction chain for channel {} failed", ch, cause);
+ ch.close();
+ }
+ });
}
private synchronized void channelResolved(final ChannelFuture completedFuture,
- final ScheduledExecutorService group) {
+ final ScheduledExecutorService group) {
if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
if (completedFuture.isSuccess()) {
final Channel ch = completedFuture.channel();
}
private synchronized void channelClosed(final ChannelFuture completedFuture, final ScheduledExecutorService group) {
- if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
- if (!closingInstance) {
- LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(),
- sourceAddress, reconnectDelay.getSeconds());
- group.schedule(() -> {
- reconnect();
- }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
- }
+ if (futureChannel != null && futureChannel.channel() == completedFuture.channel() && !closingInstance) {
+ LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(),
+ sourceAddress, reconnectDelay.getSeconds());
+ group.schedule(this::reconnect, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
}
}
try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) {
stream.writeByte(Constants.MSG_SUBSCRIBE_REQ);
try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) {
- tree.getDatastoreType().writeTo(output);
- output.writeYangInstanceIdentifier(tree.getRootIdentifier());
+ tree.datastore().writeTo(output);
+ output.writeYangInstanceIdentifier(tree.path());
}
}