X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=2c47c4e2a9242a7d9c56a8dc30ade963b83450b7;hp=9230591d46b4d80e244d3d81c653dfebc559a4cf;hb=00634259fd13ebc57f16ad63340e6472a2b6c6f2;hpb=9ddc65e1ddae50f691566cd9382707679436c055 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 9230591d46..2c47c4e2a9 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -12,6 +12,7 @@ import akka.actor.ActorRefProvider; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; +import akka.actor.Props; import akka.cluster.Cluster; import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; @@ -20,7 +21,9 @@ import akka.dispatch.Mapper; import akka.pattern.Patterns; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -33,67 +36,70 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; /** * Gossiper that syncs bucket store across nodes in the cluster. - *

+ * + *

* It keeps a local scheduler that periodically sends Gossip ticks to * itself to send bucket store's bucket versions to a randomly selected remote * gossiper. - *

+ * + *

* When bucket versions are received from a remote gossiper, it is compared * with bucket store's bucket versions. Which ever buckets are newer * locally, are sent to remote gossiper. If any bucket is older in bucket store, * a gossip status is sent to remote gossiper so that it can send the newer buckets. - *

+ * + *

* When a bucket is received from a remote gossiper, its sent to the bucket store * for update. - * */ public class Gossiper extends AbstractUntypedActorWithMetering { + private final boolean autoStartGossipTicks; + private final RemoteRpcProviderConfig config; - private final Logger log = LoggerFactory.getLogger(getClass()); + /** + * All known cluster members. + */ + private final List

clusterMembers = new ArrayList<>(); - private Cluster cluster; + /** + * Cached ActorSelections for remote peers. + */ + private final Map peers = new HashMap<>(); /** * ActorSystem's address for the current cluster node. */ private Address selfAddress; - /** - * All known cluster members. - */ - private List
clusterMembers = new ArrayList<>(); + private Cluster cluster; private Cancellable gossipTask; - private Boolean autoStartGossipTicks = true; + Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) { + this.config = Preconditions.checkNotNull(config); + this.autoStartGossipTicks = autoStartGossipTicks.booleanValue(); + } - private final RemoteRpcProviderConfig config; + Gossiper(final RemoteRpcProviderConfig config) { + this(config, Boolean.TRUE); + } - public Gossiper(RemoteRpcProviderConfig config) { - this.config = Preconditions.checkNotNull(config); + public static Props props(final RemoteRpcProviderConfig config) { + return Props.create(Gossiper.class, config); } - /** - * Constructor for testing. - * - * @param autoStartGossipTicks used for turning off gossip ticks during testing. - * Gossip tick can be manually sent. - */ - @VisibleForTesting - public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) { - this(config); - this.autoStartGossipTicks = autoStartGossipTicks; + static Props testProps(final RemoteRpcProviderConfig config) { + return Props.create(Gossiper.class, config, Boolean.FALSE); } @Override @@ -101,11 +107,12 @@ public class Gossiper extends AbstractUntypedActorWithMetering { ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - if ( provider instanceof ClusterActorRefProvider ) { + if (provider instanceof ClusterActorRefProvider ) { cluster = Cluster.get(getContext().system()); cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), ClusterEvent.MemberEvent.class, + ClusterEvent.ReachableMember.class, ClusterEvent.UnreachableMember.class); } @@ -113,10 +120,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering { gossipTask = getContext().system().scheduler().schedule( new FiniteDuration(1, TimeUnit.SECONDS), //initial delay config.getGossipTickInterval(), //interval - getSelf(), //target - new Messages.GossiperMessages.GossipTick(), //message - getContext().dispatcher(), //execution context - getSelf() //sender + getSelf(), //target + new Messages.GossiperMessages.GossipTick(), //message + getContext().dispatcher(), //execution context + getSelf() //sender ); } } @@ -133,7 +140,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - protected void handleReceive(Object message) throws Exception { + protected void handleReceive(final Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing if (message instanceof GossipTick) { @@ -147,12 +154,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering { // comparing the GossipStatus message with its local versions. receiveGossip((GossipEnvelope) message); } else if (message instanceof ClusterEvent.MemberUp) { - receiveMemberUp(((ClusterEvent.MemberUp) message).member()); + receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member()); + + } else if (message instanceof ClusterEvent.ReachableMember) { + receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member()); } else if (message instanceof ClusterEvent.MemberRemoved) { receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member()); - } else if ( message instanceof ClusterEvent.UnreachableMember) { + } else if (message instanceof ClusterEvent.UnreachableMember) { receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member()); } else { @@ -165,15 +175,29 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * * @param member who went down */ - void receiveMemberRemoveOrUnreachable(Member member) { + private void receiveMemberRemoveOrUnreachable(final Member member) { //if its self, then stop itself if (selfAddress.equals(member.address())) { getContext().stop(getSelf()); return; } - clusterMembers.remove(member.address()); - log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + removePeer(member.address()); + LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + } + + private void addPeer(final Address address) { + if (!clusterMembers.contains(address)) { + clusterMembers.add(address); + } + peers.computeIfAbsent(address, input -> getContext().system() + .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress())); + } + + private void removePeer(final Address address) { + clusterMembers.remove(address); + peers.remove(address); + getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender()); } /** @@ -181,17 +205,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * * @param member the member to add */ - void receiveMemberUp(Member member) { - + private void receiveMemberUpOrReachable(final Member member) { + //ignore up notification for self if (selfAddress.equals(member.address())) { - return; //ignore up notification for self - } - - if (!clusterMembers.contains(member.address())) { - clusterMembers.add(member.address()); + return; } - log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); + addPeer(member.address()); + LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); } /** @@ -201,22 +222,24 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * 2. If there's only 1 member, send gossip status (bucket versions) to it.
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it. */ + @VisibleForTesting void receiveGossipTick() { - if (clusterMembers.size() == 0) { - return; //no members to send gossip status to - } - - Address remoteMemberToGossipTo; - - if (clusterMembers.size() == 1) { - remoteMemberToGossipTo = clusterMembers.get(0); - } else { - Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); - remoteMemberToGossipTo = clusterMembers.get(randomIndex); + final Address address; + switch (clusterMembers.size()) { + case 0: + //no members to send gossip status to + return; + case 1: + address = clusterMembers.get(0); + break; + default: + final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); + address = clusterMembers.get(randomIndex); + break; } - log.trace("Gossiping to [{}]", remoteMemberToGossipTo); - getLocalStatusAndSendTo(remoteMemberToGossipTo); + LOG.trace("Gossiping to [{}]", address); + getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address))); } /** @@ -232,9 +255,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * * @param status bucket versions from a remote member */ - void receiveGossipStatus(GossipStatus status) { - //Don't accept messages from non-members - if (!clusterMembers.contains(status.from())) { + @VisibleForTesting + void receiveGossipStatus(final GossipStatus status) { + // Don't accept messages from non-members + if (!peers.containsKey(status.from())) { return; } @@ -243,7 +267,6 @@ public class Gossiper extends AbstractUntypedActorWithMetering { Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher()); - } /** @@ -251,18 +274,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * * @param envelope contains buckets from a remote gossiper */ - > void receiveGossip(GossipEnvelope envelope) { + @VisibleForTesting + > void receiveGossip(final GossipEnvelope envelope) { //TODO: Add more validations if (!selfAddress.equals(envelope.to())) { - if (log.isTraceEnabled()) { - log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), - envelope.to()); - } + LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); return; } updateRemoteBuckets(envelope.getBuckets()); - } /** @@ -270,9 +290,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * * @param buckets map of Buckets to update */ - > void updateRemoteBuckets(Map> buckets) { - UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets); - getContext().parent().tell(updateRemoteBuckets, getSelf()); + @VisibleForTesting + > void updateRemoteBuckets(final Map> buckets) { + getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf()); } /** @@ -293,38 +313,16 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * * @param remoteActorSystemAddress remote gossiper to send to */ - void getLocalStatusAndSendTo(Address remoteActorSystemAddress) { + @VisibleForTesting + void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) { //Get local status from bucket store and send to remote Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); - //Find gossiper on remote system - ActorSelection remoteRef = getContext().system().actorSelection( - remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress()); - - log.trace("Sending bucket versions to [{}]", remoteRef); - - futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher()); + LOG.trace("Sending bucket versions to [{}]", remoteGossiper); - } - - /** - * Helper to send bucket versions received from local store. - * - * @param remote remote gossiper to send versions to - * @param localVersions bucket versions received from local store - */ - void sendGossipStatusTo(ActorRef remote, Map localVersions) { - - GossipStatus status = new GossipStatus(selfAddress, localVersions); - remote.tell(status, getSelf()); - } - - void sendGossipStatusTo(ActorSelection remote, Map localVersions) { - - GossipStatus status = new GossipStatus(selfAddress, localVersions); - remote.tell(status, getSelf()); + futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher()); } /// @@ -335,13 +333,12 @@ public class Gossiper extends AbstractUntypedActorWithMetering { return new Mapper() { @Override - public Void apply(Object replyMessage) { + public Void apply(final Object replyMessage) { if (replyMessage instanceof GetBucketVersionsReply) { GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; Map localVersions = reply.getVersions(); - sendGossipStatusTo(remote, localVersions); - + remote.tell(new GossipStatus(selfAddress, localVersions), getSelf()); } return null; } @@ -372,7 +369,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { return new Mapper() { @Override - public Void apply(Object replyMessage) { + public Void apply(final Object replyMessage) { if (replyMessage instanceof GetBucketVersionsReply) { GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; Map localVersions = reply.getVersions(); @@ -388,26 +385,30 @@ public class Gossiper extends AbstractUntypedActorWithMetering { localIsNewer.removeAll(remoteVersions.keySet()); - for (Address address : remoteVersions.keySet()) { - - if (localVersions.get(address) == null || remoteVersions.get(address) == null) { - continue; //this condition is taken care of by above diffs + for (Map.Entry entry : remoteVersions.entrySet()) { + Address address = entry.getKey(); + Long remoteVersion = entry.getValue(); + Long localVersion = localVersions.get(address); + if (localVersion == null || remoteVersion == null) { + //this condition is taken care of by above diffs + continue; } - if (localVersions.get(address) < remoteVersions.get(address)) { + + if (localVersion < remoteVersion) { localIsOlder.add(address); - } else if (localVersions.get(address) > remoteVersions.get(address)) { + } else if (localVersion > remoteVersion) { localIsNewer.add(address); } } if (!localIsOlder.isEmpty()) { - sendGossipStatusTo(sender, localVersions ); + sender.tell(new GossipStatus(selfAddress, localVersions), getSelf()); } if (!localIsNewer.isEmpty()) { - sendGossipTo(sender, localIsNewer);//send newer buckets to remote + //send newer buckets to remote + sendGossipTo(sender, localIsNewer); } - } return null; } @@ -431,10 +432,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering { return new Mapper() { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - public Void apply(Object msg) { + public Void apply(final Object msg) { if (msg instanceof GetBucketsByMembersReply) { Map> buckets = ((GetBucketsByMembersReply) msg).getBuckets(); - log.trace("Buckets to send from {}: {}", selfAddress, buckets); + LOG.trace("Buckets to send from {}: {}", selfAddress, buckets); GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets); sender.tell(envelope, getSelf()); } @@ -446,19 +447,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering { /// ///Getter Setters /// - List
getClusterMembers() { - return clusterMembers; - } - - void setClusterMembers(List
clusterMembers) { - this.clusterMembers = clusterMembers; - } - Address getSelfAddress() { - return selfAddress; - } + @VisibleForTesting + void setClusterMembers(final Address... members) { + clusterMembers.clear(); + peers.clear(); - void setSelfAddress(Address selfAddress) { - this.selfAddress = selfAddress; + for (Address addr : members) { + addPeer(addr); + } } }