X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=replicate%2Fmdsal-replicate-netty%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fmdsal%2Freplicate%2Fnetty%2FSourceSingletonService.java;h=d707fd1407e42f21f8791cefaf77d89c7d9c79cf;hb=b94fbc60c0b41da2f6645ab51188fbcdfa74e4af;hp=2d2b35ecfb851f5c08fb06511e9292063459fc52;hpb=2bccf96d3e3e512c02cd07a43ad2bd60dd181886;p=mdsal.git diff --git a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java index 2d2b35ecfb..d707fd1407 100644 --- a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java +++ b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java @@ -18,10 +18,13 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.IdleStateHandler; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.concurrent.TimeUnit; import org.checkerframework.checker.lock.qual.GuardedBy; import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; @@ -45,14 +48,18 @@ final class SourceSingletonService extends ChannelInitializer imp @GuardedBy("this") private final Collection children = new HashSet<>(); + private final Duration keepaliveInterval; + private final int maxMissedKeepalives; @GuardedBy("this") private Channel serverChannel; SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs, - final int listenPort) { + final int listenPort, final Duration keepaliveInterval, final int maxMissedKeepalives) { this.bootstrapSupport = requireNonNull(bootstrapSupport); this.dtcs = requireNonNull(dtcs); this.listenPort = listenPort; + this.keepaliveInterval = requireNonNull(keepaliveInterval); + this.maxMissedKeepalives = maxMissedKeepalives; LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort); } @@ -112,6 +119,8 @@ final class SourceSingletonService extends ChannelInitializer imp ch.pipeline() .addLast("frameDecoder", new MessageFrameDecoder()) + .addLast("idleStateHandler", new IdleStateHandler(0, keepaliveInterval.toNanos(), 0, TimeUnit.NANOSECONDS)) + .addLast("keepaliveHandler", new SourceKeepaliveHandler(maxMissedKeepalives)) .addLast("requestHandler", new SourceRequestHandler(dtcs)) // Output, in reverse order .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)