Netty Replicator - improve the reconnection and keepalive mechanisms
[mdsal.git] / replicate / mdsal-replicate-netty / src / main / java / org / opendaylight / mdsal / replicate / netty / SourceSingletonService.java
index 2d2b35ecfb851f5c08fb06511e9292063459fc52..d707fd1407e42f21f8791cefaf77d89c7d9c79cf 100644 (file)
@@ -18,10 +18,13 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
@@ -45,14 +48,18 @@ final class SourceSingletonService extends ChannelInitializer<SocketChannel> imp
 
     @GuardedBy("this")
     private final Collection<SocketChannel> children = new HashSet<>();
+    private final Duration keepaliveInterval;
+    private final int maxMissedKeepalives;
     @GuardedBy("this")
     private Channel serverChannel;
 
     SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
-            final int listenPort) {
+            final int listenPort, final Duration keepaliveInterval, final int maxMissedKeepalives) {
         this.bootstrapSupport = requireNonNull(bootstrapSupport);
         this.dtcs = requireNonNull(dtcs);
         this.listenPort = listenPort;
+        this.keepaliveInterval = requireNonNull(keepaliveInterval);
+        this.maxMissedKeepalives = maxMissedKeepalives;
         LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
     }
 
@@ -112,6 +119,8 @@ final class SourceSingletonService extends ChannelInitializer<SocketChannel> imp
 
         ch.pipeline()
             .addLast("frameDecoder", new MessageFrameDecoder())
+            .addLast("idleStateHandler", new IdleStateHandler(0, keepaliveInterval.toNanos(), 0, TimeUnit.NANOSECONDS))
+            .addLast("keepaliveHandler", new SourceKeepaliveHandler(maxMissedKeepalives))
             .addLast("requestHandler", new SourceRequestHandler(dtcs))
             // Output, in reverse order
             .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)