X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=27b68763daa9900e59bbbe4b8ac4c653aa60d17f;hb=1b164355ea2d868bcc92052ce78160f5244231f2;hp=f6ce5e55f3ee63602fc92529e92d6e93d0ff9bb3;hpb=351a78c9840c5b98a478b91ffd50befad998eb0e;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index f6ce5e55f3..27b68763da 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -12,35 +12,37 @@ import akka.actor.ActorRefProvider; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; -import akka.actor.UntypedActor; +import akka.actor.Props; import akka.cluster.Cluster; import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; import akka.dispatch.Mapper; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.pattern.Patterns; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * Gossiper that syncs bucket store across nodes in the cluster. @@ -59,35 +61,44 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go * */ -public class Gossiper extends UntypedActor { +public class Gossiper extends AbstractUntypedActorWithMetering { + private final boolean autoStartGossipTicks; + private final RemoteRpcProviderConfig config; - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + /** + * All known cluster members. + */ + private final List
clusterMembers = new ArrayList<>(); - private Cluster cluster; + /** + * Cached ActorSelections for remote peers. + */ + private final Map peers = new HashMap<>(); /** * ActorSystem's address for the current cluster node. */ private Address selfAddress; - /** - * All known cluster members - */ - private List clusterMembers = new ArrayList<>(); + private Cluster cluster; private Cancellable gossipTask; - private Boolean autoStartGossipTicks = true; + Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) { + this.config = Preconditions.checkNotNull(config); + this.autoStartGossipTicks = autoStartGossipTicks.booleanValue(); + } + + Gossiper(final RemoteRpcProviderConfig config) { + this(config, Boolean.TRUE); + } - public Gossiper(){} + public static Props props(final RemoteRpcProviderConfig config) { + return Props.create(Gossiper.class, config); + } - /** - * Helpful for testing - * @param autoStartGossipTicks used for turning off gossip ticks during testing. - * Gossip tick can be manually sent. - */ - public Gossiper(Boolean autoStartGossipTicks){ - this.autoStartGossipTicks = autoStartGossipTicks; + static Props testProps(final RemoteRpcProviderConfig config) { + return Props.create(Gossiper.class, config, Boolean.FALSE); } @Override @@ -95,65 +106,66 @@ public class Gossiper extends UntypedActor { ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - if ( provider instanceof ClusterActorRefProvider ) { + if (provider instanceof ClusterActorRefProvider ) { cluster = Cluster.get(getContext().system()); cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), ClusterEvent.MemberEvent.class, + ClusterEvent.ReachableMember.class, ClusterEvent.UnreachableMember.class); } if (autoStartGossipTicks) { gossipTask = getContext().system().scheduler().schedule( new FiniteDuration(1, TimeUnit.SECONDS), //initial delay - ActorUtil.GOSSIP_TICK_INTERVAL, //interval - getSelf(), //target - new Messages.GossiperMessages.GossipTick(), //message - getContext().dispatcher(), //execution context - getSelf() //sender + config.getGossipTickInterval(), //interval + getSelf(), //target + new Messages.GossiperMessages.GossipTick(), //message + getContext().dispatcher(), //execution context + getSelf() //sender ); } } @Override public void postStop(){ - if (cluster != null) + if (cluster != null) { cluster.unsubscribe(getSelf()); - if (gossipTask != null) + } + if (gossipTask != null) { gossipTask.cancel(); + } } @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: node[{}], message[{}]", selfAddress, message); - + protected void handleReceive(final Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing - if (message instanceof GossipTick) + if (message instanceof GossipTick) { receiveGossipTick(); - - //Message from remote gossiper with its bucket versions - else if (message instanceof GossipStatus) + } else if (message instanceof GossipStatus) { + // Message from remote gossiper with its bucket versions receiveGossipStatus((GossipStatus) message); - - //Message from remote gossiper with buckets. This is usually in response to GossipStatus message - //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus - //message with its local versions - else if (message instanceof GossipEnvelope) + } else if (message instanceof GossipEnvelope) { + // Message from remote gossiper with buckets. This is usually in response to GossipStatus + // message. The contained buckets are newer as determined by the remote gossiper by + // comparing the GossipStatus message with its local versions. receiveGossip((GossipEnvelope) message); + } else if (message instanceof ClusterEvent.MemberUp) { + receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member()); - else if (message instanceof ClusterEvent.MemberUp) { - receiveMemberUp(((ClusterEvent.MemberUp) message).member()); + } else if (message instanceof ClusterEvent.ReachableMember) { + receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member()); } else if (message instanceof ClusterEvent.MemberRemoved) { receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member()); - } else if ( message instanceof ClusterEvent.UnreachableMember){ + } else if (message instanceof ClusterEvent.UnreachableMember) { receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member()); - } else + } else { unhandled(message); + } } /** @@ -161,30 +173,43 @@ public class Gossiper extends UntypedActor { * * @param member who went down */ - void receiveMemberRemoveOrUnreachable(Member member) { + private void receiveMemberRemoveOrUnreachable(final Member member) { //if its self, then stop itself if (selfAddress.equals(member.address())){ getContext().stop(getSelf()); return; } - clusterMembers.remove(member.address()); - log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + removePeer(member.address()); + LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + } + + private void addPeer(final Address address) { + if (!clusterMembers.contains(address)) { + clusterMembers.add(address); + } + peers.computeIfAbsent(address, input -> getContext().system() + .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress())); + } + + private void removePeer(final Address address) { + clusterMembers.remove(address); + peers.remove(address); + getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender()); } /** * Add member to the local copy of member list if it doesnt already * @param member */ - void receiveMemberUp(Member member) { - - if (selfAddress.equals(member.address())) - return; //ignore up notification for self - - if (!clusterMembers.contains(member.address())) - clusterMembers.add(member.address()); + private void receiveMemberUpOrReachable(final Member member) { + //ignore up notification for self + if (selfAddress.equals(member.address())) { + return; + } - log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); + addPeer(member.address()); + LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); } /** @@ -193,20 +218,24 @@ public class Gossiper extends UntypedActor { * 2. If there's only 1 member, send gossip status (bucket versions) to it.