* It keeps a local scheduler that periodically sends Gossip ticks to
* itself to send bucket store's bucket versions to a randomly selected remote
* gossiper.
- *
+ *
+ *
* When bucket versions are received from a remote gossiper, it is compared
* with bucket store's bucket versions. Which ever buckets are newer
* locally, are sent to remote gossiper. If any bucket is older in bucket store,
* a gossip status is sent to remote gossiper so that it can send the newer buckets.
- *
+ *
+ *
* When a bucket is received from a remote gossiper, its sent to the bucket store
* for update.
- *
*/
-
public class Gossiper extends AbstractUntypedActorWithMetering {
+ private final boolean autoStartGossipTicks;
+ private final RemoteRpcProviderConfig config;
- private final Logger log = LoggerFactory.getLogger(getClass());
+ /**
+ * All known cluster members.
+ */
+ private final List
clusterMembers = new ArrayList<>();
- private Cluster cluster;
+ /**
+ * Cached ActorSelections for remote peers.
+ */
+ private final Map peers = new HashMap<>();
/**
* ActorSystem's address for the current cluster node.
*/
private Address selfAddress;
- /**
- * All known cluster members
- */
- private List clusterMembers = new ArrayList<>();
+ private Cluster cluster;
private Cancellable gossipTask;
- private Boolean autoStartGossipTicks = true;
+ Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
+ this.config = Preconditions.checkNotNull(config);
+ this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
+ }
- private RemoteRpcProviderConfig config;
+ Gossiper(final RemoteRpcProviderConfig config) {
+ this(config, Boolean.TRUE);
+ }
- public Gossiper(){
- config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ public static Props props(final RemoteRpcProviderConfig config) {
+ return Props.create(Gossiper.class, config);
}
- /**
- * Helpful for testing
- * @param autoStartGossipTicks used for turning off gossip ticks during testing.
- * Gossip tick can be manually sent.
- */
- public Gossiper(Boolean autoStartGossipTicks){
- this.autoStartGossipTicks = autoStartGossipTicks;
+ static Props testProps(final RemoteRpcProviderConfig config) {
+ return Props.create(Gossiper.class, config, Boolean.FALSE);
}
@Override
- public void preStart(){
+ public void preStart() {
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- if ( provider instanceof ClusterActorRefProvider ) {
+ if (provider instanceof ClusterActorRefProvider ) {
cluster = Cluster.get(getContext().system());
cluster.subscribe(getSelf(),
ClusterEvent.initialStateAsEvents(),
ClusterEvent.MemberEvent.class,
+ ClusterEvent.ReachableMember.class,
ClusterEvent.UnreachableMember.class);
}
@@ -109,16 +120,16 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
config.getGossipTickInterval(), //interval
- getSelf(), //target
- new Messages.GossiperMessages.GossipTick(), //message
- getContext().dispatcher(), //execution context
- getSelf() //sender
+ getSelf(), //target
+ new Messages.GossiperMessages.GossipTick(), //message
+ getContext().dispatcher(), //execution context
+ getSelf() //sender
);
}
}
@Override
- public void postStop(){
+ public void postStop() {
if (cluster != null) {
cluster.unsubscribe(getSelf());
}
@@ -127,8 +138,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
}
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- protected void handleReceive(Object message) throws Exception {
+ protected void handleReceive(final Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
if (message instanceof GossipTick) {
@@ -142,12 +154,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
// comparing the GossipStatus message with its local versions.
receiveGossip((GossipEnvelope) message);
} else if (message instanceof ClusterEvent.MemberUp) {
- receiveMemberUp(((ClusterEvent.MemberUp) message).member());
+ receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
+
+ } else if (message instanceof ClusterEvent.ReachableMember) {
+ receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
} else if (message instanceof ClusterEvent.MemberRemoved) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
- } else if ( message instanceof ClusterEvent.UnreachableMember){
+ } else if (message instanceof ClusterEvent.UnreachableMember) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
} else {
@@ -160,66 +175,77 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
*
* @param member who went down
*/
- void receiveMemberRemoveOrUnreachable(Member member) {
+ private void receiveMemberRemoveOrUnreachable(final Member member) {
//if its self, then stop itself
- if (selfAddress.equals(member.address())){
+ if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
return;
}
- clusterMembers.remove(member.address());
- if(log.isDebugEnabled()) {
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ removePeer(member.address());
+ LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
+
+ private void addPeer(final Address address) {
+ if (!clusterMembers.contains(address)) {
+ clusterMembers.add(address);
}
+ peers.computeIfAbsent(address, input -> getContext().system()
+ .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
+ }
+
+ private void removePeer(final Address address) {
+ clusterMembers.remove(address);
+ peers.remove(address);
+ getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
}
/**
- * Add member to the local copy of member list if it doesnt already
- * @param member
+ * Add member to the local copy of member list if it doesn't already.
+ *
+ * @param member the member to add
*/
- void receiveMemberUp(Member member) {
-
+ private void receiveMemberUpOrReachable(final Member member) {
+ //ignore up notification for self
if (selfAddress.equals(member.address())) {
- return; //ignore up notification for self
+ return;
}
- if (!clusterMembers.contains(member.address())) {
- clusterMembers.add(member.address());
- }
- if(log.isDebugEnabled()) {
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
- }
+ addPeer(member.address());
+ LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
- * Sends Gossip status to other members in the cluster.
- * 1. If there are no member, ignore the tick.
- * 2. If there's only 1 member, send gossip status (bucket versions) to it.
+ * Sends Gossip status to other members in the cluster.
+ *
+ * 1. If there are no member, ignore the tick.
+ * 2. If there's only 1 member, send gossip status (bucket versions) to it.
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
- void receiveGossipTick(){
- if (clusterMembers.size() == 0) {
- return; //no members to send gossip status to
+ @VisibleForTesting
+ void receiveGossipTick() {
+ final Address address;
+ switch (clusterMembers.size()) {
+ case 0:
+ //no members to send gossip status to
+ return;
+ case 1:
+ address = clusterMembers.get(0);
+ break;
+ default:
+ final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+ address = clusterMembers.get(randomIndex);
+ break;
}
- Address remoteMemberToGossipTo;
-
- if (clusterMembers.size() == 1) {
- remoteMemberToGossipTo = clusterMembers.get(0);
- } else {
- Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
- remoteMemberToGossipTo = clusterMembers.get(randomIndex);
- }
- if(log.isDebugEnabled()) {
- log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
- }
- getLocalStatusAndSendTo(remoteMemberToGossipTo);
+ LOG.trace("Gossiping to [{}]", address);
+ getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
}
/**
* Process gossip status received from a remote gossiper. Remote versions are compared with
- * the local copy.
- *
+ * the local copy.
+ *
* For each bucket
*
*
If local copy is newer, the newer buckets are sent in GossipEnvelope to remote
@@ -229,9 +255,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
*
* @param status bucket versions from a remote member
*/
- void receiveGossipStatus(GossipStatus status){
- //Don't accept messages from non-members
- if (!clusterMembers.contains(status.from())) {
+ @VisibleForTesting
+ void receiveGossipStatus(final GossipStatus status) {
+ // Don't accept messages from non-members
+ if (!peers.containsKey(status.from())) {
return;
}
@@ -240,7 +267,6 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
-
}
/**
@@ -248,37 +274,34 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
*
* @param envelope contains buckets from a remote gossiper
*/
- void receiveGossip(GossipEnvelope envelope){
+ @VisibleForTesting
+ > void receiveGossip(final GossipEnvelope envelope) {
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- if(log.isDebugEnabled()) {
- log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
- }
+ LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
return;
}
updateRemoteBuckets(envelope.getBuckets());
-
}
/**
- * Helper to send received buckets to bucket store
+ * Helper to send received buckets to bucket store.
*
- * @param buckets
+ * @param buckets map of Buckets to update
*/
- void updateRemoteBuckets(Map buckets) {
-
- UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
- getContext().parent().tell(updateRemoteBuckets, getSelf());
+ @VisibleForTesting
+ > void updateRemoteBuckets(final Map> buckets) {
+ getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
}
/**
- * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
+ * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
*
* @param remote remote node to send Buckets to
* @param addresses node addresses whose buckets needs to be sent
*/
- void sendGossipTo(final ActorRef remote, final Set addresses){
+ void sendGossipTo(final ActorRef remote, final Set addresses) {
Future