import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
-import akka.dispatch.Mapper;
-import akka.pattern.Patterns;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
* for update.
*/
public class Gossiper extends AbstractUntypedActorWithMetering {
+ private static final Object GOSSIP_TICK = new Object() {
+ @Override
+ public String toString() {
+ return "gossip tick";
+ }
+ };
+
private final boolean autoStartGossipTicks;
private final RemoteRpcProviderConfig config;
private Cancellable gossipTask;
+ private BucketStoreAccess bucketStore;
+
Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
this.config = Preconditions.checkNotNull(config);
this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- if (provider instanceof ClusterActorRefProvider ) {
+ bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration());
+
+ if (provider instanceof ClusterActorRefProvider) {
cluster = Cluster.get(getContext().system());
cluster.subscribe(getSelf(),
ClusterEvent.initialStateAsEvents(),
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
config.getGossipTickInterval(), //interval
getSelf(), //target
- new Messages.GossiperMessages.GossipTick(), //message
+ GOSSIP_TICK, //message
getContext().dispatcher(), //execution context
getSelf() //sender
);
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void handleReceive(final Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
- if (message instanceof GossipTick) {
+ if (GOSSIP_TICK.equals(message)) {
receiveGossipTick();
} else if (message instanceof GossipStatus) {
// Message from remote gossiper with its bucket versions
* @param member who went down
*/
private void receiveMemberRemoveOrUnreachable(final Member member) {
+ LOG.debug("Received memberDown or Unreachable: {}", member);
+
//if its self, then stop itself
if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
private void removePeer(final Address address) {
clusterMembers.remove(address);
peers.remove(address);
- getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
+ bucketStore.removeRemoteBucket(address);
}
/**
* @param member the member to add
*/
private void receiveMemberUpOrReachable(final Member member) {
+ LOG.debug("Received memberUp or reachable: {}", member);
+
//ignore up notification for self
if (selfAddress.equals(member.address())) {
return;
@VisibleForTesting
void receiveGossipStatus(final GossipStatus status) {
// Don't accept messages from non-members
- if (!peers.containsKey(status.from())) {
- return;
+ if (peers.containsKey(status.from())) {
+ // FIXME: sender should be part of GossipStatus
+ final ActorRef sender = getSender();
+ bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions));
}
+ }
+
+ private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
+ final Map<Address, Long> localVersions) {
+ final Map<Address, Long> remoteVersions = status.versions();
+
+ //diff between remote list and local
+ final Set<Address> localIsOlder = new HashSet<>(remoteVersions.keySet());
+ localIsOlder.removeAll(localVersions.keySet());
+
+ //diff between local list and remote
+ final Set<Address> localIsNewer = new HashSet<>(localVersions.keySet());
+ localIsNewer.removeAll(remoteVersions.keySet());
- final ActorRef sender = getSender();
- Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
- futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
+ for (Entry<Address, Long> entry : remoteVersions.entrySet()) {
+ Address address = entry.getKey();
+ Long remoteVersion = entry.getValue();
+ Long localVersion = localVersions.get(address);
+ if (localVersion == null || remoteVersion == null) {
+ //this condition is taken care of by above diffs
+ continue;
+ }
+
+ if (localVersion < remoteVersion) {
+ localIsOlder.add(address);
+ } else if (localVersion > remoteVersion) {
+ localIsNewer.add(address);
+ }
+ }
+
+ if (!localIsOlder.isEmpty()) {
+ remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
+ }
+
+ if (!localIsNewer.isEmpty()) {
+ //send newer buckets to remote
+ bucketStore.getBucketsByMembers(localIsNewer, buckets -> {
+ LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
+ remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf());
+ });
+ }
}
/**
* @param envelope contains buckets from a remote gossiper
*/
@VisibleForTesting
- <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
+ void receiveGossip(final GossipEnvelope envelope) {
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
return;
}
- updateRemoteBuckets(envelope.getBuckets());
+ updateRemoteBuckets(envelope.buckets());
}
/**
* @param buckets map of Buckets to update
*/
@VisibleForTesting
- <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
- getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
- }
-
- /**
- * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
- *
- * @param remote remote node to send Buckets to
- * @param addresses node addresses whose buckets needs to be sent
- */
- void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
-
- Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
- futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
+ void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
+ // filter this so we only handle buckets for known peers
+ bucketStore.updateRemoteBuckets(Maps.filterKeys(buckets, peers::containsKey));
}
/**
*/
@VisibleForTesting
void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
-
- //Get local status from bucket store and send to remote
- Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
-
- LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
-
- futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
- }
-
- ///
- /// Private factories to create mappers
- ///
-
- private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(final Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
- }
- return null;
- }
- };
- }
-
- /**
- * Process bucket versions received from
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
- * Then this method compares remote bucket versions with local bucket versions.
- * <ul>
- * <li>The buckets that are newer locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
- * to remote
- * <li>The buckets that are older locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * to remote so that remote sends GossipEnvelop.
- * </ul>
- *
- * @param sender the remote member
- * @param status bucket versions from a remote member
- * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
- *
- */
- private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
-
- final Map<Address, Long> remoteVersions = status.getVersions();
-
- return new Mapper<Object, Void>() {
- @Override
- public Void apply(final Object replyMessage) {
- if (replyMessage instanceof GetBucketVersionsReply) {
- GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
- Map<Address, Long> localVersions = reply.getVersions();
-
- //diff between remote list and local
- Set<Address> localIsOlder = new HashSet<>();
- localIsOlder.addAll(remoteVersions.keySet());
- localIsOlder.removeAll(localVersions.keySet());
-
- //diff between local list and remote
- Set<Address> localIsNewer = new HashSet<>();
- localIsNewer.addAll(localVersions.keySet());
- localIsNewer.removeAll(remoteVersions.keySet());
-
-
- for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
- Address address = entry.getKey();
- Long remoteVersion = entry.getValue();
- Long localVersion = localVersions.get(address);
- if (localVersion == null || remoteVersion == null) {
- //this condition is taken care of by above diffs
- continue;
- }
-
- if (localVersion < remoteVersion) {
- localIsOlder.add(address);
- } else if (localVersion > remoteVersion) {
- localIsNewer.add(address);
- }
- }
-
- if (!localIsOlder.isEmpty()) {
- sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
- }
-
- if (!localIsNewer.isEmpty()) {
- //send newer buckets to remote
- sendGossipTo(sender, localIsNewer);
- }
- }
- return null;
- }
- };
- }
-
- /**
- * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
- * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
- * These buckets are sent to a remote member encapsulated in
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
- *
- * @param sender the remote member that sent
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * in reply to which bucket is being sent back
- * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
- *
- */
- private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
-
- return new Mapper<Object, Void>() {
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public Void apply(final Object msg) {
- if (msg instanceof GetBucketsByMembersReply) {
- Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
- GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
- sender.tell(envelope, getSelf());
- }
- return null;
- }
- };
+ bucketStore.getBucketVersions(versions -> {
+ LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
+ /*
+ * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring,
+ * but can we identify which bucket is the local one?
+ */
+ remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf());
+ });
}
///