import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
/**
* Gossiper that syncs bucket store across nodes in the cluster.
- * <p/>
+ *
+ * <p>
* It keeps a local scheduler that periodically sends Gossip ticks to
* itself to send bucket store's bucket versions to a randomly selected remote
* gossiper.
- * <p/>
+ *
+ * <p>
* When bucket versions are received from a remote gossiper, it is compared
* with bucket store's bucket versions. Which ever buckets are newer
* locally, are sent to remote gossiper. If any bucket is older in bucket store,
* a gossip status is sent to remote gossiper so that it can send the newer buckets.
- * <p/>
+ *
+ * <p>
* When a bucket is received from a remote gossiper, its sent to the bucket store
* for update.
- *
*/
-
public class Gossiper extends AbstractUntypedActorWithMetering {
private final Logger log = LoggerFactory.getLogger(getClass());
private Address selfAddress;
/**
- * All known cluster members
+ * All known cluster members.
*/
private List<Address> clusterMembers = new ArrayList<>();
private Boolean autoStartGossipTicks = true;
- private RemoteRpcProviderConfig config;
+ private final RemoteRpcProviderConfig config;
- public Gossiper(){
- config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ public Gossiper(RemoteRpcProviderConfig config) {
+ this.config = Preconditions.checkNotNull(config);
}
/**
- * Helpful for testing
+ * Constructor for testing.
+ *
* @param autoStartGossipTicks used for turning off gossip ticks during testing.
* Gossip tick can be manually sent.
*/
- public Gossiper(Boolean autoStartGossipTicks){
+ @VisibleForTesting
+ public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
+ this(config);
this.autoStartGossipTicks = autoStartGossipTicks;
}
@Override
- public void preStart(){
+ public void preStart() {
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
}
@Override
- public void postStop(){
+ public void postStop() {
if (cluster != null) {
cluster.unsubscribe(getSelf());
}
}
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
} else if (message instanceof ClusterEvent.MemberRemoved) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
- } else if ( message instanceof ClusterEvent.UnreachableMember){
+ } else if ( message instanceof ClusterEvent.UnreachableMember) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
} else {
*/
void receiveMemberRemoveOrUnreachable(Member member) {
//if its self, then stop itself
- if (selfAddress.equals(member.address())){
+ if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
return;
}
clusterMembers.remove(member.address());
- if(log.isDebugEnabled()) {
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
- }
+ log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
- * Add member to the local copy of member list if it doesnt already
- * @param member
+ * Add member to the local copy of member list if it doesn't already.
+ *
+ * @param member the member to add
*/
void receiveMemberUp(Member member) {
if (!clusterMembers.contains(member.address())) {
clusterMembers.add(member.address());
}
- if(log.isDebugEnabled()) {
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
- }
+
+ log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
- * Sends Gossip status to other members in the cluster. <br/>
- * 1. If there are no member, ignore the tick. </br>
- * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
+ * Sends Gossip status to other members in the cluster.
+ * <br>
+ * 1. If there are no member, ignore the tick. <br>
+ * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
- void receiveGossipTick(){
+ void receiveGossipTick() {
if (clusterMembers.size() == 0) {
return; //no members to send gossip status to
}
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
- if(log.isDebugEnabled()) {
- log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
- }
+
+ log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
/**
* Process gossip status received from a remote gossiper. Remote versions are compared with
- * the local copy. <p>
- *
+ * the local copy.
+ * <p/>
* For each bucket
* <ul>
* <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
*
* @param status bucket versions from a remote member
*/
- void receiveGossipStatus(GossipStatus status){
+ void receiveGossipStatus(GossipStatus status) {
//Don't accept messages from non-members
if (!clusterMembers.contains(status.from())) {
return;
*
* @param envelope contains buckets from a remote gossiper
*/
- void receiveGossip(GossipEnvelope envelope){
+ <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- if(log.isDebugEnabled()) {
- log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ if (log.isTraceEnabled()) {
+ log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
+ envelope.to());
}
return;
}
}
/**
- * Helper to send received buckets to bucket store
+ * Helper to send received buckets to bucket store.
*
- * @param buckets
+ * @param buckets map of Buckets to update
*/
- void updateRemoteBuckets(Map<Address, Bucket> buckets) {
-
- UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
+ <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
+ UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
getContext().parent().tell(updateRemoteBuckets, getSelf());
}
/**
- * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
+ * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
*
* @param remote remote node to send Buckets to
* @param addresses node addresses whose buckets needs to be sent
*/
- void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
+ void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
Future<Object> futureReply =
Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
}
/**
- * Gets bucket versions from bucket store and sends to the supplied address
+ * Gets bucket versions from bucket store and sends to the supplied address.
*
* @param remoteActorSystemAddress remote gossiper to send to
*/
- void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
+ void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
//Get local status from bucket store and send to remote
Future<Object> futureReply =
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
- if(log.isDebugEnabled()) {
- log.debug("Sending bucket versions to [{}]", remoteRef);
- }
+ log.trace("Sending bucket versions to [{}]", remoteRef);
futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
}
/**
- * Helper to send bucket versions received from local store
+ * Helper to send bucket versions received from local store.
+ *
* @param remote remote gossiper to send versions to
* @param localVersions bucket versions received from local store
*/
- void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
+ void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
GossipStatus status = new GossipStatus(selfAddress, localVersions);
remote.tell(status, getSelf());
}
- void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
+ void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
GossipStatus status = new GossipStatus(selfAddress, localVersions);
remote.tell(status, getSelf());
/// Private factories to create mappers
///
- private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
+ private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
return new Mapper<Object, Void>() {
@Override
* @return a {@link akka.dispatch.Mapper} that gets evaluated in future
*
*/
- private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
+ private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
final Map<Address, Long> remoteVersions = status.getVersions();
localIsNewer.removeAll(remoteVersions.keySet());
- for (Address address : remoteVersions.keySet()){
+ for (Address address : remoteVersions.keySet()) {
if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
continue; //this condition is taken care of by above diffs
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
*
* @param sender the remote member that sent
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * in reply to which bucket is being sent back
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
+ * in reply to which bucket is being sent back
* @return a {@link akka.dispatch.Mapper} that gets evaluated in future
*
*/
private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
return new Mapper<Object, Void>() {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Void apply(Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
- Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- if(log.isDebugEnabled()) {
- log.debug("Buckets to send from {}: {}", selfAddress, buckets);
- }
+ Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
+ log.trace("Buckets to send from {}: {}", selfAddress, buckets);
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}