package org.opendaylight.controller.remote.rpc.registry.gossip;
import akka.actor.ActorRef;
+import akka.actor.ActorRefProvider;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
-import akka.actor.UntypedActor;
+import akka.actor.Props;
import akka.cluster.Cluster;
+import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
-import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import akka.pattern.Patterns;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import scala.concurrent.duration.FiniteDuration;
/**
* Gossiper that syncs bucket store across nodes in the cluster.
- * <p/>
+ *
+ * <p>
* It keeps a local scheduler that periodically sends Gossip ticks to
* itself to send bucket store's bucket versions to a randomly selected remote
* gossiper.
- * <p/>
+ *
+ * <p>
* When bucket versions are received from a remote gossiper, it is compared
* with bucket store's bucket versions. Which ever buckets are newer
* locally, are sent to remote gossiper. If any bucket is older in bucket store,
* a gossip status is sent to remote gossiper so that it can send the newer buckets.
- * <p/>
+ *
+ * <p>
* When a bucket is received from a remote gossiper, its sent to the bucket store
* for update.
- *
*/
+public class Gossiper extends AbstractUntypedActorWithMetering {
+ private static final Object GOSSIP_TICK = new Object() {
+ @Override
+ public String toString() {
+ return "gossip tick";
+ }
+ };
-public class Gossiper extends UntypedActor {
-
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ private final boolean autoStartGossipTicks;
+ private final RemoteRpcProviderConfig config;
- Cluster cluster = Cluster.get(getContext().system());
+ /**
+ * All known cluster members.
+ */
+ private final List<Address> clusterMembers = new ArrayList<>();
/**
- * ActorSystem's address for the current cluster node.
+ * Cached ActorSelections for remote peers.
*/
- private Address selfAddress = cluster.selfAddress();
+ private final Map<Address, ActorSelection> peers = new HashMap<>();
/**
- * All known cluster members
+ * ActorSystem's address for the current cluster node.
*/
- private List<Address> clusterMembers = new ArrayList<>();
+ private Address selfAddress;
+
+ private Cluster cluster;
private Cancellable gossipTask;
- private Boolean autoStartGossipTicks = true;
+ private BucketStoreAccess bucketStore;
- public Gossiper(){}
+ Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+ this.config = Preconditions.checkNotNull(config);
+ this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
+ }
- /**
- * Helpful for testing
- * @param autoStartGossipTicks used for turning off gossip ticks during testing.
- * Gossip tick can be manually sent.
- */
- public Gossiper(Boolean autoStartGossipTicks){
- this.autoStartGossipTicks = autoStartGossipTicks;
+ Gossiper(final RemoteRpcProviderConfig config) {
+ this(config, Boolean.TRUE);
}
- @Override
- public void preStart(){
+ public static Props props(final RemoteRpcProviderConfig config) {
+ return Props.create(Gossiper.class, config);
+ }
- cluster.subscribe(getSelf(),
- ClusterEvent.initialStateAsEvents(),
- ClusterEvent.MemberEvent.class,
- ClusterEvent.UnreachableMember.class);
+ static Props testProps(final RemoteRpcProviderConfig config) {
+ return Props.create(Gossiper.class, config, Boolean.FALSE);
+ }
+
+ @Override
+ public void preStart() {
+ ActorRefProvider provider = getContext().provider();
+ selfAddress = provider.getDefaultAddress();
+
+ bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration());
+
+ if (provider instanceof ClusterActorRefProvider) {
+ cluster = Cluster.get(getContext().system());
+ cluster.subscribe(getSelf(),
+ ClusterEvent.initialStateAsEvents(),
+ ClusterEvent.MemberEvent.class,
+ ClusterEvent.ReachableMember.class,
+ ClusterEvent.UnreachableMember.class);
+ }
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
- new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
- getSelf(), //target
- new Messages.GossiperMessages.GossipTick(), //message
- getContext().dispatcher(), //execution context
- getSelf() //sender
+ config.getGossipTickInterval(), //interval
+ getSelf(), //target
+ GOSSIP_TICK, //message
+ getContext().dispatcher(), //execution context
+ getSelf() //sender
);
}
}
@Override
- public void postStop(){
- if (cluster != null)
+ public void postStop() {
+ if (cluster != null) {
cluster.unsubscribe(getSelf());
- if (gossipTask != null)
+ }
+ if (gossipTask != null) {
gossipTask.cancel();
+ }
}
@Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
-
+ protected void handleReceive(final Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
- if (message instanceof GossipTick)
+ if (GOSSIP_TICK.equals(message)) {
receiveGossipTick();
-
- //Message from remote gossiper with its bucket versions
- else if (message instanceof GossipStatus)
+ } else if (message instanceof GossipStatus) {
+ // Message from remote gossiper with its bucket versions
receiveGossipStatus((GossipStatus) message);
-
- //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
- //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
- //message with its local versions
- else if (message instanceof GossipEnvelope)
+ } else if (message instanceof GossipEnvelope) {
+ // Message from remote gossiper with buckets. This is usually in response to GossipStatus
+ // message. The contained buckets are newer as determined by the remote gossiper by
+ // comparing the GossipStatus message with its local versions.
receiveGossip((GossipEnvelope) message);
+ } else if (message instanceof ClusterEvent.MemberUp) {
+ receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
- else if (message instanceof ClusterEvent.MemberUp) {
- receiveMemberUp(((ClusterEvent.MemberUp) message).member());
+ } else if (message instanceof ClusterEvent.ReachableMember) {
+ receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
} else if (message instanceof ClusterEvent.MemberRemoved) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
- } else if ( message instanceof ClusterEvent.UnreachableMember){
+ } else if (message instanceof ClusterEvent.UnreachableMember) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
- } else
+ } else {
unhandled(message);
+ }
}
/**
*
* @param member who went down
*/
- void receiveMemberRemoveOrUnreachable(Member member) {
+ private void receiveMemberRemoveOrUnreachable(final Member member) {
+ LOG.debug("Received memberDown or Unreachable: {}", member);
+
//if its self, then stop itself
- if (selfAddress.equals(member.address())){
+ if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
return;
}
- clusterMembers.remove(member.address());
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ removePeer(member.address());
+ LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
+
+ private void addPeer(final Address address) {
+ if (!clusterMembers.contains(address)) {
+ clusterMembers.add(address);
+ }
+ peers.computeIfAbsent(address, input -> getContext().system()
+ .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
+ }
+
+ private void removePeer(final Address address) {
+ clusterMembers.remove(address);
+ peers.remove(address);
+ bucketStore.removeRemoteBucket(address);
}
/**
- * Add member to the local copy of member list if it doesnt already
- * @param member
+ * Add member to the local copy of member list if it doesn't already.
+ *
+ * @param member the member to add
*/
- void receiveMemberUp(Member member) {
+ private void receiveMemberUpOrReachable(final Member member) {
+ LOG.debug("Received memberUp or reachable: {}", member);
- if (selfAddress.equals(member.address()))
- return; //ignore up notification for self
-
- if (!clusterMembers.contains(member.address()))
- clusterMembers.add(member.address());
+ //ignore up notification for self
+ if (selfAddress.equals(member.address())) {
+ return;
+ }
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ addPeer(member.address());
+ LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
- * Sends Gossip status to other members in the cluster. <br/>
- * 1. If there are no member, ignore the tick. </br>
- * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
+ * Sends Gossip status to other members in the cluster.
+ * <br>
+ * 1. If there are no member, ignore the tick. <br>
+ * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
- void receiveGossipTick(){
- if (clusterMembers.size() == 0) return; //no members to send gossip status to
-
- Address remoteMemberToGossipTo = null;
-
- if (clusterMembers.size() == 1)
- remoteMemberToGossipTo = clusterMembers.get(0);
- else {
- Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
- remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+ @VisibleForTesting
+ void receiveGossipTick() {
+ final Address address;
+ switch (clusterMembers.size()) {
+ case 0:
+ //no members to send gossip status to
+ return;
+ case 1:
+ address = clusterMembers.get(0);
+ break;
+ default:
+ final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+ address = clusterMembers.get(randomIndex);
+ break;
}
- log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
- getLocalStatusAndSendTo(remoteMemberToGossipTo);
+ LOG.trace("Gossiping to [{}]", address);
+ getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
}
/**
* Process gossip status received from a remote gossiper. Remote versions are compared with
- * the local copy. <p>
- *
+ * the local copy.
+ * <p/>
* For each bucket
* <ul>
* <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
*
* @param status bucket versions from a remote member
*/
- void receiveGossipStatus(GossipStatus status){
- //Don't accept messages from non-members
- if (!clusterMembers.contains(status.from()))
- return;
-
- final ActorRef sender = getSender();
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
- futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
-
- }
-
- /**
- * Sends the received buckets in the envelope to the parent Bucket store.
- *
- * @param envelope contains buckets from a remote gossiper
- */
- void receiveGossip(GossipEnvelope envelope){
- //TODO: Add more validations
- if (!selfAddress.equals(envelope.to())) {
- log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
- return;
+ @VisibleForTesting
+ void receiveGossipStatus(final GossipStatus status) {
+ // Don't accept messages from non-members
+ if (peers.containsKey(status.from())) {
+ // FIXME: sender should be part of GossipStatus
+ final ActorRef sender = getSender();
+ bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions));
}
-
- updateRemoteBuckets(envelope.getBuckets());
-
}
- /**
- * Helper to send received buckets to bucket store
- *
- * @param buckets
- */
- void updateRemoteBuckets(Map<Address, Bucket> buckets) {
+ private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
+ final Map<Address, Long> localVersions) {
+ final Map<Address, Long> remoteVersions = status.versions();
- UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
- getContext().parent().tell(updateRemoteBuckets, getSelf());
- }
+ //diff between remote list and local
+ final Set<Address> localIsOlder = new HashSet<>(remoteVersions.keySet());
+ localIsOlder.removeAll(localVersions.keySet());
- /**
- * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
- *
- * @param remote remote node to send Buckets to
- * @param addresses node addresses whose buckets needs to be sent
- */
- void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
+ //diff between local list and remote
+ final Set<Address> localIsNewer = new HashSet<>(localVersions.keySet());
+ localIsNewer.removeAll(remoteVersions.keySet());
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
- futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
- }
- /**
- * Gets bucket versions from bucket store and sends to the supplied address
- *
- * @param remoteActorSystemAddress remote gossiper to send to
- */
- void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
-
- //Get local status from bucket store and send to remote
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
- ActorSelection remoteRef = getContext().system().actorSelection(
- remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
+ for (Entry<Address, Long> entry : remoteVersions.entrySet()) {
+ Address address = entry.getKey();
+ Long remoteVersion = entry.getValue();
+ Long localVersion = localVersions.get(address);
+ if (localVersion == null || remoteVersion == null) {
+ //this condition is taken care of by above diffs
+ continue;
+ }
- log.debug("Sending bucket versions to [{}]", remoteRef);
+ if (localVersion < remoteVersion) {
+ localIsOlder.add(address);
+ } else if (localVersion > remoteVersion) {
+ localIsNewer.add(address);
+ }
+ }
- futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
+ if (!localIsOlder.isEmpty()) {
+ remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
+ }
+ if (!localIsNewer.isEmpty()) {
+ //send newer buckets to remote
+ bucketStore.getBucketsByMembers(localIsNewer, buckets -> {
+ LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
+ remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf());
+ });
+ }
}
/**
- * Helper to send bucket versions received from local store
- * @param remote remote gossiper to send versions to
- * @param localVersions bucket versions received from local store
+ * Sends the received buckets in the envelope to the parent Bucket store.
+ *
+ * @param envelope contains buckets from a remote gossiper
*/
- void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
-
- GossipStatus status = new GossipStatus(selfAddress, localVersions);
- remote.tell(status, getSelf());
- }
-
- void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
-
- GossipStatus status = new GossipStatus(selfAddress, localVersions);
- remote.tell(status, getSelf());
- }
-
- ///
- /// Private factories to create mappers
- ///
-
- private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- sendGossipStatusTo(remote, localVersions);
+ @VisibleForTesting
+ void receiveGossip(final GossipEnvelope envelope) {
+ //TODO: Add more validations
+ if (!selfAddress.equals(envelope.to())) {
+ LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ return;
+ }
- }
- return null;
- }
- };
+ updateRemoteBuckets(envelope.buckets());
}
/**
- * Process bucket versions received from
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
- * Then this method compares remote bucket versions with local bucket versions.
- * <ul>
- * <li>The buckets that are newer locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
- * to remote
- * <li>The buckets that are older locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * to remote so that remote sends GossipEnvelop.
- * </ul>
- *
- * @param sender the remote member
- * @param status bucket versions from a remote member
- * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
+ * Helper to send received buckets to bucket store.
*
+ * @param buckets map of Buckets to update
*/
- private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
-
- final Map<Address, Long> remoteVersions = status.getVersions();
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- //diff between remote list and local
- Set<Address> localIsOlder = new HashSet<>();
- localIsOlder.addAll(remoteVersions.keySet());
- localIsOlder.removeAll(localVersions.keySet());
-
- //diff between local list and remote
- Set<Address> localIsNewer = new HashSet<>();
- localIsNewer.addAll(localVersions.keySet());
- localIsNewer.removeAll(remoteVersions.keySet());
-
-
- for (Address address : remoteVersions.keySet()){
-
- if (localVersions.get(address) == null || remoteVersions.get(address) == null)
- continue; //this condition is taken care of by above diffs
- if (localVersions.get(address) < remoteVersions.get(address))
- localIsOlder.add(address);
- else if (localVersions.get(address) > remoteVersions.get(address))
- localIsNewer.add(address);
- else
- continue;
- }
-
- if (!localIsOlder.isEmpty())
- sendGossipStatusTo(sender, localVersions );
-
- if (!localIsNewer.isEmpty())
- sendGossipTo(sender, localIsNewer);//send newer buckets to remote
-
- }
- return null;
- }
- };
+ @VisibleForTesting
+ void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
+ // filter this so we only handle buckets for known peers
+ bucketStore.updateRemoteBuckets(Maps.filterKeys(buckets, peers::containsKey));
}
/**
- * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
- * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
- * These buckets are sent to a remote member encapsulated in
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
- *
- * @param sender the remote member that sent
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * in reply to which bucket is being sent back
- * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
+ * Gets bucket versions from bucket store and sends to the supplied address.
*
+ * @param remoteActorSystemAddress remote gossiper to send to
*/
- private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(Object msg) {
- if (msg instanceof GetBucketsByMembersReply) {
- Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- log.debug("Buckets to send from {}: {}", selfAddress, buckets);
- GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
- sender.tell(envelope, getSelf());
- }
- return null;
- }
- };
+ @VisibleForTesting
+ void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
+ bucketStore.getBucketVersions(versions -> {
+ LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
+ /*
+ * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring,
+ * but can we identify which bucket is the local one?
+ */
+ remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf());
+ });
}
///
///Getter Setters
///
- List<Address> getClusterMembers() {
- return clusterMembers;
- }
- void setClusterMembers(List<Address> clusterMembers) {
- this.clusterMembers = clusterMembers;
- }
+ @VisibleForTesting
+ void setClusterMembers(final Address... members) {
+ clusterMembers.clear();
+ peers.clear();
- Address getSelfAddress() {
- return selfAddress;
- }
-
- void setSelfAddress(Address selfAddress) {
- this.selfAddress = selfAddress;
+ for (Address addr : members) {
+ addPeer(addr);
+ }
}
}