import akka.pattern.Patterns;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
*/
private final List<Address> clusterMembers = new ArrayList<>();
+ /**
+ * Cached ActorSelections for remote peers.
+ */
+ private final Map<Address, ActorSelection> peers = new HashMap<>();
+
/**
* ActorSystem's address for the current cluster node.
*/
return;
}
- clusterMembers.remove(member.address());
+ removePeer(member.address());
LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
- getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
+ private void addPeer(final Address address) {
+ if (!clusterMembers.contains(address)) {
+ clusterMembers.add(address);
+ }
+ peers.computeIfAbsent(address, input -> getContext().system()
+ .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
+ }
+
+ private void removePeer(final Address address) {
+ clusterMembers.remove(address);
+ peers.remove(address);
+ getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
}
/**
return;
}
- if (!clusterMembers.contains(member.address())) {
- clusterMembers.add(member.address());
- }
-
+ addPeer(member.address());
LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
*/
@VisibleForTesting
void receiveGossipTick() {
- final Address remoteMemberToGossipTo;
+ final Address address;
switch (clusterMembers.size()) {
case 0:
//no members to send gossip status to
return;
case 1:
- remoteMemberToGossipTo = clusterMembers.get(0);
+ address = clusterMembers.get(0);
break;
default:
final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
- remoteMemberToGossipTo = clusterMembers.get(randomIndex);
+ address = clusterMembers.get(randomIndex);
break;
}
- LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
- getLocalStatusAndSendTo(remoteMemberToGossipTo);
+ LOG.trace("Gossiping to [{}]", address);
+ getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
}
/**
*/
@VisibleForTesting
void receiveGossipStatus(final GossipStatus status) {
- //Don't accept messages from non-members
- if (!clusterMembers.contains(status.from())) {
+ // Don't accept messages from non-members
+ if (!peers.containsKey(status.from())) {
return;
}
* @param remoteActorSystemAddress remote gossiper to send to
*/
@VisibleForTesting
- void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
+ void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
//Get local status from bucket store and send to remote
Future<Object> futureReply =
Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
- //Find gossiper on remote system
- ActorSelection remoteRef = getContext().system().actorSelection(
- remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
-
- LOG.trace("Sending bucket versions to [{}]", remoteRef);
-
- futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
- }
-
- /**
- * Helper to send bucket versions received from local store.
- *
- * @param remote remote gossiper to send versions to
- * @param localVersions bucket versions received from local store
- */
- void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
+ LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
- GossipStatus status = new GossipStatus(selfAddress, localVersions);
- remote.tell(status, getSelf());
- }
-
- void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
-
- GossipStatus status = new GossipStatus(selfAddress, localVersions);
- remote.tell(status, getSelf());
+ futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
}
///
GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
Map<Address, Long> localVersions = reply.getVersions();
- sendGossipStatusTo(remote, localVersions);
-
+ remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
}
return null;
}
}
if (!localIsOlder.isEmpty()) {
- sendGossipStatusTo(sender, localVersions);
+ sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
}
if (!localIsNewer.isEmpty()) {
@VisibleForTesting
void setClusterMembers(final Address... members) {
clusterMembers.clear();
- clusterMembers.addAll(Arrays.asList(members));
+ peers.clear();
+
+ for (Address addr : members) {
+ addPeer(addr);
+ }
}
}