X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=2a4e3b7f93252b2c411025b777defdbfb287830c;hb=efff2ad1ea02712f00013aa3b40529ceecf5e29b;hp=33b3f6e813fb95cd1f4090f53255949baee716a7;hpb=8426e7a67b1235e8ecc67b1a98a5bd096c88e729;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 33b3f6e813..2a4e3b7f93 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -17,31 +17,21 @@ import akka.cluster.Cluster; import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; -import akka.dispatch.Mapper; -import akka.pattern.Patterns; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import com.google.common.collect.Maps; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; -import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; /** @@ -63,6 +53,13 @@ import scala.concurrent.duration.FiniteDuration; * for update. */ public class Gossiper extends AbstractUntypedActorWithMetering { + private static final Object GOSSIP_TICK = new Object() { + @Override + public String toString() { + return "gossip tick"; + } + }; + private final boolean autoStartGossipTicks; private final RemoteRpcProviderConfig config; @@ -85,6 +82,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { private Cancellable gossipTask; + private BucketStoreAccess bucketStore; + Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) { this.config = Preconditions.checkNotNull(config); this.autoStartGossipTicks = autoStartGossipTicks.booleanValue(); @@ -107,7 +106,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - if (provider instanceof ClusterActorRefProvider ) { + bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration()); + + if (provider instanceof ClusterActorRefProvider) { cluster = Cluster.get(getContext().system()); cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), @@ -121,7 +122,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { new FiniteDuration(1, TimeUnit.SECONDS), //initial delay config.getGossipTickInterval(), //interval getSelf(), //target - new Messages.GossiperMessages.GossipTick(), //message + GOSSIP_TICK, //message getContext().dispatcher(), //execution context getSelf() //sender ); @@ -138,12 +139,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } } - @SuppressWarnings({ "rawtypes", "unchecked" }) @Override protected void handleReceive(final Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing - if (message instanceof GossipTick) { + if (GOSSIP_TICK.equals(message)) { receiveGossipTick(); } else if (message instanceof GossipStatus) { // Message from remote gossiper with its bucket versions @@ -176,6 +176,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param member who went down */ private void receiveMemberRemoveOrUnreachable(final Member member) { + LOG.debug("Received memberDown or Unreachable: {}", member); + //if its self, then stop itself if (selfAddress.equals(member.address())) { getContext().stop(getSelf()); @@ -197,7 +199,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { private void removePeer(final Address address) { clusterMembers.remove(address); peers.remove(address); - getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender()); + bucketStore.removeRemoteBucket(address); } /** @@ -206,6 +208,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param member the member to add */ private void receiveMemberUpOrReachable(final Member member) { + LOG.debug("Received memberUp or reachable: {}", member); + //ignore up notification for self if (selfAddress.equals(member.address())) { return; @@ -258,15 +262,53 @@ public class Gossiper extends AbstractUntypedActorWithMetering { @VisibleForTesting void receiveGossipStatus(final GossipStatus status) { // Don't accept messages from non-members - if (!peers.containsKey(status.from())) { - return; + if (peers.containsKey(status.from())) { + // FIXME: sender should be part of GossipStatus + final ActorRef sender = getSender(); + bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions)); } + } + + private void processRemoteStatus(final ActorRef remote, final GossipStatus status, + final Map localVersions) { + final Map remoteVersions = status.versions(); + + //diff between remote list and local + final Set
localIsOlder = new HashSet<>(remoteVersions.keySet()); + localIsOlder.removeAll(localVersions.keySet()); + + //diff between local list and remote + final Set
localIsNewer = new HashSet<>(localVersions.keySet()); + localIsNewer.removeAll(remoteVersions.keySet()); - final ActorRef sender = getSender(); - Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); - futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher()); + for (Entry entry : remoteVersions.entrySet()) { + Address address = entry.getKey(); + Long remoteVersion = entry.getValue(); + Long localVersion = localVersions.get(address); + if (localVersion == null || remoteVersion == null) { + //this condition is taken care of by above diffs + continue; + } + + if (localVersion < remoteVersion) { + localIsOlder.add(address); + } else if (localVersion > remoteVersion) { + localIsNewer.add(address); + } + } + + if (!localIsOlder.isEmpty()) { + remote.tell(new GossipStatus(selfAddress, localVersions), getSelf()); + } + + if (!localIsNewer.isEmpty()) { + //send newer buckets to remote + bucketStore.getBucketsByMembers(localIsNewer, buckets -> { + LOG.trace("Buckets to send from {}: {}", selfAddress, buckets); + remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf()); + }); + } } /** @@ -275,14 +317,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param envelope contains buckets from a remote gossiper */ @VisibleForTesting - > void receiveGossip(final GossipEnvelope envelope) { + void receiveGossip(final GossipEnvelope envelope) { //TODO: Add more validations if (!selfAddress.equals(envelope.to())) { LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to()); return; } - updateRemoteBuckets(envelope.getBuckets()); + updateRemoteBuckets(envelope.buckets()); } /** @@ -291,21 +333,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param buckets map of Buckets to update */ @VisibleForTesting - > void updateRemoteBuckets(final Map> buckets) { - getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf()); - } - - /** - * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper. - * - * @param remote remote node to send Buckets to - * @param addresses node addresses whose buckets needs to be sent - */ - void sendGossipTo(final ActorRef remote, final Set
addresses) { - - Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration()); - futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher()); + void updateRemoteBuckets(final Map> buckets) { + // filter this so we only handle buckets for known peers + bucketStore.updateRemoteBuckets(Maps.filterKeys(buckets, peers::containsKey)); } /** @@ -315,133 +345,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ @VisibleForTesting void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) { - - //Get local status from bucket store and send to remote - Future futureReply = - Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration()); - - LOG.trace("Sending bucket versions to [{}]", remoteGossiper); - - futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher()); - } - - /// - /// Private factories to create mappers - /// - - private Mapper getMapperToSendLocalStatus(final ActorSelection remote) { - - return new Mapper() { - @Override - public Void apply(final Object replyMessage) { - if (replyMessage instanceof GetBucketVersionsReply) { - GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; - Map localVersions = reply.getVersions(); - - remote.tell(new GossipStatus(selfAddress, localVersions), getSelf()); - } - return null; - } - }; - } - - /** - * Process bucket versions received from - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}. - * Then this method compares remote bucket versions with local bucket versions. - *
    - *
  • The buckets that are newer locally, send - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} - * to remote - *
  • The buckets that are older locally, send - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} - * to remote so that remote sends GossipEnvelop. - *
