import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
-import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
-import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
*
*/
-public class Gossiper extends UntypedActor {
+public class Gossiper extends AbstractUntypedActorWithMetering {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
private Boolean autoStartGossipTicks = true;
- public Gossiper(){}
+ private RemoteRpcProviderConfig config;
+
+ public Gossiper(){
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ }
/**
* Helpful for testing
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
- ActorUtil.GOSSIP_TICK_INTERVAL, //interval
+ config.getGossipTickInterval(), //interval
getSelf(), //target
new Messages.GossiperMessages.GossipTick(), //message
getContext().dispatcher(), //execution context
}
@Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
-
+ protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
if (message instanceof GossipTick)
receiveGossipTick();
- //Message from remote gossiper with its bucket versions
+ //Message from remote gossiper with its bucket versions
else if (message instanceof GossipStatus)
receiveGossipStatus((GossipStatus) message);
- //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
- //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
- //message with its local versions
+ //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
+ //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
+ //message with its local versions
else if (message instanceof GossipEnvelope)
receiveGossip((GossipEnvelope) message);
void receiveGossipTick(){
if (clusterMembers.size() == 0) return; //no members to send gossip status to
- Address remoteMemberToGossipTo = null;
+ Address remoteMemberToGossipTo;
if (clusterMembers.size() == 1)
remoteMemberToGossipTo = clusterMembers.get(0);
final ActorRef sender = getSender();
Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis());
+ Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
}
//Get local status from bucket store and send to remote
Future<Object> futureReply =
- Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
//Find gossiper on remote system
ActorSelection remoteRef = getContext().system().actorSelection(
localIsOlder.add(address);
else if (localVersions.get(address) > remoteVersions.get(address))
localIsNewer.add(address);
- else
- continue;
}
if (!localIsOlder.isEmpty())