import akka.cluster.ClusterActorRefProvider;
import akka.event.Logging;
import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.utils.ConditionalProbe;
-
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.utils.ConditionalProbe;
/**
* A store that syncs its data across nodes in the cluster.
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
- if ( provider instanceof ClusterActorRefProvider)
+ if ( provider instanceof ClusterActorRefProvider) {
getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
+ }
}
}
if (message instanceof ConditionalProbe) {
+ // The ConditionalProbe is only used for unit tests.
log.info("Received probe {} {}", getSelf(), message);
probe = (ConditionalProbe) message;
+ // Send back any message to tell the caller we got the probe.
+ getSender().tell("Got it", getSelf());
} else if (message instanceof UpdateBucket) {
receiveUpdateBucket(((UpdateBucket) message).getBucket());
} else if (message instanceof GetAllBuckets) {
Map<Address, Bucket> buckets = new HashMap<>();
//first add the local bucket if asked
- if (members.contains(selfAddress))
+ if (members.contains(selfAddress)) {
buckets.put(selfAddress, localBucket);
+ }
//then get buckets for requested remote nodes
for (Address address : members){
- if (remoteBuckets.containsKey(address))
+ if (remoteBuckets.containsKey(address)) {
buckets.put(address, remoteBuckets.get(address));
+ }
}
return buckets;
void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
if (receivedBuckets == null || receivedBuckets.isEmpty())
+ {
return; //nothing to do
+ }
//Remote cant update self's bucket
receivedBuckets.remove(selfAddress);
for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
Long localVersion = versions.get(entry.getKey());
- if (localVersion == null) localVersion = -1L;
+ if (localVersion == null) {
+ localVersion = -1L;
+ }
Bucket receivedBucket = entry.getValue();
- if (receivedBucket == null)
+ if (receivedBucket == null) {
continue;
+ }
Long remoteVersion = receivedBucket.getVersion();
- if (remoteVersion == null) remoteVersion = -1L;
+ if (remoteVersion == null) {
+ remoteVersion = -1L;
+ }
//update only if remote version is newer
if ( remoteVersion.longValue() > localVersion.longValue() ) {