BUG-6937: Add ReachableMember case to Gossiper
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
index 8af1c83c558a0a6b4842b9cb7accfa0a6e68a79b..a50a2723cd2e71e55604be4162acfe5ed47dc333 100644 (file)
@@ -18,6 +18,7 @@ import akka.cluster.ClusterEvent;
 import akka.cluster.Member;
 import akka.dispatch.Mapper;
 import akka.pattern.Patterns;
+import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -77,10 +78,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
     private Boolean autoStartGossipTicks = true;
 
-    private RemoteRpcProviderConfig config;
+    private final RemoteRpcProviderConfig config;
 
-    public Gossiper(){
-        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+    public Gossiper(RemoteRpcProviderConfig config){
+        this.config = Preconditions.checkNotNull(config);
     }
 
     /**
@@ -88,7 +89,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param autoStartGossipTicks used for turning off gossip ticks during testing.
      *                             Gossip tick can be manually sent.
      */
-    public Gossiper(Boolean autoStartGossipTicks){
+    public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config){
+        this(config);
         this.autoStartGossipTicks = autoStartGossipTicks;
     }
 
@@ -102,6 +104,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             cluster.subscribe(getSelf(),
                     ClusterEvent.initialStateAsEvents(),
                     ClusterEvent.MemberEvent.class,
+                    ClusterEvent.ReachableMember.class,
                     ClusterEvent.UnreachableMember.class);
         }
 
@@ -142,7 +145,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             // comparing the GossipStatus message with its local versions.
             receiveGossip((GossipEnvelope) message);
         } else if (message instanceof ClusterEvent.MemberUp) {
-            receiveMemberUp(((ClusterEvent.MemberUp) message).member());
+            receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
+
+        } else if (message instanceof ClusterEvent.ReachableMember) {
+            receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
 
         } else if (message instanceof ClusterEvent.MemberRemoved) {
             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
@@ -177,10 +183,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * Add member to the local copy of member list if it doesnt already
      * @param member
      */
-    void receiveMemberUp(Member member) {
+    void receiveMemberUpOrReachable(final Member member) {
 
         if (selfAddress.equals(member.address())) {
-            return; //ignore up notification for self
+            //ignore up notification for self
+            return;
         }
 
         if (!clusterMembers.contains(member.address())) {
@@ -210,8 +217,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
         }
-        if(log.isDebugEnabled()) {
-            log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+        if(log.isTraceEnabled()) {
+            log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
         }
         getLocalStatusAndSendTo(remoteMemberToGossipTo);
     }
@@ -251,8 +258,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     void receiveGossip(GossipEnvelope envelope){
         //TODO: Add more validations
         if (!selfAddress.equals(envelope.to())) {
-            if(log.isDebugEnabled()) {
-                log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+            if(log.isTraceEnabled()) {
+                log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
             }
             return;
         }
@@ -300,8 +307,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorSelection remoteRef = getContext().system().actorSelection(
                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
 
-        if(log.isDebugEnabled()) {
-            log.debug("Sending bucket versions to [{}]", remoteRef);
+        if(log.isTraceEnabled()) {
+            log.trace("Sending bucket versions to [{}]", remoteRef);
         }
 
         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
@@ -431,8 +438,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             public Void apply(Object msg) {
                 if (msg instanceof GetBucketsByMembersReply) {
                     Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
-                    if(log.isDebugEnabled()) {
-                        log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+                    if(log.isTraceEnabled()) {
+                        log.trace("Buckets to send from {}: {}", selfAddress, buckets);
                     }
                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
                     sender.tell(envelope, getSelf());