package org.opendaylight.controller.remote.rpc.registry.gossip;
import akka.actor.ActorRef;
+import akka.actor.ActorRefProvider;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
-import akka.actor.UntypedActor;
import akka.cluster.Cluster;
+import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
+import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
/**
* Gossiper that syncs bucket store across nodes in the cluster.
*
*/
-public class Gossiper extends UntypedActor {
+public class Gossiper extends AbstractUntypedActorWithMetering {
- final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ private final Logger log = LoggerFactory.getLogger(getClass());
- Cluster cluster = Cluster.get(getContext().system());
+ private Cluster cluster;
/**
* ActorSystem's address for the current cluster node.
*/
- private Address selfAddress = cluster.selfAddress();
+ private Address selfAddress;
/**
* All known cluster members
private Boolean autoStartGossipTicks = true;
- public Gossiper(){}
+ private final RemoteRpcProviderConfig config;
+
+ public Gossiper(RemoteRpcProviderConfig config){
+ this.config = Preconditions.checkNotNull(config);
+ }
/**
* Helpful for testing
* @param autoStartGossipTicks used for turning off gossip ticks during testing.
* Gossip tick can be manually sent.
*/
- public Gossiper(Boolean autoStartGossipTicks){
+ public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config){
+ this(config);
this.autoStartGossipTicks = autoStartGossipTicks;
}
@Override
public void preStart(){
-
- cluster.subscribe(getSelf(),
- ClusterEvent.initialStateAsEvents(),
- ClusterEvent.MemberEvent.class,
- ClusterEvent.UnreachableMember.class);
+ ActorRefProvider provider = getContext().provider();
+ selfAddress = provider.getDefaultAddress();
+
+ if ( provider instanceof ClusterActorRefProvider ) {
+ cluster = Cluster.get(getContext().system());
+ cluster.subscribe(getSelf(),
+ ClusterEvent.initialStateAsEvents(),
+ ClusterEvent.MemberEvent.class,
+ ClusterEvent.UnreachableMember.class);
+ }
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
- new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
+ config.getGossipTickInterval(), //interval
getSelf(), //target
new Messages.GossiperMessages.GossipTick(), //message
getContext().dispatcher(), //execution context
@Override
public void postStop(){
- if (cluster != null)
+ if (cluster != null) {
cluster.unsubscribe(getSelf());
- if (gossipTask != null)
+ }
+ if (gossipTask != null) {
gossipTask.cancel();
+ }
}
@Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
-
+ protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
- if (message instanceof GossipTick)
+ if (message instanceof GossipTick) {
receiveGossipTick();
-
- //Message from remote gossiper with its bucket versions
- else if (message instanceof GossipStatus)
+ } else if (message instanceof GossipStatus) {
+ // Message from remote gossiper with its bucket versions
receiveGossipStatus((GossipStatus) message);
-
- //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
- //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
- //message with its local versions
- else if (message instanceof GossipEnvelope)
+ } else if (message instanceof GossipEnvelope) {
+ // Message from remote gossiper with buckets. This is usually in response to GossipStatus
+ // message. The contained buckets are newer as determined by the remote gossiper by
+ // comparing the GossipStatus message with its local versions.
receiveGossip((GossipEnvelope) message);
-
- else if (message instanceof ClusterEvent.MemberUp) {
+ } else if (message instanceof ClusterEvent.MemberUp) {
receiveMemberUp(((ClusterEvent.MemberUp) message).member());
} else if (message instanceof ClusterEvent.MemberRemoved) {
} else if ( message instanceof ClusterEvent.UnreachableMember){
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
- } else
+ } else {
unhandled(message);
+ }
}
/**
}
clusterMembers.remove(member.address());
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ if(log.isDebugEnabled()) {
+ log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
}
/**
*/
void receiveMemberUp(Member member) {
- if (selfAddress.equals(member.address()))
+ if (selfAddress.equals(member.address())) {
return; //ignore up notification for self
+ }
- if (!clusterMembers.contains(member.address()))
+ if (!clusterMembers.contains(member.address())) {
clusterMembers.add(member.address());
-
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
+ if(log.isDebugEnabled()) {
+ log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
}
/**
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
void receiveGossipTick(){
- if (clusterMembers.size() == 0) return; //no members to send gossip status to
+ if (clusterMembers.size() == 0) {
+ return; //no members to send gossip status to
+ }
- Address remoteMemberToGossipTo = null;
+ Address remoteMemberToGossipTo;
- if (clusterMembers.size() == 1)
+ if (clusterMembers.size() == 1) {
remoteMemberToGossipTo = clusterMembers.get(0);
- else {
+ } else {
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
-
- log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+ if(log.isTraceEnabled()) {
+ log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
+ }
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
*/
void receiveGossipStatus(GossipStatus status){
//Don't accept messages from non-members
- if (!clusterMembers.contains(status.from()))
+ if (!clusterMembers.contains(status.from())) {
return;
+ }
final ActorRef sender = getSender();
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
+
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
}
void receiveGossip(GossipEnvelope envelope){
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ if(log.isTraceEnabled()) {
+ log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ }
return;
}
*/
void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
}
void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
//Get local status from bucket store and send to remote
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
+
+ //Find gossiper on remote system
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
- log.debug("Sending bucket versions to [{}]", remoteRef);
+ if(log.isTraceEnabled()) {
+ log.trace("Sending bucket versions to [{}]", remoteRef);
+ }
futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
for (Address address : remoteVersions.keySet()){
- if (localVersions.get(address) == null || remoteVersions.get(address) == null)
+ if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
continue; //this condition is taken care of by above diffs
- if (localVersions.get(address) < remoteVersions.get(address))
+ }
+ if (localVersions.get(address) < remoteVersions.get(address)) {
localIsOlder.add(address);
- else if (localVersions.get(address) > remoteVersions.get(address))
+ } else if (localVersions.get(address) > remoteVersions.get(address)) {
localIsNewer.add(address);
- else
- continue;
+ }
}
- if (!localIsOlder.isEmpty())
+ if (!localIsOlder.isEmpty()) {
sendGossipStatusTo(sender, localVersions );
+ }
- if (!localIsNewer.isEmpty())
+ if (!localIsNewer.isEmpty()) {
sendGossipTo(sender, localIsNewer);//send newer buckets to remote
+ }
}
return null;
public Void apply(Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+ if(log.isTraceEnabled()) {
+ log.trace("Buckets to send from {}: {}", selfAddress, buckets);
+ }
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}