X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=inline;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=85c6ebe26f859e0751122d4ab60065a0f1b48aba;hb=25b805c6685467f561506dbb5187a744fc12096b;hp=0b64136c49f9bda7bb331c9f5111d2146b91da9d;hpb=d7ce7c5acee7e6f7cd7895ceff5af63ac53789ad;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 0b64136c49..85c6ebe26f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -8,17 +8,20 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import akka.actor.ActorRef; +import akka.actor.ActorRefProvider; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; -import akka.actor.UntypedActor; import akka.cluster.Cluster; +import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; import akka.dispatch.Mapper; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.pattern.Patterns; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -41,28 +44,31 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go /** * Gossiper that syncs bucket store across nodes in the cluster. - *
- * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions - * to a randomly selected remote gossiper. - *
- * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions. - * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a - * gossip status is sent to remote gossiper so that it can send the newer buckets. - *
- * When a bucket is received from a remote gossiper, its sent to the bucket store for update. + *
+ * It keeps a local scheduler that periodically sends Gossip ticks to + * itself to send bucket store's bucket versions to a randomly selected remote + * gossiper. + * + * When bucket versions are received from a remote gossiper, it is compared + * with bucket store's bucket versions. Which ever buckets are newer + * locally, are sent to remote gossiper. If any bucket is older in bucket store, + * a gossip status is sent to remote gossiper so that it can send the newer buckets. + * + * When a bucket is received from a remote gossiper, its sent to the bucket store + * for update. * */ -public class Gossiper extends UntypedActor { +public class Gossiper extends AbstractUntypedActorWithMetering { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - Cluster cluster = Cluster.get(getContext().system()); + private Cluster cluster; /** * ActorSystem's address for the current cluster node. */ - private Address selfAddress = cluster.selfAddress(); + private Address selfAddress; /** * All known cluster members @@ -73,11 +79,16 @@ public class Gossiper extends UntypedActor { private Boolean autoStartGossipTicks = true; - public Gossiper(){} + private RemoteRpcProviderConfig config; + + public Gossiper(){ + config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + } /** * Helpful for testing - * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent. + * @param autoStartGossipTicks used for turning off gossip ticks during testing. + * Gossip tick can be manually sent. */ public Gossiper(Boolean autoStartGossipTicks){ this.autoStartGossipTicks = autoStartGossipTicks; @@ -85,16 +96,21 @@ public class Gossiper extends UntypedActor { @Override public void preStart(){ - - cluster.subscribe(getSelf(), - ClusterEvent.initialStateAsEvents(), - ClusterEvent.MemberEvent.class, - ClusterEvent.UnreachableMember.class); + ActorRefProvider provider = getContext().provider(); + selfAddress = provider.getDefaultAddress(); + + if ( provider instanceof ClusterActorRefProvider ) { + cluster = Cluster.get(getContext().system()); + cluster.subscribe(getSelf(), + ClusterEvent.initialStateAsEvents(), + ClusterEvent.MemberEvent.class, + ClusterEvent.UnreachableMember.class); + } if (autoStartGossipTicks) { gossipTask = getContext().system().scheduler().schedule( new FiniteDuration(1, TimeUnit.SECONDS), //initial delay - new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval + config.getGossipTickInterval(), //interval getSelf(), //target new Messages.GossiperMessages.GossipTick(), //message getContext().dispatcher(), //execution context @@ -112,22 +128,19 @@ public class Gossiper extends UntypedActor { } @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: node[{}], message[{}]", selfAddress, message); - + protected void handleReceive(Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing if (message instanceof GossipTick) receiveGossipTick(); - //Message from remote gossiper with its bucket versions + //Message from remote gossiper with its bucket versions else if (message instanceof GossipStatus) receiveGossipStatus((GossipStatus) message); - //Message from remote gossiper with buckets. This is usually in response to GossipStatus message - //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus - //message with its local versions + //Message from remote gossiper with buckets. This is usually in response to GossipStatus message + //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus + //message with its local versions else if (message instanceof GossipEnvelope) receiveGossip((GossipEnvelope) message); @@ -184,7 +197,7 @@ public class Gossiper extends UntypedActor { void receiveGossipTick(){ if (clusterMembers.size() == 0) return; //no members to send gossip status to - Address remoteMemberToGossipTo = null; + Address remoteMemberToGossipTo; if (clusterMembers.size() == 1) remoteMemberToGossipTo = clusterMembers.get(0); @@ -211,13 +224,13 @@ public class Gossiper extends UntypedActor { * @param status bucket versions from a remote member */ void receiveGossipStatus(GossipStatus status){ - //Dont want to accept messages from non-members + //Don't accept messages from non-members if (!clusterMembers.contains(status.from())) return; final ActorRef sender = getSender(); - - Future