X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=015c0a1b0e46d0bf180a78cfe781253997930c20;hp=2c47c4e2a9242a7d9c56a8dc30ade963b83450b7;hb=5b66dd8f5e3467a07e77b20fe696b29993ce5565;hpb=5fd4213b5bfaf2db21f1b37139f6b98535a872c0
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
index 2c47c4e2a9..015c0a1b0e 100644
--- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
+++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
@@ -17,8 +17,6 @@ import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
-import akka.dispatch.Mapper;
-import akka.pattern.Patterns;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
@@ -27,21 +25,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
-import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
-import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
@@ -63,6 +52,13 @@ import scala.concurrent.duration.FiniteDuration;
* for update.
*/
public class Gossiper extends AbstractUntypedActorWithMetering {
+ private static final Object GOSSIP_TICK = new Object() {
+ @Override
+ public String toString() {
+ return "gossip tick";
+ }
+ };
+
private final boolean autoStartGossipTicks;
private final RemoteRpcProviderConfig config;
@@ -85,6 +81,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
private Cancellable gossipTask;
+ private BucketStoreAccess bucketStore;
+
Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
this.config = Preconditions.checkNotNull(config);
this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
@@ -107,6 +105,8 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
+ bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
+
if (provider instanceof ClusterActorRefProvider ) {
cluster = Cluster.get(getContext().system());
cluster.subscribe(getSelf(),
@@ -121,7 +121,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
config.getGossipTickInterval(), //interval
getSelf(), //target
- new Messages.GossiperMessages.GossipTick(), //message
+ GOSSIP_TICK, //message
getContext().dispatcher(), //execution context
getSelf() //sender
);
@@ -138,12 +138,11 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void handleReceive(final Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
- if (message instanceof GossipTick) {
+ if (GOSSIP_TICK.equals(message)) {
receiveGossipTick();
} else if (message instanceof GossipStatus) {
// Message from remote gossiper with its bucket versions
@@ -197,7 +196,7 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
private void removePeer(final Address address) {
clusterMembers.remove(address);
peers.remove(address);
- getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
+ bucketStore.removeRemoteBucket(address);
}
/**
@@ -258,15 +257,53 @@ public class Gossiper extends AbstractUntypedActorWithMetering {
@VisibleForTesting
void receiveGossipStatus(final GossipStatus status) {
// Don't accept messages from non-members
- if (!peers.containsKey(status.from())) {
- return;
+ if (peers.containsKey(status.from())) {
+ // FIXME: sender should be part of GossipStatus
+ final ActorRef sender = getSender();
+ bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions));
+ }
+ }
+
+ private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
+ final Map
localVersions) {
+ final Map remoteVersions = status.versions();
+
+ //diff between remote list and local
+ final Set localIsOlder = new HashSet<>(remoteVersions.keySet());
+ localIsOlder.removeAll(localVersions.keySet());
+
+ //diff between local list and remote
+ final Set localIsNewer = new HashSet<>(localVersions.keySet());
+ localIsNewer.removeAll(remoteVersions.keySet());
+
+
+ for (Entry entry : remoteVersions.entrySet()) {
+ Address address = entry.getKey();
+ Long remoteVersion = entry.getValue();
+ Long localVersion = localVersions.get(address);
+ if (localVersion == null || remoteVersion == null) {
+ //this condition is taken care of by above diffs
+ continue;
+ }
+
+ if (localVersion < remoteVersion) {
+ localIsOlder.add(address);
+ } else if (localVersion > remoteVersion) {
+ localIsNewer.add(address);
+ }
}
- final ActorRef sender = getSender();
- Future