Code Review
/
mdsal.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Netty Replicator - improve the reconnection and keepalive mechanisms
[mdsal.git]
/
replicate
/
mdsal-replicate-netty
/
src
/
main
/
java
/
org
/
opendaylight
/
mdsal
/
replicate
/
netty
/
SourceSingletonService.java
diff --git
a/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java
b/replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java
index 415637269b0277954f59210893b3ba7665b366c2..d707fd1407e42f21f8791cefaf77d89c7d9c79cf 100644
(file)
--- a/
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java
+++ b/
replicate/mdsal-replicate-netty/src/main/java/org/opendaylight/mdsal/replicate/netty/SourceSingletonService.java
@@
-18,10
+18,13
@@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
@@
-45,14
+48,19
@@
final class SourceSingletonService extends ChannelInitializer<SocketChannel> imp
@GuardedBy("this")
private final Collection<SocketChannel> children = new HashSet<>();
@GuardedBy("this")
private final Collection<SocketChannel> children = new HashSet<>();
+ private final Duration keepaliveInterval;
+ private final int maxMissedKeepalives;
@GuardedBy("this")
private Channel serverChannel;
SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
@GuardedBy("this")
private Channel serverChannel;
SourceSingletonService(final BootstrapSupport bootstrapSupport, final DOMDataTreeChangeService dtcs,
- final int listenPort) {
+ final int listenPort
, final Duration keepaliveInterval, final int maxMissedKeepalives
) {
this.bootstrapSupport = requireNonNull(bootstrapSupport);
this.dtcs = requireNonNull(dtcs);
this.listenPort = listenPort;
this.bootstrapSupport = requireNonNull(bootstrapSupport);
this.dtcs = requireNonNull(dtcs);
this.listenPort = listenPort;
+ this.keepaliveInterval = requireNonNull(keepaliveInterval);
+ this.maxMissedKeepalives = maxMissedKeepalives;
+ LOG.info("Replication source on port {} waiting for cluster-wide mastership", listenPort);
}
@Override
}
@Override
@@
-111,12
+119,15
@@
final class SourceSingletonService extends ChannelInitializer<SocketChannel> imp
ch.pipeline()
.addLast("frameDecoder", new MessageFrameDecoder())
ch.pipeline()
.addLast("frameDecoder", new MessageFrameDecoder())
+ .addLast("idleStateHandler", new IdleStateHandler(0, keepaliveInterval.toNanos(), 0, TimeUnit.NANOSECONDS))
+ .addLast("keepaliveHandler", new SourceKeepaliveHandler(maxMissedKeepalives))
.addLast("requestHandler", new SourceRequestHandler(dtcs))
.addLast("requestHandler", new SourceRequestHandler(dtcs))
- .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()))
- .addLast("frameEncoder", MessageFrameEncoder.instance());
+ // Output, in reverse order
+ .addLast("frameEncoder", MessageFrameEncoder.INSTANCE)
+ .addLast("dtclHandler", new DeltaEncoder(NormalizedNodeStreamVersion.current()));
children.add(ch);
children.add(ch);
- LOG.
debug
("Channel {} established", ch);
+ LOG.
info
("Channel {} established", ch);
}
private static ListenableFuture<Void> closeChannel(final Channel ch) {
}
private static ListenableFuture<Void> closeChannel(final Channel ch) {