X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=2a4e3b7f93252b2c411025b777defdbfb287830c;hb=efff2ad1ea02712f00013aa3b40529ceecf5e29b;hp=33b3f6e813fb95cd1f4090f53255949baee716a7;hpb=8426e7a67b1235e8ecc67b1a98a5bd096c88e729;p=controller.git
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
index 33b3f6e813..2a4e3b7f93 100644
--- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
+++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
@@ -17,31 +17,21 @@ import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
-import akka.dispatch.Mapper;
-import akka.pattern.Patterns;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
@@ -63,6 +53,13 @@ import scala.concurrent.duration.FiniteDuration;
* for update.
*/
public class Gossiper extends AbstractUntypedActorWithMetering {
+ private static final Object GOSSIP_TICK = new Object() {
+ @Override
+ public String toString() {
+ return "gossip tick";
+ }
+ };
+
private final boolean autoStartGossipTicks;
private final RemoteRpcProviderConfig config;
@@ -85,6 +82,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
private Cancellable gossipTask;
+ private BucketStoreAccess bucketStore;
+
Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
this.config = Preconditions.checkNotNull(config);
this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
@@ -107,7 +106,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- if (provider instanceof ClusterActorRefProvider ) {
+ bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration());
+
+ if (provider instanceof ClusterActorRefProvider) {
cluster = Cluster.get(getContext().system());
cluster.subscribe(getSelf(),
ClusterEvent.initialStateAsEvents(),
@@ -121,7 +122,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
config.getGossipTickInterval(), //interval
getSelf(), //target
- new Messages.GossiperMessages.GossipTick(), //message
+ GOSSIP_TICK, //message
getContext().dispatcher(), //execution context
getSelf() //sender
);
@@ -138,12 +139,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void handleReceive(final Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
- if (message instanceof GossipTick) {
+ if (GOSSIP_TICK.equals(message)) {
receiveGossipTick();
} else if (message instanceof GossipStatus) {
// Message from remote gossiper with its bucket versions
@@ -176,6 +176,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
* @param member who went down
*/
private void receiveMemberRemoveOrUnreachable(final Member member) {
+ LOG.debug("Received memberDown or Unreachable: {}", member);
+
//if its self, then stop itself
if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
@@ -197,7 +199,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
private void removePeer(final Address address) {
clusterMembers.remove(address);
peers.remove(address);
- getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
+ bucketStore.removeRemoteBucket(address);
}
/**
@@ -206,6 +208,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
* @param member the member to add
*/
private void receiveMemberUpOrReachable(final Member member) {
+ LOG.debug("Received memberUp or reachable: {}", member);
+
//ignore up notification for self
if (selfAddress.equals(member.address())) {
return;
@@ -258,15 +262,53 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
@VisibleForTesting
void receiveGossipStatus(final GossipStatus status) {
// Don't accept messages from non-members
- if (!peers.containsKey(status.from())) {
- return;
+ if (peers.containsKey(status.from())) {
+ // FIXME: sender should be part of GossipStatus
+ final ActorRef sender = getSender();
+ bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions));
}
+ }
+
+ private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
+ final Map
localVersions) {
+ final Map remoteVersions = status.versions();
+
+ //diff between remote list and local
+ final Set localIsOlder = new HashSet<>(remoteVersions.keySet());
+ localIsOlder.removeAll(localVersions.keySet());
+
+ //diff between local list and remote
+ final Set localIsNewer = new HashSet<>(localVersions.keySet());
+ localIsNewer.removeAll(remoteVersions.keySet());
- final ActorRef sender = getSender();
- Future