X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=a8bc25c40ba14b2ecf8c45926f7871f189dbdafa;hp=0b64136c49f9bda7bb331c9f5111d2146b91da9d;hb=f4bad4be4f0838122b9799d133878877088e870d;hpb=d7ce7c5acee7e6f7cd7895ceff5af63ac53789ad diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 0b64136c49..a8bc25c40b 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -8,11 +8,13 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import akka.actor.ActorRef; +import akka.actor.ActorRefProvider; import akka.actor.ActorSelection; import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.UntypedActor; import akka.cluster.Cluster; +import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; import akka.dispatch.Mapper; @@ -41,15 +43,18 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go /** * Gossiper that syncs bucket store across nodes in the cluster. - *

- * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions - * to a randomly selected remote gossiper. - *

- * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions. - * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a - * gossip status is sent to remote gossiper so that it can send the newer buckets. - *

- * When a bucket is received from a remote gossiper, its sent to the bucket store for update. + *

+ * It keeps a local scheduler that periodically sends Gossip ticks to + * itself to send bucket store's bucket versions to a randomly selected remote + * gossiper. + *

+ * When bucket versions are received from a remote gossiper, it is compared + * with bucket store's bucket versions. Which ever buckets are newer + * locally, are sent to remote gossiper. If any bucket is older in bucket store, + * a gossip status is sent to remote gossiper so that it can send the newer buckets. + *

+ * When a bucket is received from a remote gossiper, its sent to the bucket store + * for update. * */ @@ -57,12 +62,12 @@ public class Gossiper extends UntypedActor { final LoggingAdapter log = Logging.getLogger(getContext().system(), this); - Cluster cluster = Cluster.get(getContext().system()); + private Cluster cluster; /** * ActorSystem's address for the current cluster node. */ - private Address selfAddress = cluster.selfAddress(); + private Address selfAddress; /** * All known cluster members @@ -77,7 +82,8 @@ public class Gossiper extends UntypedActor { /** * Helpful for testing - * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent. + * @param autoStartGossipTicks used for turning off gossip ticks during testing. + * Gossip tick can be manually sent. */ public Gossiper(Boolean autoStartGossipTicks){ this.autoStartGossipTicks = autoStartGossipTicks; @@ -85,16 +91,21 @@ public class Gossiper extends UntypedActor { @Override public void preStart(){ - - cluster.subscribe(getSelf(), - ClusterEvent.initialStateAsEvents(), - ClusterEvent.MemberEvent.class, - ClusterEvent.UnreachableMember.class); + ActorRefProvider provider = getContext().provider(); + selfAddress = provider.getDefaultAddress(); + + if ( provider instanceof ClusterActorRefProvider ) { + cluster = Cluster.get(getContext().system()); + cluster.subscribe(getSelf(), + ClusterEvent.initialStateAsEvents(), + ClusterEvent.MemberEvent.class, + ClusterEvent.UnreachableMember.class); + } if (autoStartGossipTicks) { gossipTask = getContext().system().scheduler().schedule( new FiniteDuration(1, TimeUnit.SECONDS), //initial delay - new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval + new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval getSelf(), //target new Messages.GossiperMessages.GossipTick(), //message getContext().dispatcher(), //execution context @@ -211,14 +222,12 @@ public class Gossiper extends UntypedActor { * @param status bucket versions from a remote member */ void receiveGossipStatus(GossipStatus status){ - //Dont want to accept messages from non-members + //Don't accept messages from non-members if (!clusterMembers.contains(status.from())) return; final ActorRef sender = getSender(); - Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000); - futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher()); } @@ -231,11 +240,9 @@ public class Gossiper extends UntypedActor { void receiveGossip(GossipEnvelope envelope){ //TODO: Add more validations if (!selfAddress.equals(envelope.to())) { - log.info("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); + log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); return; } - if (envelope.getBuckets() == null) - return; //nothing to do updateRemoteBuckets(envelope.getBuckets()); @@ -248,11 +255,7 @@ public class Gossiper extends UntypedActor { */ void updateRemoteBuckets(Map buckets) { - if (buckets == null || buckets.isEmpty()) - return; //nothing to merge - UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets); - getContext().parent().tell(updateRemoteBuckets, getSelf()); } @@ -265,9 +268,7 @@ public class Gossiper extends UntypedActor { void sendGossipTo(final ActorRef remote, final Set
addresses){ Future futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000); - futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher()); - } /** @@ -279,7 +280,6 @@ public class Gossiper extends UntypedActor { //Get local status from bucket store and send to remote Future futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000); - ActorSelection remoteRef = getContext().system().actorSelection( remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress()); @@ -328,14 +328,16 @@ public class Gossiper extends UntypedActor { } /** - * Process bucket versions received from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}. + * Process bucket versions received from + * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}. * Then this method compares remote bucket versions with local bucket versions. *
    *
  • The buckets that are newer locally, send - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} to remote + * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} + * to remote *
  • The buckets that are older locally, send - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} to remote so that - * remote sends GossipEnvelop. + * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} + * to remote so that remote sends GossipEnvelop. *
* * @param sender the remote member @@ -390,9 +392,10 @@ public class Gossiper extends UntypedActor { } /** - * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} that contains - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. These buckets are sent to a remote member encapsulated - * in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} + * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} + * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. + * These buckets are sent to a remote member encapsulated in + * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} * * @param sender the remote member that sent * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} @@ -407,7 +410,7 @@ public class Gossiper extends UntypedActor { public Void apply(Object msg) { if (msg instanceof GetBucketsByMembersReply) { Map buckets = ((GetBucketsByMembersReply) msg).getBuckets(); - log.info("Buckets to send from {}: {}", selfAddress, buckets); + log.debug("Buckets to send from {}: {}", selfAddress, buckets); GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets); sender.tell(envelope, getSelf()); }