From: Robert Varga Date: Tue, 17 Jan 2017 22:58:01 +0000 (+0100) Subject: BUG-3128: cache ActorSelections X-Git-Tag: release/carbon~298 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=8426e7a67b1235e8ecc67b1a98a5bd096c88e729;ds=sidebyside BUG-3128: cache ActorSelections This is a performance optimization. Since the ActorSelection for a remote node is an invariant, keep a handy cache of these objects so we do not have to construct them on every GossipTick. Change-Id: I820c1d9be5c198a6cac7932b0de0e0776b35b0a5 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index db50041452..33b3f6e813 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -21,8 +21,9 @@ import akka.dispatch.Mapper; import akka.pattern.Patterns; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -70,6 +71,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ private final List
clusterMembers = new ArrayList<>(); + /** + * Cached ActorSelections for remote peers. + */ + private final Map peers = new HashMap<>(); + /** * ActorSystem's address for the current cluster node. */ @@ -176,10 +182,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering { return; } - clusterMembers.remove(member.address()); + removePeer(member.address()); LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + } - getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender()); + private void addPeer(final Address address) { + if (!clusterMembers.contains(address)) { + clusterMembers.add(address); + } + peers.computeIfAbsent(address, input -> getContext().system() + .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress())); + } + + private void removePeer(final Address address) { + clusterMembers.remove(address); + peers.remove(address); + getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender()); } /** @@ -193,10 +211,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { return; } - if (!clusterMembers.contains(member.address())) { - clusterMembers.add(member.address()); - } - + addPeer(member.address()); LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); } @@ -209,22 +224,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ @VisibleForTesting void receiveGossipTick() { - final Address remoteMemberToGossipTo; + final Address address; switch (clusterMembers.size()) { case 0: //no members to send gossip status to return; case 1: - remoteMemberToGossipTo = clusterMembers.get(0); + address = clusterMembers.get(0); break; default: final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); - remoteMemberToGossipTo = clusterMembers.get(randomIndex); + address = clusterMembers.get(randomIndex); break; } - LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo); - getLocalStatusAndSendTo(remoteMemberToGossipTo); + LOG.trace("Gossiping to [{}]", address); + getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address))); } /** @@ -242,8 +257,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ @VisibleForTesting void receiveGossipStatus(final GossipStatus status) { - //Don't accept messages from non-members - if (!clusterMembers.contains(status.from())) { + // Don't accept messages from non-members + if (!peers.containsKey(status.from())) { return; } @@ -299,37 +314,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param remoteActorSystemAddress remote gossiper to send to */ @VisibleForTesting - void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) { + void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) { //Get local status from bucket store and send to remote Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); - //Find gossiper on remote system - ActorSelection remoteRef = getContext().system().actorSelection( - remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress()); - - LOG.trace("Sending bucket versions to [{}]", remoteRef); - - futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher()); - } - - /** - * Helper to send bucket versions received from local store. - * - * @param remote remote gossiper to send versions to - * @param localVersions bucket versions received from local store - */ - void sendGossipStatusTo(final ActorRef remote, final Map localVersions) { + LOG.trace("Sending bucket versions to [{}]", remoteGossiper); - GossipStatus status = new GossipStatus(selfAddress, localVersions); - remote.tell(status, getSelf()); - } - - void sendGossipStatusTo(final ActorSelection remote, final Map localVersions) { - - GossipStatus status = new GossipStatus(selfAddress, localVersions); - remote.tell(status, getSelf()); + futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher()); } /// @@ -345,8 +338,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; Map localVersions = reply.getVersions(); - sendGossipStatusTo(remote, localVersions); - + remote.tell(new GossipStatus(selfAddress, localVersions), getSelf()); } return null; } @@ -410,7 +402,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } if (!localIsOlder.isEmpty()) { - sendGossipStatusTo(sender, localVersions); + sender.tell(new GossipStatus(selfAddress, localVersions), getSelf()); } if (!localIsNewer.isEmpty()) { @@ -459,6 +451,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering { @VisibleForTesting void setClusterMembers(final Address... members) { clusterMembers.clear(); - clusterMembers.addAll(Arrays.asList(members)); + peers.clear(); + + for (Address addr : members) { + addPeer(addr); + } } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java index cd4f6d54c8..9fccb06943 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/gossip/GossiperTest.java @@ -16,6 +16,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; @@ -68,17 +69,17 @@ public class GossiperTest { @Test public void testReceiveGossipTick_WhenNoRemoteMemberShouldIgnore() { mockGossiper.setClusterMembers(); - doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class)); + doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(ActorSelection.class)); mockGossiper.receiveGossipTick(); - verify(mockGossiper, times(0)).getLocalStatusAndSendTo(any(Address.class)); + verify(mockGossiper, times(0)).getLocalStatusAndSendTo(any(ActorSelection.class)); } @Test public void testReceiveGossipTick_WhenRemoteMemberExistsShouldSendStatus() { mockGossiper.setClusterMembers(new Address("tcp", "member")); - doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class)); + doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(ActorSelection.class)); mockGossiper.receiveGossipTick(); - verify(mockGossiper, times(1)).getLocalStatusAndSendTo(any(Address.class)); + verify(mockGossiper, times(1)).getLocalStatusAndSendTo(any(ActorSelection.class)); } @SuppressWarnings("unchecked")