import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
/**
* Gossiper that syncs bucket store across nodes in the cluster.
public class Gossiper extends AbstractUntypedActorWithMetering {
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ private final Logger log = LoggerFactory.getLogger(getClass());
private Cluster cluster;
@Override
public void postStop(){
- if (cluster != null)
+ if (cluster != null) {
cluster.unsubscribe(getSelf());
- if (gossipTask != null)
+ }
+ if (gossipTask != null) {
gossipTask.cancel();
+ }
}
@Override
protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
- if (message instanceof GossipTick)
+ if (message instanceof GossipTick) {
receiveGossipTick();
-
- //Message from remote gossiper with its bucket versions
- else if (message instanceof GossipStatus)
+ } else if (message instanceof GossipStatus) {
+ // Message from remote gossiper with its bucket versions
receiveGossipStatus((GossipStatus) message);
-
- //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
- //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
- //message with its local versions
- else if (message instanceof GossipEnvelope)
+ } else if (message instanceof GossipEnvelope) {
+ // Message from remote gossiper with buckets. This is usually in response to GossipStatus
+ // message. The contained buckets are newer as determined by the remote gossiper by
+ // comparing the GossipStatus message with its local versions.
receiveGossip((GossipEnvelope) message);
-
- else if (message instanceof ClusterEvent.MemberUp) {
+ } else if (message instanceof ClusterEvent.MemberUp) {
receiveMemberUp(((ClusterEvent.MemberUp) message).member());
} else if (message instanceof ClusterEvent.MemberRemoved) {
} else if ( message instanceof ClusterEvent.UnreachableMember){
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
- } else
+ } else {
unhandled(message);
+ }
}
/**
*/
void receiveMemberUp(Member member) {
- if (selfAddress.equals(member.address()))
+ if (selfAddress.equals(member.address())) {
return; //ignore up notification for self
+ }
- if (!clusterMembers.contains(member.address()))
+ if (!clusterMembers.contains(member.address())) {
clusterMembers.add(member.address());
+ }
if(log.isDebugEnabled()) {
log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
void receiveGossipTick(){
- if (clusterMembers.size() == 0) return; //no members to send gossip status to
+ if (clusterMembers.size() == 0) {
+ return; //no members to send gossip status to
+ }
Address remoteMemberToGossipTo;
- if (clusterMembers.size() == 1)
+ if (clusterMembers.size() == 1) {
remoteMemberToGossipTo = clusterMembers.get(0);
- else {
+ } else {
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
*/
void receiveGossipStatus(GossipStatus status){
//Don't accept messages from non-members
- if (!clusterMembers.contains(status.from()))
+ if (!clusterMembers.contains(status.from())) {
return;
+ }
final ActorRef sender = getSender();
Future<Object> futureReply =
for (Address address : remoteVersions.keySet()){
- if (localVersions.get(address) == null || remoteVersions.get(address) == null)
+ if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
continue; //this condition is taken care of by above diffs
- if (localVersions.get(address) < remoteVersions.get(address))
+ }
+ if (localVersions.get(address) < remoteVersions.get(address)) {
localIsOlder.add(address);
- else if (localVersions.get(address) > remoteVersions.get(address))
+ } else if (localVersions.get(address) > remoteVersions.get(address)) {
localIsNewer.add(address);
+ }
}
- if (!localIsOlder.isEmpty())
+ if (!localIsOlder.isEmpty()) {
sendGossipStatusTo(sender, localVersions );
+ }
- if (!localIsNewer.isEmpty())
+ if (!localIsNewer.isEmpty()) {
sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+ }
}
return null;