X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=db5004145260ffb7c5bc88e85837332e2cd51bc3;hp=8af1c83c558a0a6b4842b9cb7accfa0a6e68a79b;hb=2418a6052d7eba917d5972f0630cf746d22f690c;hpb=726ee824671781b5031c0108794c22bd0d96eaad diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 8af1c83c55..db50041452 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -12,13 +12,17 @@ import akka.actor.ActorRefProvider; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; +import akka.actor.Props; import akka.cluster.Cluster; import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; import akka.dispatch.Mapper; import akka.pattern.Patterns; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -31,77 +35,78 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; /** * Gossiper that syncs bucket store across nodes in the cluster. - *

+ * + *

* It keeps a local scheduler that periodically sends Gossip ticks to * itself to send bucket store's bucket versions to a randomly selected remote * gossiper. - *

+ * + *

* When bucket versions are received from a remote gossiper, it is compared * with bucket store's bucket versions. Which ever buckets are newer * locally, are sent to remote gossiper. If any bucket is older in bucket store, * a gossip status is sent to remote gossiper so that it can send the newer buckets. - *

+ * + *

* When a bucket is received from a remote gossiper, its sent to the bucket store * for update. - * */ - public class Gossiper extends AbstractUntypedActorWithMetering { + private final boolean autoStartGossipTicks; + private final RemoteRpcProviderConfig config; - private final Logger log = LoggerFactory.getLogger(getClass()); - - private Cluster cluster; + /** + * All known cluster members. + */ + private final List

clusterMembers = new ArrayList<>(); /** * ActorSystem's address for the current cluster node. */ private Address selfAddress; - /** - * All known cluster members - */ - private List
clusterMembers = new ArrayList<>(); + private Cluster cluster; private Cancellable gossipTask; - private Boolean autoStartGossipTicks = true; + Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) { + this.config = Preconditions.checkNotNull(config); + this.autoStartGossipTicks = autoStartGossipTicks.booleanValue(); + } - private RemoteRpcProviderConfig config; + Gossiper(final RemoteRpcProviderConfig config) { + this(config, Boolean.TRUE); + } - public Gossiper(){ - config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + public static Props props(final RemoteRpcProviderConfig config) { + return Props.create(Gossiper.class, config); } - /** - * Helpful for testing - * @param autoStartGossipTicks used for turning off gossip ticks during testing. - * Gossip tick can be manually sent. - */ - public Gossiper(Boolean autoStartGossipTicks){ - this.autoStartGossipTicks = autoStartGossipTicks; + static Props testProps(final RemoteRpcProviderConfig config) { + return Props.create(Gossiper.class, config, Boolean.FALSE); } @Override - public void preStart(){ + public void preStart() { ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - if ( provider instanceof ClusterActorRefProvider ) { + if (provider instanceof ClusterActorRefProvider ) { cluster = Cluster.get(getContext().system()); cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), ClusterEvent.MemberEvent.class, + ClusterEvent.ReachableMember.class, ClusterEvent.UnreachableMember.class); } @@ -109,16 +114,16 @@ public class Gossiper extends AbstractUntypedActorWithMetering { gossipTask = getContext().system().scheduler().schedule( new FiniteDuration(1, TimeUnit.SECONDS), //initial delay config.getGossipTickInterval(), //interval - getSelf(), //target - new Messages.GossiperMessages.GossipTick(), //message - getContext().dispatcher(), //execution context - getSelf() //sender + getSelf(), //target + new Messages.GossiperMessages.GossipTick(), //message + getContext().dispatcher(), //execution context + getSelf() //sender ); } } @Override - public void postStop(){ + public void postStop() { if (cluster != null) { cluster.unsubscribe(getSelf()); } @@ -127,8 +132,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - protected void handleReceive(Object message) throws Exception { + protected void handleReceive(final Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing if (message instanceof GossipTick) { @@ -142,12 +148,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering { // comparing the GossipStatus message with its local versions. receiveGossip((GossipEnvelope) message); } else if (message instanceof ClusterEvent.MemberUp) { - receiveMemberUp(((ClusterEvent.MemberUp) message).member()); + receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member()); + + } else if (message instanceof ClusterEvent.ReachableMember) { + receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member()); } else if (message instanceof ClusterEvent.MemberRemoved) { receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member()); - } else if ( message instanceof ClusterEvent.UnreachableMember){ + } else if (message instanceof ClusterEvent.UnreachableMember) { receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member()); } else { @@ -160,66 +169,68 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * * @param member who went down */ - void receiveMemberRemoveOrUnreachable(Member member) { + private void receiveMemberRemoveOrUnreachable(final Member member) { //if its self, then stop itself - if (selfAddress.equals(member.address())){ + if (selfAddress.equals(member.address())) { getContext().stop(getSelf()); return; } clusterMembers.remove(member.address()); - if(log.isDebugEnabled()) { - log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); - } + LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + + getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender()); } /** - * Add member to the local copy of member list if it doesnt already - * @param member + * Add member to the local copy of member list if it doesn't already. + * + * @param member the member to add */ - void receiveMemberUp(Member member) { - + private void receiveMemberUpOrReachable(final Member member) { + //ignore up notification for self if (selfAddress.equals(member.address())) { - return; //ignore up notification for self + return; } if (!clusterMembers.contains(member.address())) { clusterMembers.add(member.address()); } - if(log.isDebugEnabled()) { - log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); - } + + LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); } /** - * Sends Gossip status to other members in the cluster.
- * 1. If there are no member, ignore the tick.
- * 2. If there's only 1 member, send gossip status (bucket versions) to it.
+ * Sends Gossip status to other members in the cluster. + *
+ * 1. If there are no member, ignore the tick.
+ * 2. If there's only 1 member, send gossip status (bucket versions) to it.
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it. */ - void receiveGossipTick(){ - if (clusterMembers.size() == 0) { - return; //no members to send gossip status to + @VisibleForTesting + void receiveGossipTick() { + final Address remoteMemberToGossipTo; + switch (clusterMembers.size()) { + case 0: + //no members to send gossip status to + return; + case 1: + remoteMemberToGossipTo = clusterMembers.get(0); + break; + default: + final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); + remoteMemberToGossipTo = clusterMembers.get(randomIndex); + break; } - Address remoteMemberToGossipTo; - - if (clusterMembers.size() == 1) { - remoteMemberToGossipTo = clusterMembers.get(0); - } else { - Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); - remoteMemberToGossipTo = clusterMembers.get(randomIndex); - } - if(log.isDebugEnabled()) { - log.debug("Gossiping to [{}]", remoteMemberToGossipTo); - } + LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo); getLocalStatusAndSendTo(remoteMemberToGossipTo); } /** * Process gossip status received from a remote gossiper. Remote versions are compared with - * the local copy.

- * + * the local copy. + *

* For each bucket *