package org.opendaylight.controller.remote.rpc.registry.gossip;
import akka.actor.ActorRef;
+import akka.actor.ActorRefProvider;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
-import akka.actor.UntypedActor;
import akka.cluster.Cluster;
+import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
* Gossiper that syncs bucket store across nodes in the cluster.
- * <p>
- * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions
- * to a randomly selected remote gossiper.
- * <p>
- * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions.
- * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a
- * gossip status is sent to remote gossiper so that it can send the newer buckets.
- * <p>
- * When a bucket is received from a remote gossiper, its sent to the bucket store for update.
+ * <p/>
+ * It keeps a local scheduler that periodically sends Gossip ticks to
+ * itself to send bucket store's bucket versions to a randomly selected remote
+ * gossiper.
+ * <p/>
+ * When bucket versions are received from a remote gossiper, it is compared
+ * with bucket store's bucket versions. Which ever buckets are newer
+ * locally, are sent to remote gossiper. If any bucket is older in bucket store,
+ * a gossip status is sent to remote gossiper so that it can send the newer buckets.
+ * <p/>
+ * When a bucket is received from a remote gossiper, its sent to the bucket store
+ * for update.
*
*/
-public class Gossiper extends UntypedActor {
+public class Gossiper extends AbstractUntypedActorWithMetering {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
- Cluster cluster = Cluster.get(getContext().system());
+ private Cluster cluster;
/**
* ActorSystem's address for the current cluster node.
*/
- private Address selfAddress = cluster.selfAddress();
+ private Address selfAddress;
/**
* All known cluster members
private Boolean autoStartGossipTicks = true;
- public Gossiper(){}
+ private RemoteRpcProviderConfig config;
+
+ public Gossiper(){
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ }
/**
* Helpful for testing
- * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent.
+ * @param autoStartGossipTicks used for turning off gossip ticks during testing.
+ * Gossip tick can be manually sent.
*/
public Gossiper(Boolean autoStartGossipTicks){
this.autoStartGossipTicks = autoStartGossipTicks;
@Override
public void preStart(){
-
- cluster.subscribe(getSelf(),
- ClusterEvent.initialStateAsEvents(),
- ClusterEvent.MemberEvent.class,
- ClusterEvent.UnreachableMember.class);
+ ActorRefProvider provider = getContext().provider();
+ selfAddress = provider.getDefaultAddress();
+
+ if ( provider instanceof ClusterActorRefProvider ) {
+ cluster = Cluster.get(getContext().system());
+ cluster.subscribe(getSelf(),
+ ClusterEvent.initialStateAsEvents(),
+ ClusterEvent.MemberEvent.class,
+ ClusterEvent.UnreachableMember.class);
+ }
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
- new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
+ config.getGossipTickInterval(), //interval
getSelf(), //target
new Messages.GossiperMessages.GossipTick(), //message
getContext().dispatcher(), //execution context
}
@Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
-
+ protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
if (message instanceof GossipTick)
receiveGossipTick();
- //Message from remote gossiper with its bucket versions
+ //Message from remote gossiper with its bucket versions
else if (message instanceof GossipStatus)
receiveGossipStatus((GossipStatus) message);
- //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
- //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
- //message with its local versions
+ //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
+ //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
+ //message with its local versions
else if (message instanceof GossipEnvelope)
receiveGossip((GossipEnvelope) message);
}
clusterMembers.remove(member.address());
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ if(log.isDebugEnabled()) {
+ log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
}
/**
if (!clusterMembers.contains(member.address()))
clusterMembers.add(member.address());
-
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ if(log.isDebugEnabled()) {
+ log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
}
/**
void receiveGossipTick(){
if (clusterMembers.size() == 0) return; //no members to send gossip status to
- Address remoteMemberToGossipTo = null;
+ Address remoteMemberToGossipTo;
if (clusterMembers.size() == 1)
remoteMemberToGossipTo = clusterMembers.get(0);
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
-
- log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+ if(log.isDebugEnabled()) {
+ log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+ }
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
* @param status bucket versions from a remote member
*/
void receiveGossipStatus(GossipStatus status){
- //Dont want to accept messages from non-members
+ //Don't accept messages from non-members
if (!clusterMembers.contains(status.from()))
return;
final ActorRef sender = getSender();
-
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
void receiveGossip(GossipEnvelope envelope){
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- log.info("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ if(log.isDebugEnabled()) {
+ log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ }
return;
}
- if (envelope.getBuckets() == null)
- return; //nothing to do
updateRemoteBuckets(envelope.getBuckets());
*/
void updateRemoteBuckets(Map<Address, Bucket> buckets) {
- if (buckets == null || buckets.isEmpty())
- return; //nothing to merge
-
UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
-
getContext().parent().tell(updateRemoteBuckets, getSelf());
}
*/
void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
-
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
-
}
/**
void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
//Get local status from bucket store and send to remote
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
+ //Find gossiper on remote system
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
- log.debug("Sending bucket versions to [{}]", remoteRef);
+ if(log.isDebugEnabled()) {
+ log.debug("Sending bucket versions to [{}]", remoteRef);
+ }
futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
}
/**
- * Process bucket versions received from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
+ * Process bucket versions received from
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
* Then this method compares remote bucket versions with local bucket versions.
* <ul>
* <li>The buckets that are newer locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} to remote
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
+ * to remote
* <li>The buckets that are older locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} to remote so that
- * remote sends GossipEnvelop.
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
+ * to remote so that remote sends GossipEnvelop.
* </ul>
*
* @param sender the remote member
localIsOlder.add(address);
else if (localVersions.get(address) > remoteVersions.get(address))
localIsNewer.add(address);
- else
- continue;
}
if (!localIsOlder.isEmpty())
}
/**
- * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} that contains
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. These buckets are sent to a remote member encapsulated
- * in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
+ * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
+ * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
+ * These buckets are sent to a remote member encapsulated in
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
*
* @param sender the remote member that sent
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
public Void apply(Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- log.info("Buckets to send from {}: {}", selfAddress, buckets);
+ if(log.isDebugEnabled()) {
+ log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+ }
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}