Merge "(Bug 2035) - Increasing default Akka config for auto-down of unreachable nodes...
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
index 85c6ebe26f859e0751122d4ab60065a0f1b48aba..8af1c83c558a0a6b4842b9cb7accfa0a6e68a79b 100644 (file)
@@ -17,14 +17,7 @@ import akka.cluster.ClusterActorRefProvider;
 import akka.cluster.ClusterEvent;
 import akka.cluster.Member;
 import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
 import akka.pattern.Patterns;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -32,15 +25,20 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Gossiper that syncs bucket store across nodes in the cluster.
@@ -61,7 +59,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go
 
 public class Gossiper extends AbstractUntypedActorWithMetering {
 
-    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+    private final Logger log = LoggerFactory.getLogger(getClass());
 
     private Cluster cluster;
 
@@ -121,30 +119,29 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
     @Override
     public void postStop(){
-        if (cluster != null)
+        if (cluster != null) {
             cluster.unsubscribe(getSelf());
-        if (gossipTask != null)
+        }
+        if (gossipTask != null) {
             gossipTask.cancel();
+        }
     }
 
     @Override
     protected void handleReceive(Object message) throws Exception {
         //Usually sent by self via gossip task defined above. But its not enforced.
         //These ticks can be sent by another actor as well which is esp. useful while testing
-        if (message instanceof GossipTick)
+        if (message instanceof GossipTick) {
             receiveGossipTick();
-
-            //Message from remote gossiper with its bucket versions
-        else if (message instanceof GossipStatus)
+        } else if (message instanceof GossipStatus) {
+            // Message from remote gossiper with its bucket versions
             receiveGossipStatus((GossipStatus) message);
-
-            //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
-            //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
-            //message with its local versions
-        else if (message instanceof GossipEnvelope)
+        } else if (message instanceof GossipEnvelope) {
+            // Message from remote gossiper with buckets. This is usually in response to GossipStatus
+            // message. The contained buckets are newer as determined by the remote gossiper by
+            // comparing the GossipStatus message with its local versions.
             receiveGossip((GossipEnvelope) message);
-
-        else if (message instanceof ClusterEvent.MemberUp) {
+        } else if (message instanceof ClusterEvent.MemberUp) {
             receiveMemberUp(((ClusterEvent.MemberUp) message).member());
 
         } else if (message instanceof ClusterEvent.MemberRemoved) {
@@ -153,8 +150,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         } else if ( message instanceof ClusterEvent.UnreachableMember){
             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
 
-        } else
+        } else {
             unhandled(message);
+        }
     }
 
     /**
@@ -170,7 +168,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         }
 
         clusterMembers.remove(member.address());
-        log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+        if(log.isDebugEnabled()) {
+            log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+        }
     }
 
     /**
@@ -179,13 +179,16 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     void receiveMemberUp(Member member) {
 
-        if (selfAddress.equals(member.address()))
+        if (selfAddress.equals(member.address())) {
             return; //ignore up notification for self
+        }
 
-        if (!clusterMembers.contains(member.address()))
+        if (!clusterMembers.contains(member.address())) {
             clusterMembers.add(member.address());
-
-        log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+        }
+        if(log.isDebugEnabled()) {
+            log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+        }
     }
 
     /**
@@ -195,18 +198,21 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
      */
     void receiveGossipTick(){
-        if (clusterMembers.size() == 0) return; //no members to send gossip status to
+        if (clusterMembers.size() == 0) {
+            return; //no members to send gossip status to
+        }
 
         Address remoteMemberToGossipTo;
 
-        if (clusterMembers.size() == 1)
+        if (clusterMembers.size() == 1) {
             remoteMemberToGossipTo = clusterMembers.get(0);
-        else {
+        else {
             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
         }
-
-        log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+        if(log.isDebugEnabled()) {
+            log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+        }
         getLocalStatusAndSendTo(remoteMemberToGossipTo);
     }
 
@@ -225,8 +231,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     void receiveGossipStatus(GossipStatus status){
         //Don't accept messages from non-members
-        if (!clusterMembers.contains(status.from()))
+        if (!clusterMembers.contains(status.from())) {
             return;
+        }
 
         final ActorRef sender = getSender();
         Future<Object> futureReply =
@@ -244,7 +251,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     void receiveGossip(GossipEnvelope envelope){
         //TODO: Add more validations
         if (!selfAddress.equals(envelope.to())) {
-            log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+            if(log.isDebugEnabled()) {
+                log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+            }
             return;
         }
 
@@ -291,7 +300,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
         ActorSelection remoteRef = getContext().system().actorSelection(
                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
 
-        log.debug("Sending bucket versions to [{}]", remoteRef);
+        if(log.isDebugEnabled()) {
+            log.debug("Sending bucket versions to [{}]", remoteRef);
+        }
 
         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
 
@@ -377,19 +388,23 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
 
                     for (Address address : remoteVersions.keySet()){
 
-                        if (localVersions.get(address) == null || remoteVersions.get(address) == null)
+                        if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
                             continue; //this condition is taken care of by above diffs
-                        if (localVersions.get(address) <  remoteVersions.get(address))
+                        }
+                        if (localVersions.get(address) <  remoteVersions.get(address)) {
                             localIsOlder.add(address);
-                        else if (localVersions.get(address) > remoteVersions.get(address))
+                        } else if (localVersions.get(address) > remoteVersions.get(address)) {
                             localIsNewer.add(address);
+                        }
                     }
 
-                    if (!localIsOlder.isEmpty())
+                    if (!localIsOlder.isEmpty()) {
                         sendGossipStatusTo(sender, localVersions );
+                    }
 
-                    if (!localIsNewer.isEmpty())
+                    if (!localIsNewer.isEmpty()) {
                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+                    }
 
                 }
                 return null;
@@ -416,7 +431,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             public Void apply(Object msg) {
                 if (msg instanceof GetBucketsByMembersReply) {
                     Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
-                    log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+                    if(log.isDebugEnabled()) {
+                        log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+                    }
                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
                     sender.tell(envelope, getSelf());
                 }