- * - * @param sender the remote member - * @param status bucket versions from a remote member - * @return a {@link akka.dispatch.Mapper} that gets evaluated in future - * - */ - private Mapper getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) { - - final Map remoteVersions = status.getVersions(); - - return new Mapper() { - @Override - public Void apply(final Object replyMessage) { - if (replyMessage instanceof GetBucketVersionsReply) { - GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage; - Map localVersions = reply.getVersions(); - - //diff between remote list and local - Set
localIsOlder = new HashSet<>(); - localIsOlder.addAll(remoteVersions.keySet()); - localIsOlder.removeAll(localVersions.keySet()); - - //diff between local list and remote - Set
localIsNewer = new HashSet<>(); - localIsNewer.addAll(localVersions.keySet()); - localIsNewer.removeAll(remoteVersions.keySet()); - - - for (Map.Entry entry : remoteVersions.entrySet()) { - Address address = entry.getKey(); - Long remoteVersion = entry.getValue(); - Long localVersion = localVersions.get(address); - if (localVersion == null || remoteVersion == null) { - //this condition is taken care of by above diffs - continue; - } - - if (localVersion < remoteVersion) { - localIsOlder.add(address); - } else if (localVersion > remoteVersion) { - localIsNewer.add(address); - } - } - - if (!localIsOlder.isEmpty()) { - sender.tell(new GossipStatus(selfAddress, localVersions), getSelf()); - } - - if (!localIsNewer.isEmpty()) { - //send newer buckets to remote - sendGossipTo(sender, localIsNewer); - } - } - return null; - } - }; - } - - /** - * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} - * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. - * These buckets are sent to a remote member encapsulated in - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} - * - * @param sender the remote member that sent - * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} - * in reply to which bucket is being sent back - * @return a {@link akka.dispatch.Mapper} that gets evaluated in future - * - */ - private Mapper getMapperToSendGossip(final ActorRef sender) { - - return new Mapper() { - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public Void apply(final Object msg) { - if (msg instanceof GetBucketsByMembersReply) { - Map> buckets = ((GetBucketsByMembersReply) msg).getBuckets(); - LOG.trace("Buckets to send from {}: {}", selfAddress, buckets); - GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets); - sender.tell(envelope, getSelf()); - } - return null; - } - }; + bucketStore.getBucketVersions(versions -> { + LOG.trace("Sending bucket versions to [{}]", remoteGossiper); + /* + * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring, + * but can we identify which bucket is the local one? + */ + remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf()); + }); } ///