Do not use ClusteredDOMDataTreeChangeListener
[mdsal.git] / replicate / mdsal-replicate-netty / src / main / java / org / opendaylight / mdsal / replicate / netty / SinkSingletonService.java
index 98e154e3d8f7a6f58983474d4552c7316c09f3d0..648cf13547f04f158a0dad7aecffc680b8c7696a 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.mdsal.replicate.netty;
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
@@ -31,9 +32,10 @@ import org.checkerframework.checker.lock.qual.Holding;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
-import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.mdsal.singleton.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.api.ServiceGroupIdentifier;
 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
@@ -42,11 +44,10 @@ import org.slf4j.LoggerFactory;
 
 final class SinkSingletonService extends ChannelInitializer<SocketChannel> implements ClusterSingletonService {
     private static final Logger LOG = LoggerFactory.getLogger(SinkSingletonService.class);
-    private static final ServiceGroupIdentifier SGID =
-            ServiceGroupIdentifier.create(SinkSingletonService.class.getName());
+    private static final ServiceGroupIdentifier SGID = new ServiceGroupIdentifier(SinkSingletonService.class.getName());
     // TODO: allow different trees?
-    private static final DOMDataTreeIdentifier TREE = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
-        YangInstanceIdentifier.empty());
+    private static final DOMDataTreeIdentifier TREE = DOMDataTreeIdentifier.of(LogicalDatastoreType.CONFIGURATION,
+        YangInstanceIdentifier.of());
     private static long CHANNEL_CLOSE_TIMEOUT_S = 10;
     private static final ByteBuf TREE_REQUEST;
 
@@ -90,7 +91,7 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     @Override
     public synchronized void instantiateServiceInstance() {
         LOG.info("Replication sink started with source {}", sourceAddress);
-        this.bs = bootstrapSupport.newBootstrap();
+        bs = bootstrapSupport.newBootstrap();
         doConnect();
     }
 
@@ -141,18 +142,32 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
 
     @Override
     protected void initChannel(final SocketChannel ch) {
+        final var txChain = dataBroker.createMergingTransactionChain();
+
         ch.pipeline()
             .addLast("frameDecoder", new MessageFrameDecoder())
             .addLast("idleStateHandler", new IdleStateHandler(
                 keepaliveInterval.toNanos() * maxMissedKeepalives, 0, 0, TimeUnit.NANOSECONDS))
             .addLast("keepaliveHandler", new SinkKeepaliveHandler())
-            .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
-                new SinkTransactionChainListener(ch))))
+            .addLast("requestHandler", new SinkRequestHandler(TREE, txChain))
             .addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
+
+        txChain.addCallback(new FutureCallback<>() {
+            @Override
+            public void onSuccess(final Empty result) {
+                LOG.info("Transaction chain for channel {} completed", ch);
+            }
+
+            @Override
+            public void onFailure(final Throwable cause) {
+                LOG.error("Transaction chain for channel {} failed", ch, cause);
+                ch.close();
+            }
+        });
     }
 
     private synchronized void channelResolved(final ChannelFuture completedFuture,
-        final ScheduledExecutorService group) {
+            final ScheduledExecutorService group) {
         if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
             if (completedFuture.isSuccess()) {
                 final Channel ch = completedFuture.channel();
@@ -170,14 +185,10 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
     }
 
     private synchronized void channelClosed(final ChannelFuture completedFuture, final ScheduledExecutorService group) {
-        if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
-            if (!closingInstance) {
-                LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(),
-                    sourceAddress, reconnectDelay.getSeconds());
-                group.schedule(() -> {
-                    reconnect();
-                }, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
-            }
+        if (futureChannel != null && futureChannel.channel() == completedFuture.channel() && !closingInstance) {
+            LOG.info("Channel {} lost connection to source {}, reconnecting in {}", completedFuture.channel(),
+                sourceAddress, reconnectDelay.getSeconds());
+            group.schedule(this::reconnect, reconnectDelay.toNanos(), TimeUnit.NANOSECONDS);
         }
     }
 
@@ -187,8 +198,8 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
         try (ByteBufOutputStream stream = new ByteBufOutputStream(ret)) {
             stream.writeByte(Constants.MSG_SUBSCRIBE_REQ);
             try (NormalizedNodeDataOutput output = NormalizedNodeStreamVersion.current().newDataOutput(stream)) {
-                tree.getDatastoreType().writeTo(output);
-                output.writeYangInstanceIdentifier(tree.getRootIdentifier());
+                tree.datastore().writeTo(output);
+                output.writeYangInstanceIdentifier(tree.path());
             }
         }