BUG-3128: cache ActorSelections 84/50584/12
authorRobert Varga <rovarga@cisco.com>
Tue, 17 Jan 2017 22:58:01 +0000 (23:58 +0100)
committerRobert Varga <rovarga@cisco.com>
Mon, 30 Jan 2017 13:52:44 +0000 (14:52 +0100)
This is a performance optimization. Since the ActorSelection
for a remote node is an invariant, keep a handy cache of
these objects so we do not have to construct them on every
GossipTick.

Change-Id: I820c1d9be5c198a6cac7932b0de0e0776b35b0a5
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java

index db50041..33b3f6e 100644 (file)
@@ -21,8 +21,9 @@ import akka.dispatch.Mapper;
 import akka.pattern.Patterns;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -70,6 +71,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     private final List<Address> clusterMembers = new ArrayList<>();
 
+    /**
+     * Cached ActorSelections for remote peers.
+     */
+    private final Map<Address, ActorSelection> peers = new HashMap<>();
+
     /**
      * ActorSystem's address for the current cluster node.
      */
@@ -176,10 +182,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             return;
         }
 
-        clusterMembers.remove(member.address());
+        removePeer(member.address());
         LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+    }
 
-        getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
+    private void addPeer(final Address address) {
+        if (!clusterMembers.contains(address)) {
+            clusterMembers.add(address);
+        }
+        peers.computeIfAbsent(address, input -> getContext().system()
+            .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
+    }
+
+    private void removePeer(final Address address) {
+        clusterMembers.remove(address);
+        peers.remove(address);
+        getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
     }
 
     /**
@@ -193,10 +211,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
             return;
         }
 
-        if (!clusterMembers.contains(member.address())) {
-            clusterMembers.add(member.address());
-        }
-
+        addPeer(member.address());
         LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
     }
 
@@ -209,22 +224,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     @VisibleForTesting
     void receiveGossipTick() {
-        final Address remoteMemberToGossipTo;
+        final Address address;
         switch (clusterMembers.size()) {
             case 0:
                 //no members to send gossip status to
                 return;
             case 1:
-                remoteMemberToGossipTo = clusterMembers.get(0);
+                address = clusterMembers.get(0);
                 break;
             default:
                 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
-                remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+                address = clusterMembers.get(randomIndex);
                 break;
         }
 
-        LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
-        getLocalStatusAndSendTo(remoteMemberToGossipTo);
+        LOG.trace("Gossiping to [{}]", address);
+        getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
     }
 
     /**
@@ -242,8 +257,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      */
     @VisibleForTesting
     void receiveGossipStatus(final GossipStatus status) {
-        //Don't accept messages from non-members
-        if (!clusterMembers.contains(status.from())) {
+        // Don't accept messages from non-members
+        if (!peers.containsKey(status.from())) {
             return;
         }
 
@@ -299,37 +314,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
      * @param remoteActorSystemAddress remote gossiper to send to
      */
     @VisibleForTesting
-    void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
+    void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
 
         //Get local status from bucket store and send to remote
         Future<Object> futureReply =
                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
 
-        //Find gossiper on remote system
-        ActorSelection remoteRef = getContext().system().actorSelection(
-                remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
-
-        LOG.trace("Sending bucket versions to [{}]", remoteRef);
-
-        futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
-    }
-
-    /**
-     * Helper to send bucket versions received from local store.
-     *
-     * @param remote        remote gossiper to send versions to
-     * @param localVersions bucket versions received from local store
-     */
-    void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
+        LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
 
-        GossipStatus status = new GossipStatus(selfAddress, localVersions);
-        remote.tell(status, getSelf());
-    }
-
-    void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
-
-        GossipStatus status = new GossipStatus(selfAddress, localVersions);
-        remote.tell(status, getSelf());
+        futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
     }
 
     ///
@@ -345,8 +338,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
                     Map<Address, Long> localVersions = reply.getVersions();
 
-                    sendGossipStatusTo(remote, localVersions);
-
+                    remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
                 }
                 return null;
             }
@@ -410,7 +402,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
                     }
 
                     if (!localIsOlder.isEmpty()) {
-                        sendGossipStatusTo(sender, localVersions);
+                        sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
                     }
 
                     if (!localIsNewer.isEmpty()) {
@@ -459,6 +451,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
     @VisibleForTesting
     void setClusterMembers(final Address... members) {
         clusterMembers.clear();
-        clusterMembers.addAll(Arrays.asList(members));
+        peers.clear();
+
+        for (Address addr : members) {
+            addPeer(addr);
+        }
     }
 }
index cd4f6d5..9fccb06 100644 (file)
@@ -16,6 +16,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
@@ -68,17 +69,17 @@ public class GossiperTest {
     @Test
     public void testReceiveGossipTick_WhenNoRemoteMemberShouldIgnore() {
         mockGossiper.setClusterMembers();
-        doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
+        doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(ActorSelection.class));
         mockGossiper.receiveGossipTick();
-        verify(mockGossiper, times(0)).getLocalStatusAndSendTo(any(Address.class));
+        verify(mockGossiper, times(0)).getLocalStatusAndSendTo(any(ActorSelection.class));
     }
 
     @Test
     public void testReceiveGossipTick_WhenRemoteMemberExistsShouldSendStatus() {
         mockGossiper.setClusterMembers(new Address("tcp", "member"));
-        doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
+        doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(ActorSelection.class));
         mockGossiper.receiveGossipTick();
-        verify(mockGossiper, times(1)).getLocalStatusAndSendTo(any(Address.class));
+        verify(mockGossiper, times(1)).getLocalStatusAndSendTo(any(ActorSelection.class));
     }
 
     @SuppressWarnings("unchecked")

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.