X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=e75bef24c7aa22e7758b430a6ca9aeba31f8f381;hp=a8bc25c40ba14b2ecf8c45926f7871f189dbdafa;hb=927bce5688e4b9d33d3e5e9b769d8a0dba5ccdd4;hpb=30faeb35260541c273a81b8f126b40da94daa825 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index a8bc25c40b..e75bef24c7 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -12,147 +12,162 @@ import akka.actor.ActorRefProvider; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; -import akka.actor.UntypedActor; +import akka.actor.Props; import akka.cluster.Cluster; import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; -import akka.dispatch.Mapper; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import akka.pattern.Patterns; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.Maps; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; +import scala.concurrent.duration.FiniteDuration; /** * Gossiper that syncs bucket store across nodes in the cluster. - *

+ * + *

* It keeps a local scheduler that periodically sends Gossip ticks to * itself to send bucket store's bucket versions to a randomly selected remote * gossiper. - *

+ * + *

* When bucket versions are received from a remote gossiper, it is compared * with bucket store's bucket versions. Which ever buckets are newer * locally, are sent to remote gossiper. If any bucket is older in bucket store, * a gossip status is sent to remote gossiper so that it can send the newer buckets. - *

+ * + *

* When a bucket is received from a remote gossiper, its sent to the bucket store * for update. - * */ +public class Gossiper extends AbstractUntypedActorWithMetering { + private static final Object GOSSIP_TICK = new Object() { + @Override + public String toString() { + return "gossip tick"; + } + }; -public class Gossiper extends UntypedActor { + private final boolean autoStartGossipTicks; + private final RemoteOpsProviderConfig config; - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + /** + * All known cluster members. + */ + private final List

clusterMembers = new ArrayList<>(); - private Cluster cluster; + /** + * Cached ActorSelections for remote peers. + */ + private final Map peers = new HashMap<>(); /** * ActorSystem's address for the current cluster node. */ private Address selfAddress; - /** - * All known cluster members - */ - private List
clusterMembers = new ArrayList<>(); + private Cluster cluster; private Cancellable gossipTask; - private Boolean autoStartGossipTicks = true; + private BucketStoreAccess bucketStore; - public Gossiper(){} + Gossiper(final RemoteOpsProviderConfig config, final Boolean autoStartGossipTicks) { + this.config = Preconditions.checkNotNull(config); + this.autoStartGossipTicks = autoStartGossipTicks.booleanValue(); + } - /** - * Helpful for testing - * @param autoStartGossipTicks used for turning off gossip ticks during testing. - * Gossip tick can be manually sent. - */ - public Gossiper(Boolean autoStartGossipTicks){ - this.autoStartGossipTicks = autoStartGossipTicks; + Gossiper(final RemoteOpsProviderConfig config) { + this(config, Boolean.TRUE); + } + + public static Props props(final RemoteOpsProviderConfig config) { + return Props.create(Gossiper.class, config); + } + + static Props testProps(final RemoteOpsProviderConfig config) { + return Props.create(Gossiper.class, config, Boolean.FALSE); } @Override - public void preStart(){ + public void preStart() { ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - if ( provider instanceof ClusterActorRefProvider ) { + bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration()); + + if (provider instanceof ClusterActorRefProvider) { cluster = Cluster.get(getContext().system()); cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), ClusterEvent.MemberEvent.class, + ClusterEvent.ReachableMember.class, ClusterEvent.UnreachableMember.class); } if (autoStartGossipTicks) { gossipTask = getContext().system().scheduler().schedule( new FiniteDuration(1, TimeUnit.SECONDS), //initial delay - new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval - getSelf(), //target - new Messages.GossiperMessages.GossipTick(), //message - getContext().dispatcher(), //execution context - getSelf() //sender + config.getGossipTickInterval(), //interval + getSelf(), //target + GOSSIP_TICK, //message + getContext().dispatcher(), //execution context + getSelf() //sender ); } } @Override - public void postStop(){ - if (cluster != null) + public void postStop() { + if (cluster != null) { cluster.unsubscribe(getSelf()); - if (gossipTask != null) + } + if (gossipTask != null) { gossipTask.cancel(); + } } @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: node[{}], message[{}]", selfAddress, message); - + protected void handleReceive(final Object message) { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing - if (message instanceof GossipTick) + if (GOSSIP_TICK.equals(message)) { receiveGossipTick(); - - //Message from remote gossiper with its bucket versions - else if (message instanceof GossipStatus) + } else if (message instanceof GossipStatus) { + // Message from remote gossiper with its bucket versions receiveGossipStatus((GossipStatus) message); - - //Message from remote gossiper with buckets. This is usually in response to GossipStatus message - //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus - //message with its local versions - else if (message instanceof GossipEnvelope) + } else if (message instanceof GossipEnvelope) { + // Message from remote gossiper with buckets. This is usually in response to GossipStatus + // message. The contained buckets are newer as determined by the remote gossiper by + // comparing the GossipStatus message with its local versions. receiveGossip((GossipEnvelope) message); + } else if (message instanceof ClusterEvent.MemberUp) { + receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member()); - else if (message instanceof ClusterEvent.MemberUp) { - receiveMemberUp(((ClusterEvent.MemberUp) message).member()); + } else if (message instanceof ClusterEvent.ReachableMember) { + receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member()); } else if (message instanceof ClusterEvent.MemberRemoved) { receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member()); - } else if ( message instanceof ClusterEvent.UnreachableMember){ + } else if (message instanceof ClusterEvent.UnreachableMember) { receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member()); - } else + } else { unhandled(message); + } } /** @@ -160,58 +175,81 @@ public class Gossiper extends UntypedActor { * * @param member who went down */ - void receiveMemberRemoveOrUnreachable(Member member) { + private void receiveMemberRemoveOrUnreachable(final Member member) { + LOG.debug("Received memberDown or Unreachable: {}", member); + //if its self, then stop itself - if (selfAddress.equals(member.address())){ + if (selfAddress.equals(member.address())) { getContext().stop(getSelf()); return; } - clusterMembers.remove(member.address()); - log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + removePeer(member.address()); + LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + } + + private void addPeer(final Address address) { + if (!clusterMembers.contains(address)) { + clusterMembers.add(address); + } + peers.computeIfAbsent(address, input -> getContext().system() + .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress())); + } + + private void removePeer(final Address address) { + clusterMembers.remove(address); + peers.remove(address); + bucketStore.removeRemoteBucket(address); } /** - * Add member to the local copy of member list if it doesnt already - * @param member + * Add member to the local copy of member list if it doesn't already. + * + * @param member the member to add */ - void receiveMemberUp(Member member) { - - if (selfAddress.equals(member.address())) - return; //ignore up notification for self + private void receiveMemberUpOrReachable(final Member member) { + LOG.debug("Received memberUp or reachable: {}", member); - if (!clusterMembers.contains(member.address())) - clusterMembers.add(member.address()); + //ignore up notification for self + if (selfAddress.equals(member.address())) { + return; + } - log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); + addPeer(member.address()); + LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); } /** - * Sends Gossip status to other members in the cluster.
- * 1. If there are no member, ignore the tick.
- * 2. If there's only 1 member, send gossip status (bucket versions) to it.
+ * Sends Gossip status to other members in the cluster. + *
+ * 1. If there are no member, ignore the tick.
+ * 2. If there's only 1 member, send gossip status (bucket versions) to it.
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it. */ - void receiveGossipTick(){ - if (clusterMembers.size() == 0) return; //no members to send gossip status to - - Address remoteMemberToGossipTo = null; - - if (clusterMembers.size() == 1) - remoteMemberToGossipTo = clusterMembers.get(0); - else { - Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); - remoteMemberToGossipTo = clusterMembers.get(randomIndex); + @VisibleForTesting + void receiveGossipTick() { + final Address address; + switch (clusterMembers.size()) { + case 0: + //no members to send gossip status to + return; + case 1: + address = clusterMembers.get(0); + break; + default: + final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); + address = clusterMembers.get(randomIndex); + break; } - log.debug("Gossiping to [{}]", remoteMemberToGossipTo); - getLocalStatusAndSendTo(remoteMemberToGossipTo); + LOG.trace("Gossiping to [{}]", address); + getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address))); } /** * Process gossip status received from a remote gossiper. Remote versions are compared with - * the local copy.

- * + * the local copy. + *

* For each bucket *

    *
  • If local copy is newer, the newer buckets are sent in GossipEnvelope to remote
  • @@ -221,220 +259,113 @@ public class Gossiper extends UntypedActor { * * @param status bucket versions from a remote member */ - void receiveGossipStatus(GossipStatus status){ - //Don't accept messages from non-members - if (!clusterMembers.contains(status.from())) - return; - - final ActorRef sender = getSender(); - Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000); - futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher()); - - } - - /** - * Sends the received buckets in the envelope to the parent Bucket store. - * - * @param envelope contains buckets from a remote gossiper - */ - void receiveGossip(GossipEnvelope envelope){ - //TODO: Add more validations - if (!selfAddress.equals(envelope.to())) { - log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); - return; + @VisibleForTesting + void receiveGossipStatus(final GossipStatus status) { + // Don't accept messages from non-members + if (peers.containsKey(status.from())) { + // FIXME: sender should be part of GossipStatus + final ActorRef sender = getSender(); + bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions)); } - - updateRemoteBuckets(envelope.getBuckets()); - } - /** - * Helper to send received buckets to bucket store - * - * @param buckets - */ - void updateRemoteBuckets(Map buckets) { + private void processRemoteStatus(final ActorRef remote, final GossipStatus status, + final Map localVersions) { + final Map remoteVersions = status.versions(); - UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets); - getContext().parent().tell(updateRemoteBuckets, getSelf()); - } + //diff between remote list and local + final Set
    localIsOlder = new HashSet<>(remoteVersions.keySet()); + localIsOlder.removeAll(localVersions.keySet()); - /** - * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper - * - * @param remote remote node to send Buckets to - * @param addresses node addresses whose buckets needs to be sent - */ - void sendGossipTo(final ActorRef remote, final Set
    addresses){ + //diff between local list and remote + final Set
    localIsNewer = new HashSet<>(localVersions.keySet()); + localIsNewer.removeAll(remoteVersions.keySet()); - Future futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000); - futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher()); - } - /** - * Gets bucket versions from bucket store and sends to the supplied address - * - * @param remoteActorSystemAddress remote gossiper to send to - */ - void getLocalStatusAndSendTo(Address remoteActorSystemAddress){ - - //Get local status from bucket store and send to remote - Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000); - ActorSelection remoteRef = getContext().system().actorSelection( - remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress()); + for (Entry entry : remoteVersions.entrySet()) { + Address address = entry.getKey(); + Long remoteVersion = entry.getValue(); + Long localVersion = localVersions.get(address); + if (localVersion == null || remoteVersion == null) { + //this condition is taken care of by above diffs + continue; + } - log.debug("Sending bucket versions to [{}]", remoteRef); + if (localVersion < remoteVersion) { + localIsOlder.add(address); + } else if (localVersion > remoteVersion) { + localIsNewer.add(address); + } + } - futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher()); + if (!localIsOlder.isEmpty()) { + remote.tell(new GossipStatus(selfAddress, localVersions), getSelf()); + } + if (!localIsNewer.isEmpty()) { + //send newer buckets to remote + bucketStore.getBucketsByMembers(localIsNewer, buckets -> { + LOG.trace("Buckets to send from {}: {}", selfAddress, buckets); + remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf()); + }); + } } /** - * Helper to send bucket versions received from local store - * @param remote remote gossiper to send versions to - * @param localVersions bucket versions received from local store + * Sends the received buckets in the envelope to the parent Bucket store. + * + * @param envelope contains buckets from a remote gossiper */ - void sendGossipStatusTo(ActorRef remote, Map localVersions){ - - GossipStatus status = new GossipStatus(selfAddress, localVersions); - remote.tell(status, getSelf()); - } - - void sendGossipStatusTo(ActorSelection remote, Map localVersions){ - - GossipStatus status = new GossipStatus(selfAddress, localVersions); - remote.tell(status, getSelf()); - } - - /// - /// Private factories to create mappers - /// - - private Mapper getMapperToSendLocalStatus(final ActorSelection remote){ - - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetBucketVersionsReply) { - GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; - Map localVersions = reply.getVersions(); - - sendGossipStatusTo(remote, localVersions); + @VisibleForTesting + void receiveGossip(final GossipEnvelope envelope) { + //TODO: Add more validations + if (!selfAddress.equals(envelope.to())) { + LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); + return; + } - } - return null; - } - }; + updateRemoteBuckets(envelope.buckets()); } /** - * Process bucket versions received from - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}. - * Then this method compares remote bucket versions with local bucket versions. - *
      - *
    • The buckets that are newer locally, send - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} - * to remote - *
    • The buckets that are older locally, send - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} - * to remote so that remote sends GossipEnvelop. - *
    - * - * @param sender the remote member - * @param status bucket versions from a remote member - * @return a {@link akka.dispatch.Mapper} that gets evaluated in future + * Helper to send received buckets to bucket store. * + * @param buckets map of Buckets to update */ - private Mapper getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){ - - final Map remoteVersions = status.getVersions(); - - return new Mapper() { - @Override - public Void apply(Object replyMessage) { - if (replyMessage instanceof GetBucketVersionsReply) { - GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; - Map localVersions = reply.getVersions(); - - //diff between remote list and local - Set
    localIsOlder = new HashSet<>(); - localIsOlder.addAll(remoteVersions.keySet()); - localIsOlder.removeAll(localVersions.keySet()); - - //diff between local list and remote - Set
    localIsNewer = new HashSet<>(); - localIsNewer.addAll(localVersions.keySet()); - localIsNewer.removeAll(remoteVersions.keySet()); - - - for (Address address : remoteVersions.keySet()){ - - if (localVersions.get(address) == null || remoteVersions.get(address) == null) - continue; //this condition is taken care of by above diffs - if (localVersions.get(address) < remoteVersions.get(address)) - localIsOlder.add(address); - else if (localVersions.get(address) > remoteVersions.get(address)) - localIsNewer.add(address); - else - continue; - } - - if (!localIsOlder.isEmpty()) - sendGossipStatusTo(sender, localVersions ); - - if (!localIsNewer.isEmpty()) - sendGossipTo(sender, localIsNewer);//send newer buckets to remote - - } - return null; - } - }; + @VisibleForTesting + void updateRemoteBuckets(final Map> buckets) { + // filter this so we only handle buckets for known peers + bucketStore.updateRemoteBuckets(Maps.filterKeys(buckets, peers::containsKey)); } /** - * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} - * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. - * These buckets are sent to a remote member encapsulated in - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} - * - * @param sender the remote member that sent - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} - * in reply to which bucket is being sent back - * @return a {@link akka.dispatch.Mapper} that gets evaluated in future + * Gets bucket versions from bucket store and sends to the supplied address. * + * @param remoteActorSystemAddress remote gossiper to send to */ - private Mapper getMapperToSendGossip(final ActorRef sender) { - - return new Mapper() { - @Override - public Void apply(Object msg) { - if (msg instanceof GetBucketsByMembersReply) { - Map buckets = ((GetBucketsByMembersReply) msg).getBuckets(); - log.debug("Buckets to send from {}: {}", selfAddress, buckets); - GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets); - sender.tell(envelope, getSelf()); - } - return null; - } - }; + @VisibleForTesting + void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) { + bucketStore.getBucketVersions(versions -> { + LOG.trace("Sending bucket versions to [{}]", remoteGossiper); + /* + * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring, + * but can we identify which bucket is the local one? + */ + remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf()); + }); } /// ///Getter Setters /// - List
    getClusterMembers() { - return clusterMembers; - } - void setClusterMembers(List
    clusterMembers) { - this.clusterMembers = clusterMembers; - } + @VisibleForTesting + void setClusterMembers(final Address... members) { + clusterMembers.clear(); + peers.clear(); - Address getSelfAddress() { - return selfAddress; - } - - void setSelfAddress(Address selfAddress) { - this.selfAddress = selfAddress; + for (Address addr : members) { + addPeer(addr); + } } }