X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=275200f8d5328d73930538ab9fc9d650837d18ca;hp=439c131e3f6999fb438d8622e343c85036c55368;hb=e222d9f4a5b6957853cff9cf73987e6ce0ae3b24;hpb=a81d98f692b80c45bce3fe6a87e731abfb012a9f
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
index 439c131e3f..275200f8d5 100644
--- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
+++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
@@ -12,34 +12,26 @@ import akka.actor.ActorRefProvider;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
+import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
-import akka.dispatch.Mapper;
-import akka.pattern.Patterns;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
@@ -61,41 +53,52 @@ import scala.concurrent.duration.FiniteDuration;
* for update.
*/
public class Gossiper extends AbstractUntypedActorWithMetering {
+ private static final Object GOSSIP_TICK = new Object() {
+ @Override
+ public String toString() {
+ return "gossip tick";
+ }
+ };
- private final Logger log = LoggerFactory.getLogger(getClass());
+ private final boolean autoStartGossipTicks;
+ private final RemoteRpcProviderConfig config;
- private Cluster cluster;
+ /**
+ * All known cluster members.
+ */
+ private final List
clusterMembers = new ArrayList<>();
/**
- * ActorSystem's address for the current cluster node.
+ * Cached ActorSelections for remote peers.
*/
- private Address selfAddress;
+ private final Map peers = new HashMap<>();
/**
- * All known cluster members.
+ * ActorSystem's address for the current cluster node.
*/
- private List clusterMembers = new ArrayList<>();
+ private Address selfAddress;
- private Cancellable gossipTask;
+ private Cluster cluster;
- private Boolean autoStartGossipTicks = true;
+ private Cancellable gossipTask;
- private final RemoteRpcProviderConfig config;
+ private BucketStoreAccess bucketStore;
- public Gossiper(RemoteRpcProviderConfig config) {
+ Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
this.config = Preconditions.checkNotNull(config);
+ this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
}
- /**
- * Constructor for testing.
- *
- * @param autoStartGossipTicks used for turning off gossip ticks during testing.
- * Gossip tick can be manually sent.
- */
- @VisibleForTesting
- public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
- this(config);
- this.autoStartGossipTicks = autoStartGossipTicks;
+ Gossiper(final RemoteRpcProviderConfig config) {
+ this(config, Boolean.TRUE);
+ }
+
+ public static Props props(final RemoteRpcProviderConfig config) {
+ return Props.create(Gossiper.class, config);
+ }
+
+ static Props testProps(final RemoteRpcProviderConfig config) {
+ return Props.create(Gossiper.class, config, Boolean.FALSE);
}
@Override
@@ -103,11 +106,14 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- if ( provider instanceof ClusterActorRefProvider ) {
+ bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
+
+ if (provider instanceof ClusterActorRefProvider) {
cluster = Cluster.get(getContext().system());
cluster.subscribe(getSelf(),
ClusterEvent.initialStateAsEvents(),
ClusterEvent.MemberEvent.class,
+ ClusterEvent.ReachableMember.class,
ClusterEvent.UnreachableMember.class);
}
@@ -115,10 +121,10 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
config.getGossipTickInterval(), //interval
- getSelf(), //target
- new Messages.GossiperMessages.GossipTick(), //message
- getContext().dispatcher(), //execution context
- getSelf() //sender
+ getSelf(), //target
+ GOSSIP_TICK, //message
+ getContext().dispatcher(), //execution context
+ getSelf() //sender
);
}
}
@@ -133,12 +139,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- protected void handleReceive(Object message) throws Exception {
+ protected void handleReceive(final Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
- if (message instanceof GossipTick) {
+ if (GOSSIP_TICK.equals(message)) {
receiveGossipTick();
} else if (message instanceof GossipStatus) {
// Message from remote gossiper with its bucket versions
@@ -149,12 +154,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
// comparing the GossipStatus message with its local versions.
receiveGossip((GossipEnvelope) message);
} else if (message instanceof ClusterEvent.MemberUp) {
- receiveMemberUp(((ClusterEvent.MemberUp) message).member());
+ receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
+
+ } else if (message instanceof ClusterEvent.ReachableMember) {
+ receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
} else if (message instanceof ClusterEvent.MemberRemoved) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
- } else if ( message instanceof ClusterEvent.UnreachableMember) {
+ } else if (message instanceof ClusterEvent.UnreachableMember) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
} else {
@@ -167,15 +175,31 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
*
* @param member who went down
*/
- void receiveMemberRemoveOrUnreachable(Member member) {
+ private void receiveMemberRemoveOrUnreachable(final Member member) {
+ LOG.debug("Received memberDown or Unreachable: {}", member);
+
//if its self, then stop itself
if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
return;
}
- clusterMembers.remove(member.address());
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ removePeer(member.address());
+ LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
+
+ private void addPeer(final Address address) {
+ if (!clusterMembers.contains(address)) {
+ clusterMembers.add(address);
+ }
+ peers.computeIfAbsent(address, input -> getContext().system()
+ .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
+ }
+
+ private void removePeer(final Address address) {
+ clusterMembers.remove(address);
+ peers.remove(address);
+ bucketStore.removeRemoteBucket(address);
}
/**
@@ -183,17 +207,16 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
*
* @param member the member to add
*/
- void receiveMemberUp(Member member) {
+ private void receiveMemberUpOrReachable(final Member member) {
+ LOG.debug("Received memberUp or reachable: {}", member);
+ //ignore up notification for self
if (selfAddress.equals(member.address())) {
- return; //ignore up notification for self
- }
-
- if (!clusterMembers.contains(member.address())) {
- clusterMembers.add(member.address());
+ return;
}
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ addPeer(member.address());
+ LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
@@ -203,22 +226,24 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
* 2. If there's only 1 member, send gossip status (bucket versions) to it.
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
+ @VisibleForTesting
void receiveGossipTick() {
- if (clusterMembers.size() == 0) {
- return; //no members to send gossip status to
+ final Address address;
+ switch (clusterMembers.size()) {
+ case 0:
+ //no members to send gossip status to
+ return;
+ case 1:
+ address = clusterMembers.get(0);
+ break;
+ default:
+ final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
+ address = clusterMembers.get(randomIndex);
+ break;
}
- Address remoteMemberToGossipTo;
-
- if (clusterMembers.size() == 1) {
- remoteMemberToGossipTo = clusterMembers.get(0);
- } else {
- Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
- remoteMemberToGossipTo = clusterMembers.get(randomIndex);
- }
-
- log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
- getLocalStatusAndSendTo(remoteMemberToGossipTo);
+ LOG.trace("Gossiping to [{}]", address);
+ getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
}
/**
@@ -234,18 +259,56 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
*
* @param status bucket versions from a remote member
*/
- void receiveGossipStatus(GossipStatus status) {
- //Don't accept messages from non-members
- if (!clusterMembers.contains(status.from())) {
- return;
+ @VisibleForTesting
+ void receiveGossipStatus(final GossipStatus status) {
+ // Don't accept messages from non-members
+ if (peers.containsKey(status.from())) {
+ // FIXME: sender should be part of GossipStatus
+ final ActorRef sender = getSender();
+ bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions));
}
+ }
- final ActorRef sender = getSender();
- Future