import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
+import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
* for update.
*/
public class Gossiper extends AbstractUntypedActorWithMetering {
+ private final boolean autoStartGossipTicks;
+ private final RemoteRpcProviderConfig config;
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private Cluster cluster;
+ /**
+ * All known cluster members.
+ */
+ private final List<Address> clusterMembers = new ArrayList<>();
/**
* ActorSystem's address for the current cluster node.
*/
private Address selfAddress;
- /**
- * All known cluster members.
- */
- private List<Address> clusterMembers = new ArrayList<>();
+ private Cluster cluster;
private Cancellable gossipTask;
- private Boolean autoStartGossipTicks = true;
+ Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+ this.config = Preconditions.checkNotNull(config);
+ this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
+ }
- private final RemoteRpcProviderConfig config;
+ Gossiper(final RemoteRpcProviderConfig config) {
+ this(config, Boolean.TRUE);
+ }
- public Gossiper(RemoteRpcProviderConfig config) {
- this.config = Preconditions.checkNotNull(config);
+ public static Props props(final RemoteRpcProviderConfig config) {
+ return Props.create(Gossiper.class, config);
}
- /**
- * Constructor for testing.
- *
- * @param autoStartGossipTicks used for turning off gossip ticks during testing.
- * Gossip tick can be manually sent.
- */
- @VisibleForTesting
- public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
- this(config);
- this.autoStartGossipTicks = autoStartGossipTicks;
+ static Props testProps(final RemoteRpcProviderConfig config) {
+ return Props.create(Gossiper.class, config, Boolean.FALSE);
}
@Override
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- if ( provider instanceof ClusterActorRefProvider ) {
+ if (provider instanceof ClusterActorRefProvider ) {
cluster = Cluster.get(getContext().system());
cluster.subscribe(getSelf(),
ClusterEvent.initialStateAsEvents(),
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
config.getGossipTickInterval(), //interval
- getSelf(), //target
- new Messages.GossiperMessages.GossipTick(), //message
- getContext().dispatcher(), //execution context
- getSelf() //sender
+ getSelf(), //target
+ new Messages.GossiperMessages.GossipTick(), //message
+ getContext().dispatcher(), //execution context
+ getSelf() //sender
);
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- protected void handleReceive(Object message) throws Exception {
+ protected void handleReceive(final Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
if (message instanceof GossipTick) {
} else if (message instanceof ClusterEvent.MemberRemoved) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
- } else if ( message instanceof ClusterEvent.UnreachableMember) {
+ } else if (message instanceof ClusterEvent.UnreachableMember) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
} else {
*
* @param member who went down
*/
- void receiveMemberRemoveOrUnreachable(Member member) {
+ private void receiveMemberRemoveOrUnreachable(final Member member) {
//if its self, then stop itself
if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
}
clusterMembers.remove(member.address());
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+
+ getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
}
/**
*
* @param member the member to add
*/
- void receiveMemberUpOrReachable(final Member member) {
-
+ private void receiveMemberUpOrReachable(final Member member) {
+ //ignore up notification for self
if (selfAddress.equals(member.address())) {
- //ignore up notification for self
return;
}
clusterMembers.add(member.address());
}
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
* 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
+ @VisibleForTesting
void receiveGossipTick() {
- if (clusterMembers.size() == 0) {
- return; //no members to send gossip status to
- }
-
- Address remoteMemberToGossipTo;
-
- if (clusterMembers.size() == 1) {
- remoteMemberToGossipTo = clusterMembers.get(0);
- } else {
- Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
- remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+ final Address remoteMemberToGossipTo;
+ switch (clusterMembers.size()) {
+ case 0:
+ //no members to send gossip status to
+ return;
+ case 1:
+ remoteMemberToGossipTo = clusterMembers.get(0);
+ break;
+ default:
+ final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+ remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+ break;
}
- log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
+ LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
*
* @param status bucket versions from a remote member
*/
- void receiveGossipStatus(GossipStatus status) {
+ @VisibleForTesting
+ void receiveGossipStatus(final GossipStatus status) {
//Don't accept messages from non-members
if (!clusterMembers.contains(status.from())) {
return;
Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
-
}
/**
*
* @param envelope contains buckets from a remote gossiper
*/
- <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
+ @VisibleForTesting
+ <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- if (log.isTraceEnabled()) {
- log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
- envelope.to());
- }
+ LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
return;
}
updateRemoteBuckets(envelope.getBuckets());
-
}
/**
*
* @param buckets map of Buckets to update
*/
- <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
- UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
- getContext().parent().tell(updateRemoteBuckets, getSelf());
+ @VisibleForTesting
+ <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
+ getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
}
/**
*
* @param remoteActorSystemAddress remote gossiper to send to
*/
- void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
+ @VisibleForTesting
+ void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
//Get local status from bucket store and send to remote
Future<Object> futureReply =
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
- log.trace("Sending bucket versions to [{}]", remoteRef);
+ LOG.trace("Sending bucket versions to [{}]", remoteRef);
futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
-
}
/**
* @param remote remote gossiper to send versions to
* @param localVersions bucket versions received from local store
*/
- void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
+ void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
GossipStatus status = new GossipStatus(selfAddress, localVersions);
remote.tell(status, getSelf());
}
- void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
+ void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
GossipStatus status = new GossipStatus(selfAddress, localVersions);
remote.tell(status, getSelf());
return new Mapper<Object, Void>() {
@Override
- public Void apply(Object replyMessage) {
+ public Void apply(final Object replyMessage) {
if (replyMessage instanceof GetBucketVersionsReply) {
GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
Map<Address, Long> localVersions = reply.getVersions();
return new Mapper<Object, Void>() {
@Override
- public Void apply(Object replyMessage) {
+ public Void apply(final Object replyMessage) {
if (replyMessage instanceof GetBucketVersionsReply) {
GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
Map<Address, Long> localVersions = reply.getVersions();
Long remoteVersion = entry.getValue();
Long localVersion = localVersions.get(address);
if (localVersion == null || remoteVersion == null) {
- continue; //this condition is taken care of by above diffs
+ //this condition is taken care of by above diffs
+ continue;
}
if (localVersion < remoteVersion) {
}
if (!localIsOlder.isEmpty()) {
- sendGossipStatusTo(sender, localVersions );
+ sendGossipStatusTo(sender, localVersions);
}
if (!localIsNewer.isEmpty()) {
- sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+ //send newer buckets to remote
+ sendGossipTo(sender, localIsNewer);
}
-
}
return null;
}
return new Mapper<Object, Void>() {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- public Void apply(Object msg) {
+ public Void apply(final Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- log.trace("Buckets to send from {}: {}", selfAddress, buckets);
+ LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}
///
///Getter Setters
///
- List<Address> getClusterMembers() {
- return clusterMembers;
- }
- void setClusterMembers(List<Address> clusterMembers) {
- this.clusterMembers = clusterMembers;
- }
-
- Address getSelfAddress() {
- return selfAddress;
- }
-
- void setSelfAddress(Address selfAddress) {
- this.selfAddress = selfAddress;
+ @VisibleForTesting
+ void setClusterMembers(final Address... members) {
+ clusterMembers.clear();
+ clusterMembers.addAll(Arrays.asList(members));
}
}