X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=9230591d46b4d80e244d3d81c653dfebc559a4cf;hb=9ddc65e1ddae50f691566cd9382707679436c055;hp=8af1c83c558a0a6b4842b9cb7accfa0a6e68a79b;hpb=3997099eb61b0f2adc47f7a85952c324e9de223f;p=controller.git
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
index 8af1c83c55..9230591d46 100644
--- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
+++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
@@ -18,6 +18,8 @@ import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -56,7 +58,6 @@ import scala.concurrent.duration.FiniteDuration;
* for update.
*
*/
-
public class Gossiper extends AbstractUntypedActorWithMetering {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -69,7 +70,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
private Address selfAddress;
/**
- * All known cluster members
+ * All known cluster members.
*/
private List
clusterMembers = new ArrayList<>();
@@ -77,23 +78,26 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
private Boolean autoStartGossipTicks = true;
- private RemoteRpcProviderConfig config;
+ private final RemoteRpcProviderConfig config;
- public Gossiper(){
- config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ public Gossiper(RemoteRpcProviderConfig config) {
+ this.config = Preconditions.checkNotNull(config);
}
/**
- * Helpful for testing
+ * Constructor for testing.
+ *
* @param autoStartGossipTicks used for turning off gossip ticks during testing.
* Gossip tick can be manually sent.
*/
- public Gossiper(Boolean autoStartGossipTicks){
+ @VisibleForTesting
+ public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
+ this(config);
this.autoStartGossipTicks = autoStartGossipTicks;
}
@Override
- public void preStart(){
+ public void preStart() {
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
@@ -118,7 +122,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
}
@Override
- public void postStop(){
+ public void postStop() {
if (cluster != null) {
cluster.unsubscribe(getSelf());
}
@@ -127,6 +131,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
}
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
@@ -147,7 +152,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
} else if (message instanceof ClusterEvent.MemberRemoved) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
- } else if ( message instanceof ClusterEvent.UnreachableMember){
+ } else if ( message instanceof ClusterEvent.UnreachableMember) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
} else {
@@ -162,20 +167,19 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
*/
void receiveMemberRemoveOrUnreachable(Member member) {
//if its self, then stop itself
- if (selfAddress.equals(member.address())){
+ if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
return;
}
clusterMembers.remove(member.address());
- if(log.isDebugEnabled()) {
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
- }
+ log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
- * Add member to the local copy of member list if it doesnt already
- * @param member
+ * Add member to the local copy of member list if it doesn't already.
+ *
+ * @param member the member to add
*/
void receiveMemberUp(Member member) {
@@ -186,18 +190,18 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
if (!clusterMembers.contains(member.address())) {
clusterMembers.add(member.address());
}
- if(log.isDebugEnabled()) {
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
- }
+
+ log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
- * Sends Gossip status to other members in the cluster.
- * 1. If there are no member, ignore the tick.
- * 2. If there's only 1 member, send gossip status (bucket versions) to it.
+ * Sends Gossip status to other members in the cluster.
+ *
+ * 1. If there are no member, ignore the tick.
+ * 2. If there's only 1 member, send gossip status (bucket versions) to it.
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
- void receiveGossipTick(){
+ void receiveGossipTick() {
if (clusterMembers.size() == 0) {
return; //no members to send gossip status to
}
@@ -210,16 +214,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
- if(log.isDebugEnabled()) {
- log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
- }
+
+ log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
/**
* Process gossip status received from a remote gossiper. Remote versions are compared with
- * the local copy.
- *
+ * the local copy.
+ *
* For each bucket
*
*
If local copy is newer, the newer buckets are sent in GossipEnvelope to remote
@@ -229,7 +232,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
*
* @param status bucket versions from a remote member
*/
- void receiveGossipStatus(GossipStatus status){
+ void receiveGossipStatus(GossipStatus status) {
//Don't accept messages from non-members
if (!clusterMembers.contains(status.from())) {
return;
@@ -248,11 +251,12 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
*
* @param envelope contains buckets from a remote gossiper
*/
- void receiveGossip(GossipEnvelope envelope){
+ > void receiveGossip(GossipEnvelope envelope) {
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- if(log.isDebugEnabled()) {
- log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ if (log.isTraceEnabled()) {
+ log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
+ envelope.to());
}
return;
}
@@ -262,23 +266,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
}
/**
- * Helper to send received buckets to bucket store
+ * Helper to send received buckets to bucket store.
*
- * @param buckets
+ * @param buckets map of Buckets to update
*/
- void updateRemoteBuckets(Map buckets) {
-
- UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
+ > void updateRemoteBuckets(Map> buckets) {
+ UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
getContext().parent().tell(updateRemoteBuckets, getSelf());
}
/**
- * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
+ * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
*
* @param remote remote node to send Buckets to
* @param addresses node addresses whose buckets needs to be sent
*/
- void sendGossipTo(final ActorRef remote, final Set addresses){
+ void sendGossipTo(final ActorRef remote, final Set addresses) {
Future