X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=85c6ebe26f859e0751122d4ab60065a0f1b48aba;hp=a8bc25c40ba14b2ecf8c45926f7871f189dbdafa;hb=25b805c6685467f561506dbb5187a744fc12096b;hpb=ca350882dfe7f0f8c5ac2b8a82c8441258f960a9 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index a8bc25c40b..85c6ebe26f 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -12,7 +12,6 @@ import akka.actor.ActorRefProvider; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; -import akka.actor.UntypedActor; import akka.cluster.Cluster; import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; @@ -21,6 +20,8 @@ import akka.dispatch.Mapper; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.pattern.Patterns; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -58,7 +59,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go * */ -public class Gossiper extends UntypedActor { +public class Gossiper extends AbstractUntypedActorWithMetering { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); @@ -78,7 +79,11 @@ public class Gossiper extends UntypedActor { private Boolean autoStartGossipTicks = true; - public Gossiper(){} + private RemoteRpcProviderConfig config; + + public Gossiper(){ + config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + } /** * Helpful for testing @@ -105,7 +110,7 @@ public class Gossiper extends UntypedActor { if (autoStartGossipTicks) { gossipTask = getContext().system().scheduler().schedule( new FiniteDuration(1, TimeUnit.SECONDS), //initial delay - new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval + config.getGossipTickInterval(), //interval getSelf(), //target new Messages.GossiperMessages.GossipTick(), //message getContext().dispatcher(), //execution context @@ -123,22 +128,19 @@ public class Gossiper extends UntypedActor { } @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: node[{}], message[{}]", selfAddress, message); - + protected void handleReceive(Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing if (message instanceof GossipTick) receiveGossipTick(); - //Message from remote gossiper with its bucket versions + //Message from remote gossiper with its bucket versions else if (message instanceof GossipStatus) receiveGossipStatus((GossipStatus) message); - //Message from remote gossiper with buckets. This is usually in response to GossipStatus message - //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus - //message with its local versions + //Message from remote gossiper with buckets. This is usually in response to GossipStatus message + //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus + //message with its local versions else if (message instanceof GossipEnvelope) receiveGossip((GossipEnvelope) message); @@ -195,7 +197,7 @@ public class Gossiper extends UntypedActor { void receiveGossipTick(){ if (clusterMembers.size() == 0) return; //no members to send gossip status to - Address remoteMemberToGossipTo = null; + Address remoteMemberToGossipTo; if (clusterMembers.size() == 1) remoteMemberToGossipTo = clusterMembers.get(0); @@ -227,7 +229,9 @@ public class Gossiper extends UntypedActor { return; final ActorRef sender = getSender(); - Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000); + Future futureReply = + Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); + futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher()); } @@ -267,7 +271,8 @@ public class Gossiper extends UntypedActor { */ void sendGossipTo(final ActorRef remote, final Set
addresses){ - Future futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000); + Future futureReply = + Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration()); futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher()); } @@ -279,7 +284,10 @@ public class Gossiper extends UntypedActor { void getLocalStatusAndSendTo(Address remoteActorSystemAddress){ //Get local status from bucket store and send to remote - Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000); + Future futureReply = + Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); + + //Find gossiper on remote system ActorSelection remoteRef = getContext().system().actorSelection( remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress()); @@ -375,8 +383,6 @@ public class Gossiper extends UntypedActor { localIsOlder.add(address); else if (localVersions.get(address) > remoteVersions.get(address)) localIsNewer.add(address); - else - continue; } if (!localIsOlder.isEmpty())