import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
@GuardedBy("this")
private final Collection<SocketChannel> children = new HashSet<>();
+ private final Duration keepaliveInterval;
+ private final int maxMissedKeepalives;
@GuardedBy("this")
private Channel serverChannel;
SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
- final int listenPort) {
+ final int listenPort, final Duration keepaliveInterval, final int maxMissedKeepalives) {
this.bootstrapSupport = requireNonNull(bootstrapSupport);
this.dtcs = requireNonNull(dtcs);
this.listenPort = listenPort;
+ this.keepaliveInterval = requireNonNull(keepaliveInterval);
+ this.maxMissedKeepalives = maxMissedKeepalives;
LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
}
ch.pipeline()
.addLast("frameDecoder", new MessageFrameDecoder())
+ .addLast("idleStateHandler", new IdleStateHandler(0, keepaliveInterval.toNanos(), 0, TimeUnit.NANOSECONDS))
+ .addLast("keepaliveHandler", new SourceKeepaliveHandler(maxMissedKeepalives))
.addLast("requestHandler", new SourceRequestHandler(dtcs))
// Output, in reverse order
.addLast("frameEncoder", MessageFrameEncoder.INSTANCE)