X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=40be108b2e40a5ae821b24bbd50f11c8f04654ed;hb=HEAD;hp=33b3f6e813fb95cd1f4090f53255949baee716a7;hpb=8426e7a67b1235e8ecc67b1a98a5bd096c88e729;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 33b3f6e813..40be108b2e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -7,6 +7,9 @@ */ package org.opendaylight.controller.remote.rpc.registry.gossip; +import static com.google.common.base.Verify.verifyNotNull; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.ActorRefProvider; import akka.actor.ActorSelection; @@ -17,31 +20,19 @@ import akka.cluster.Cluster; import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; -import akka.dispatch.Mapper; -import akka.pattern.Patterns; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.base.Verify; +import com.google.common.collect.Maps; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; -import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; -import scala.concurrent.Future; +import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig; import scala.concurrent.duration.FiniteDuration; /** @@ -63,8 +54,15 @@ import scala.concurrent.duration.FiniteDuration; * for update. */ public class Gossiper extends AbstractUntypedActorWithMetering { + private static final Object GOSSIP_TICK = new Object() { + @Override + public String toString() { + return "gossip tick"; + } + }; + private final boolean autoStartGossipTicks; - private final RemoteRpcProviderConfig config; + private final RemoteOpsProviderConfig config; /** * All known cluster members. @@ -85,20 +83,22 @@ public class Gossiper extends AbstractUntypedActorWithMetering { private Cancellable gossipTask; - Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) { - this.config = Preconditions.checkNotNull(config); - this.autoStartGossipTicks = autoStartGossipTicks.booleanValue(); + private BucketStoreAccess bucketStore; + + Gossiper(final RemoteOpsProviderConfig config, final Boolean autoStartGossipTicks) { + this.config = requireNonNull(config); + this.autoStartGossipTicks = autoStartGossipTicks; } - Gossiper(final RemoteRpcProviderConfig config) { + Gossiper(final RemoteOpsProviderConfig config) { this(config, Boolean.TRUE); } - public static Props props(final RemoteRpcProviderConfig config) { + public static Props props(final RemoteOpsProviderConfig config) { return Props.create(Gossiper.class, config); } - static Props testProps(final RemoteRpcProviderConfig config) { + static Props testProps(final RemoteOpsProviderConfig config) { return Props.create(Gossiper.class, config, Boolean.FALSE); } @@ -107,7 +107,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - if (provider instanceof ClusterActorRefProvider ) { + bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration()); + + if (provider instanceof ClusterActorRefProvider) { cluster = Cluster.get(getContext().system()); cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), @@ -117,14 +119,19 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } if (autoStartGossipTicks) { - gossipTask = getContext().system().scheduler().schedule( - new FiniteDuration(1, TimeUnit.SECONDS), //initial delay - config.getGossipTickInterval(), //interval - getSelf(), //target - new Messages.GossiperMessages.GossipTick(), //message - getContext().dispatcher(), //execution context - getSelf() //sender - ); + gossipTask = getContext().system().scheduler().scheduleAtFixedRate( + // initial delay + new FiniteDuration(1, TimeUnit.SECONDS), + // interval + config.getGossipTickInterval(), + // target + getSelf(), + // message + GOSSIP_TICK, + // execution context + getContext().dispatcher(), + // sender + getSelf()); } } @@ -138,32 +145,31 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } } - @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - protected void handleReceive(final Object message) throws Exception { + protected void handleReceive(final Object message) { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing - if (message instanceof GossipTick) { + if (GOSSIP_TICK.equals(message)) { receiveGossipTick(); - } else if (message instanceof GossipStatus) { + } else if (message instanceof GossipStatus status) { // Message from remote gossiper with its bucket versions - receiveGossipStatus((GossipStatus) message); - } else if (message instanceof GossipEnvelope) { + receiveGossipStatus(status); + } else if (message instanceof GossipEnvelope envelope) { // Message from remote gossiper with buckets. This is usually in response to GossipStatus // message. The contained buckets are newer as determined by the remote gossiper by // comparing the GossipStatus message with its local versions. - receiveGossip((GossipEnvelope) message); - } else if (message instanceof ClusterEvent.MemberUp) { - receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member()); + receiveGossip(envelope); + } else if (message instanceof ClusterEvent.MemberUp memberUp) { + receiveMemberUpOrReachable(memberUp.member()); - } else if (message instanceof ClusterEvent.ReachableMember) { - receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member()); + } else if (message instanceof ClusterEvent.ReachableMember reachableMember) { + receiveMemberUpOrReachable(reachableMember.member()); - } else if (message instanceof ClusterEvent.MemberRemoved) { - receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member()); + } else if (message instanceof ClusterEvent.MemberRemoved memberRemoved) { + receiveMemberRemoveOrUnreachable(memberRemoved.member()); - } else if (message instanceof ClusterEvent.UnreachableMember) { - receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member()); + } else if (message instanceof ClusterEvent.UnreachableMember unreachableMember) { + receiveMemberRemoveOrUnreachable(unreachableMember.member()); } else { unhandled(message); @@ -176,6 +182,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param member who went down */ private void receiveMemberRemoveOrUnreachable(final Member member) { + LOG.debug("Received memberDown or Unreachable: {}", member); + //if its self, then stop itself if (selfAddress.equals(member.address())) { getContext().stop(getSelf()); @@ -197,7 +205,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { private void removePeer(final Address address) { clusterMembers.remove(address); peers.remove(address); - getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender()); + bucketStore.removeRemoteBucket(address); } /** @@ -206,6 +214,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * @param member the member to add */ private void receiveMemberUpOrReachable(final Member member) { + LOG.debug("Received memberUp or reachable: {}", member); + //ignore up notification for self if (selfAddress.equals(member.address())) { return; @@ -239,7 +249,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } LOG.trace("Gossiping to [{}]", address); - getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address))); + getLocalStatusAndSendTo(verifyNotNull(peers.get(address))); } /** @@ -258,15 +268,53 @@ public class Gossiper extends AbstractUntypedActorWithMetering { @VisibleForTesting void receiveGossipStatus(final GossipStatus status) { // Don't accept messages from non-members - if (!peers.containsKey(status.from())) { - return; + if (peers.containsKey(status.from())) { + // FIXME: sender should be part of GossipStatus + final ActorRef sender = getSender(); + bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions)); } + } + + private void processRemoteStatus(final ActorRef remote, final GossipStatus status, + final Map
localVersions) { + final Map remoteVersions = status.versions(); + + //diff between remote list and local + final Set localIsOlder = new HashSet<>(remoteVersions.keySet()); + localIsOlder.removeAll(localVersions.keySet()); - final ActorRef sender = getSender(); - Future