Remove (DOM)TransactionChainListener
[mdsal.git] / replicate / mdsal-replicate-netty / src / main / java / org / opendaylight / mdsal / replicate / netty / SinkSingletonService.java
index 975d75f6bf4163c14e57455ef5efd3b69621d0b9..243e08c2c6db81f18a97105b1c6708c47513cdb8 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.mdsal.replicate.netty;
 
 import static java.util.Objects.requireNonNull;
 
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
@@ -34,6 +35,7 @@ import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.yangtools.util.concurrent.FluentFutures;
+import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeDataOutput;
 import org.opendaylight.yangtools.yang.data.codec.binfmt.NormalizedNodeStreamVersion;
@@ -140,18 +142,32 @@ final class SinkSingletonService extends ChannelInitializer<SocketChannel> imple
 
     @Override
     protected void initChannel(final SocketChannel ch) {
+        final var txChain = dataBroker.createMergingTransactionChain();
+
         ch.pipeline()
             .addLast("frameDecoder", new MessageFrameDecoder())
             .addLast("idleStateHandler", new IdleStateHandler(
                 keepaliveInterval.toNanos() * maxMissedKeepalives, 0, 0, TimeUnit.NANOSECONDS))
             .addLast("keepaliveHandler", new SinkKeepaliveHandler())
-            .addLast("requestHandler", new SinkRequestHandler(TREE, dataBroker.createMergingTransactionChain(
-                new SinkTransactionChainListener(ch))))
+            .addLast("requestHandler", new SinkRequestHandler(TREE, txChain))
             .addLast("frameEncoder", MessageFrameEncoder.INSTANCE);
+
+        txChain.addCallback(new FutureCallback<>() {
+            @Override
+            public void onSuccess(final Empty result) {
+                LOG.info("Transaction chain for channel {} completed", ch);
+            }
+
+            @Override
+            public void onFailure(final Throwable cause) {
+                LOG.error("Transaction chain for channel {} failed", ch, cause);
+                ch.close();
+            }
+        });
     }
 
     private synchronized void channelResolved(final ChannelFuture completedFuture,
-        final ScheduledExecutorService group) {
+            final ScheduledExecutorService group) {
         if (futureChannel != null && futureChannel.channel() == completedFuture.channel()) {
             if (completedFuture.isSuccess()) {
                 final Channel ch = completedFuture.channel();