BUG-3128: cache ActorSelections
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
index db5004145260ffb7c5bc88e85837332e2cd51bc3..33b3f6e813fb95cd1f4090f53255949baee716a7 100644 (file)
@@ -21,8 +21,9 @@ import akka.dispatch.Mapper;
 import akka.pattern.Patterns;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import akka.pattern.Patterns;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import java.util.ArrayList;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -70,6 +71,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     private final List<Address> clusterMembers = new ArrayList<>();
 
      */
     private final List<Address> clusterMembers = new ArrayList<>();
 
+    /**
+     * Cached ActorSelections for remote peers.
+     */
+    private final Map<Address, ActorSelection> peers = new HashMap<>();
+
     /**
      * ActorSystem's address for the current cluster node.
      */
     /**
      * ActorSystem's address for the current cluster node.
      */
@@ -176,10 +182,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             return;
         }
 
             return;
         }
 
-        clusterMembers.remove(member.address());
+        removePeer(member.address());
         LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
         LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+    }
 
 
-        getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
+    private void addPeer(final Address address) {
+        if (!clusterMembers.contains(address)) {
+            clusterMembers.add(address);
+        }
+        peers.computeIfAbsent(address, input -> getContext().system()
+            .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
+    }
+
+    private void removePeer(final Address address) {
+        clusterMembers.remove(address);
+        peers.remove(address);
+        getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
     }
 
     /**
     }
 
     /**
@@ -193,10 +211,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             return;
         }
 
             return;
         }
 
-        if (!clusterMembers.contains(member.address())) {
-            clusterMembers.add(member.address());
-        }
-
+        addPeer(member.address());
         LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
     }
 
         LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
     }
 
@@ -209,22 +224,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     @VisibleForTesting
     void receiveGossipTick() {
      */
     @VisibleForTesting
     void receiveGossipTick() {
-        final Address remoteMemberToGossipTo;
+        final Address address;
         switch (clusterMembers.size()) {
             case 0:
                 //no members to send gossip status to
                 return;
             case 1:
         switch (clusterMembers.size()) {
             case 0:
                 //no members to send gossip status to
                 return;
             case 1:
-                remoteMemberToGossipTo = clusterMembers.get(0);
+                address = clusterMembers.get(0);
                 break;
             default:
                 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
                 break;
             default:
                 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
-                remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+                address = clusterMembers.get(randomIndex);
                 break;
         }
 
                 break;
         }
 
-        LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
-        getLocalStatusAndSendTo(remoteMemberToGossipTo);
+        LOG.trace("Gossiping to [{}]", address);
+        getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
     }
 
     /**
     }
 
     /**
@@ -242,8 +257,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     @VisibleForTesting
     void receiveGossipStatus(final GossipStatus status) {
      */
     @VisibleForTesting
     void receiveGossipStatus(final GossipStatus status) {
-        //Don't accept messages from non-members
-        if (!clusterMembers.contains(status.from())) {
+        // Don't accept messages from non-members
+        if (!peers.containsKey(status.from())) {
             return;
         }
 
             return;
         }
 
@@ -299,37 +314,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param remoteActorSystemAddress remote gossiper to send to
      */
     @VisibleForTesting
      * @param remoteActorSystemAddress remote gossiper to send to
      */
     @VisibleForTesting
-    void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
+    void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
 
         //Get local status from bucket store and send to remote
         Future<Object> futureReply =
                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
 
         //Get local status from bucket store and send to remote
         Future<Object> futureReply =
                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
-        //Find gossiper on remote system
-        ActorSelection remoteRef = getContext().system().actorSelection(
-                remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
-
-        LOG.trace("Sending bucket versions to [{}]", remoteRef);
-
-        futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
-    }
-
-    /**
-     * Helper to send bucket versions received from local store.
-     *
-     * @param remote        remote gossiper to send versions to
-     * @param localVersions bucket versions received from local store
-     */
-    void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
+        LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
 
 
-        GossipStatus status = new GossipStatus(selfAddress, localVersions);
-        remote.tell(status, getSelf());
-    }
-
-    void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
-
-        GossipStatus status = new GossipStatus(selfAddress, localVersions);
-        remote.tell(status, getSelf());
+        futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
     }
 
     ///
     }
 
     ///
@@ -345,8 +338,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
 
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
 
-                    sendGossipStatusTo(remote, localVersions);
-
+                    remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
                 }
                 return null;
             }
                 }
                 return null;
             }
@@ -410,7 +402,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                     }
 
                     if (!localIsOlder.isEmpty()) {
                     }
 
                     if (!localIsOlder.isEmpty()) {
-                        sendGossipStatusTo(sender, localVersions);
+                        sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
                     }
 
                     if (!localIsNewer.isEmpty()) {
                     }
 
                     if (!localIsNewer.isEmpty()) {
@@ -459,6 +451,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     @VisibleForTesting
     void setClusterMembers(final Address... members) {
         clusterMembers.clear();
     @VisibleForTesting
     void setClusterMembers(final Address... members) {
         clusterMembers.clear();
-        clusterMembers.addAll(Arrays.asList(members));
+        peers.clear();
+
+        for (Address addr : members) {
+            addPeer(addr);
+        }
     }
 }
     }
 }