X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=33b3f6e813fb95cd1f4090f53255949baee716a7;hp=db5004145260ffb7c5bc88e85837332e2cd51bc3;hb=8426e7a67b1235e8ecc67b1a98a5bd096c88e729;hpb=2418a6052d7eba917d5972f0630cf746d22f690c diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index db50041452..33b3f6e813 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -21,8 +21,9 @@ import akka.dispatch.Mapper; import akka.pattern.Patterns; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -70,6 +71,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ private final List
clusterMembers = new ArrayList<>(); + /** + * Cached ActorSelections for remote peers. + */ + private final Map peers = new HashMap<>(); + /** * ActorSystem's address for the current cluster node. */ @@ -176,10 +182,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering { return; } - clusterMembers.remove(member.address()); + removePeer(member.address()); LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + } - getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender()); + private void addPeer(final Address address) { + if (!clusterMembers.contains(address)) { + clusterMembers.add(address); + } + peers.computeIfAbsent(address, input -> getContext().system() + .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress())); + } + + private void removePeer(final Address address) { + clusterMembers.remove(address); + peers.remove(address); + getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender()); } /** @@ -193,10 +211,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { return; } - if (!clusterMembers.contains(member.address())) { - clusterMembers.add(member.address()); - } - + addPeer(member.address()); LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); } @@ -209,22 +224,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ @VisibleForTesting void receiveGossipTick() { - final Address remoteMemberToGossipTo; + final Address address; switch (clusterMembers.size()) { case 0: //no members to send gossip status to return; case 1: - remoteMemberToGossipTo = clusterMembers.get(0); + address = clusterMembers.get(0); break; default: final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); - remoteMemberToGossipTo = clusterMembers.get(randomIndex); + address = clusterMembers.get(randomIndex); break; } - LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo); - getLocalStatusAndSendTo(remoteMemberToGossipTo); + LOG.trace("Gossiping to [{}]", address); + getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address))); } /** @@ -242,8 +257,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ @VisibleForTesting void receiveGossipStatus(final GossipStatus status) { - //Don't accept messages from non-members - if (!clusterMembers.contains(status.from())) { + // Don't accept messages from non-members + if (!peers.containsKey(status.from())) { return; } @@ -299,37 +314,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param remoteActorSystemAddress remote gossiper to send to */ @VisibleForTesting - void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) { + void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) { //Get local status from bucket store and send to remote Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); - //Find gossiper on remote system - ActorSelection remoteRef = getContext().system().actorSelection( - remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress()); - - LOG.trace("Sending bucket versions to [{}]", remoteRef); - - futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher()); - } - - /** - * Helper to send bucket versions received from local store. - * - * @param remote remote gossiper to send versions to - * @param localVersions bucket versions received from local store - */ - void sendGossipStatusTo(final ActorRef remote, final Map localVersions) { + LOG.trace("Sending bucket versions to [{}]", remoteGossiper); - GossipStatus status = new GossipStatus(selfAddress, localVersions); - remote.tell(status, getSelf()); - } - - void sendGossipStatusTo(final ActorSelection remote, final Map localVersions) { - - GossipStatus status = new GossipStatus(selfAddress, localVersions); - remote.tell(status, getSelf()); + futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher()); } /// @@ -345,8 +338,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; Map localVersions = reply.getVersions(); - sendGossipStatusTo(remote, localVersions); - + remote.tell(new GossipStatus(selfAddress, localVersions), getSelf()); } return null; } @@ -410,7 +402,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } if (!localIsOlder.isEmpty()) { - sendGossipStatusTo(sender, localVersions); + sender.tell(new GossipStatus(selfAddress, localVersions), getSelf()); } if (!localIsNewer.isEmpty()) { @@ -459,6 +451,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering { @VisibleForTesting void setClusterMembers(final Address... members) { clusterMembers.clear(); - clusterMembers.addAll(Arrays.asList(members)); + peers.clear(); + + for (Address addr : members) { + addPeer(addr); + } } }