X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=33b3f6e813fb95cd1f4090f53255949baee716a7;hp=db5004145260ffb7c5bc88e85837332e2cd51bc3;hb=8426e7a67b1235e8ecc67b1a98a5bd096c88e729;hpb=2418a6052d7eba917d5972f0630cf746d22f690c diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index db50041452..33b3f6e813 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -21,8 +21,9 @@ import akka.dispatch.Mapper; import akka.pattern.Patterns; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -70,6 +71,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ private final List
clusterMembers = new ArrayList<>(); + /** + * Cached ActorSelections for remote peers. + */ + private final Map peers = new HashMap<>(); + /** * ActorSystem's address for the current cluster node. */ @@ -176,10 +182,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering { return; } - clusterMembers.remove(member.address()); + removePeer(member.address()); LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + } - getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender()); + private void addPeer(final Address address) { + if (!clusterMembers.contains(address)) { + clusterMembers.add(address); + } + peers.computeIfAbsent(address, input -> getContext().system() + .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress())); + } + + private void removePeer(final Address address) { + clusterMembers.remove(address); + peers.remove(address); + getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender()); } /** @@ -193,10 +211,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { return; } - if (!clusterMembers.contains(member.address())) { - clusterMembers.add(member.address()); - } - + addPeer(member.address()); LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); } @@ -209,22 +224,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ @VisibleForTesting void receiveGossipTick() { - final Address remoteMemberToGossipTo; + final Address address; switch (clusterMembers.size()) { case 0: //no members to send gossip status to return; case 1: - remoteMemberToGossipTo = clusterMembers.get(0); + address = clusterMembers.get(0); break; default: final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); - remoteMemberToGossipTo = clusterMembers.get(randomIndex); + address = clusterMembers.get(randomIndex); break; } - LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo); - getLocalStatusAndSendTo(remoteMemberToGossipTo); + LOG.trace("Gossiping to [{}]", address); + getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address))); } /** @@ -242,8 +257,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ @VisibleForTesting void receiveGossipStatus(final GossipStatus status) { - //Don't accept messages from non-members - if (!clusterMembers.contains(status.from())) { + // Don't accept messages from non-members + if (!peers.containsKey(status.from())) { return; } @@ -299,37 +314,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param remoteActorSystemAddress remote gossiper to send to */ @VisibleForTesting - void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) { + void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) { //Get local status from bucket store and send to remote Future