X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=8af1c83c558a0a6b4842b9cb7accfa0a6e68a79b;hp=f6ce5e55f3ee63602fc92529e92d6e93d0ff9bb3;hb=3997099eb61b0f2adc47f7a85952c324e9de223f;hpb=287b1d1ecec3264c192b1007019bfcadf6cb4311 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index f6ce5e55f3..8af1c83c55 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -12,19 +12,12 @@ import akka.actor.ActorRefProvider; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; -import akka.actor.UntypedActor; import akka.cluster.Cluster; import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; import akka.dispatch.Mapper; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.pattern.Patterns; -import org.opendaylight.controller.remote.rpc.utils.ActorUtil; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -32,15 +25,20 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * Gossiper that syncs bucket store across nodes in the cluster. @@ -59,9 +57,9 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go * */ -public class Gossiper extends UntypedActor { +public class Gossiper extends AbstractUntypedActorWithMetering { - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + private final Logger log = LoggerFactory.getLogger(getClass()); private Cluster cluster; @@ -79,7 +77,11 @@ public class Gossiper extends UntypedActor { private Boolean autoStartGossipTicks = true; - public Gossiper(){} + private RemoteRpcProviderConfig config; + + public Gossiper(){ + config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + } /** * Helpful for testing @@ -106,7 +108,7 @@ public class Gossiper extends UntypedActor { if (autoStartGossipTicks) { gossipTask = getContext().system().scheduler().schedule( new FiniteDuration(1, TimeUnit.SECONDS), //initial delay - ActorUtil.GOSSIP_TICK_INTERVAL, //interval + config.getGossipTickInterval(), //interval getSelf(), //target new Messages.GossiperMessages.GossipTick(), //message getContext().dispatcher(), //execution context @@ -117,33 +119,29 @@ public class Gossiper extends UntypedActor { @Override public void postStop(){ - if (cluster != null) + if (cluster != null) { cluster.unsubscribe(getSelf()); - if (gossipTask != null) + } + if (gossipTask != null) { gossipTask.cancel(); + } } @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: node[{}], message[{}]", selfAddress, message); - + protected void handleReceive(Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing - if (message instanceof GossipTick) + if (message instanceof GossipTick) { receiveGossipTick(); - - //Message from remote gossiper with its bucket versions - else if (message instanceof GossipStatus) + } else if (message instanceof GossipStatus) { + // Message from remote gossiper with its bucket versions receiveGossipStatus((GossipStatus) message); - - //Message from remote gossiper with buckets. This is usually in response to GossipStatus message - //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus - //message with its local versions - else if (message instanceof GossipEnvelope) + } else if (message instanceof GossipEnvelope) { + // Message from remote gossiper with buckets. This is usually in response to GossipStatus + // message. The contained buckets are newer as determined by the remote gossiper by + // comparing the GossipStatus message with its local versions. receiveGossip((GossipEnvelope) message); - - else if (message instanceof ClusterEvent.MemberUp) { + } else if (message instanceof ClusterEvent.MemberUp) { receiveMemberUp(((ClusterEvent.MemberUp) message).member()); } else if (message instanceof ClusterEvent.MemberRemoved) { @@ -152,8 +150,9 @@ public class Gossiper extends UntypedActor { } else if ( message instanceof ClusterEvent.UnreachableMember){ receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member()); - } else + } else { unhandled(message); + } } /** @@ -169,7 +168,9 @@ public class Gossiper extends UntypedActor { } clusterMembers.remove(member.address()); - log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + if(log.isDebugEnabled()) { + log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers); + } } /** @@ -178,13 +179,16 @@ public class Gossiper extends UntypedActor { */ void receiveMemberUp(Member member) { - if (selfAddress.equals(member.address())) + if (selfAddress.equals(member.address())) { return; //ignore up notification for self + } - if (!clusterMembers.contains(member.address())) + if (!clusterMembers.contains(member.address())) { clusterMembers.add(member.address()); - - log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); + } + if(log.isDebugEnabled()) { + log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); + } } /** @@ -194,18 +198,21 @@ public class Gossiper extends UntypedActor { * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it. */ void receiveGossipTick(){ - if (clusterMembers.size() == 0) return; //no members to send gossip status to + if (clusterMembers.size() == 0) { + return; //no members to send gossip status to + } - Address remoteMemberToGossipTo = null; + Address remoteMemberToGossipTo; - if (clusterMembers.size() == 1) + if (clusterMembers.size() == 1) { remoteMemberToGossipTo = clusterMembers.get(0); - else { + } else { Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); remoteMemberToGossipTo = clusterMembers.get(randomIndex); } - - log.debug("Gossiping to [{}]", remoteMemberToGossipTo); + if(log.isDebugEnabled()) { + log.debug("Gossiping to [{}]", remoteMemberToGossipTo); + } getLocalStatusAndSendTo(remoteMemberToGossipTo); } @@ -224,12 +231,13 @@ public class Gossiper extends UntypedActor { */ void receiveGossipStatus(GossipStatus status){ //Don't accept messages from non-members - if (!clusterMembers.contains(status.from())) + if (!clusterMembers.contains(status.from())) { return; + } final ActorRef sender = getSender(); Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis()); + Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher()); @@ -243,7 +251,9 @@ public class Gossiper extends UntypedActor { void receiveGossip(GossipEnvelope envelope){ //TODO: Add more validations if (!selfAddress.equals(envelope.to())) { - log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); + if(log.isDebugEnabled()) { + log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); + } return; } @@ -271,7 +281,7 @@ public class Gossiper extends UntypedActor { void sendGossipTo(final ActorRef remote, final Set
addresses){ Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis()); + Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration()); futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher()); } @@ -284,13 +294,15 @@ public class Gossiper extends UntypedActor { //Get local status from bucket store and send to remote Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis()); + Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); //Find gossiper on remote system ActorSelection remoteRef = getContext().system().actorSelection( remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress()); - log.debug("Sending bucket versions to [{}]", remoteRef); + if(log.isDebugEnabled()) { + log.debug("Sending bucket versions to [{}]", remoteRef); + } futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher()); @@ -376,21 +388,23 @@ public class Gossiper extends UntypedActor { for (Address address : remoteVersions.keySet()){ - if (localVersions.get(address) == null || remoteVersions.get(address) == null) + if (localVersions.get(address) == null || remoteVersions.get(address) == null) { continue; //this condition is taken care of by above diffs - if (localVersions.get(address) < remoteVersions.get(address)) + } + if (localVersions.get(address) < remoteVersions.get(address)) { localIsOlder.add(address); - else if (localVersions.get(address) > remoteVersions.get(address)) + } else if (localVersions.get(address) > remoteVersions.get(address)) { localIsNewer.add(address); - else - continue; + } } - if (!localIsOlder.isEmpty()) + if (!localIsOlder.isEmpty()) { sendGossipStatusTo(sender, localVersions ); + } - if (!localIsNewer.isEmpty()) + if (!localIsNewer.isEmpty()) { sendGossipTo(sender, localIsNewer);//send newer buckets to remote + } } return null; @@ -417,7 +431,9 @@ public class Gossiper extends UntypedActor { public Void apply(Object msg) { if (msg instanceof GetBucketsByMembersReply) { Map buckets = ((GetBucketsByMembersReply) msg).getBuckets(); - log.debug("Buckets to send from {}: {}", selfAddress, buckets); + if(log.isDebugEnabled()) { + log.debug("Buckets to send from {}: {}", selfAddress, buckets); + } GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets); sender.tell(envelope, getSelf()); }