import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
@GuardedBy("this")
private final Collection<SocketChannel> children = new HashSet<>();
+ private final Duration keepaliveInterval;
+ private final int maxMissedKeepalives;
@GuardedBy("this")
private Channel serverChannel;
SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
- final int listenPort) {
+ final int listenPort, final Duration keepaliveInterval, final int maxMissedKeepalives) {
this.bootstrapSupport = requireNonNull(bootstrapSupport);
this.dtcs = requireNonNull(dtcs);
this.listenPort = listenPort;
+ this.keepaliveInterval = requireNonNull(keepaliveInterval);
+ this.maxMissedKeepalives = maxMissedKeepalives;
+ LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
}
@Override
ch.pipeline()
.addLast("frameDecoder", new MessageFrameDecoder())
+ .addLast("idleStateHandler", new IdleStateHandler(0, keepaliveInterval.toNanos(), 0, TimeUnit.NANOSECONDS))
+ .addLast("keepaliveHandler", new SourceKeepaliveHandler(maxMissedKeepalives))
.addLast("requestHandler", new SourceRequestHandler(dtcs))
- .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()))
- .addLast("frameEncoder", MessageFrameEncoder.instance());
+ // Output, in reverse order
+ .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)
+ .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()));
children.add(ch);
- LOG.debug("Channel {} established", ch);
+ LOG.info("Channel {} established", ch);
}
private static ListenableFuture<Void> closeChannel(final Channel ch) {