X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStore.java;h=126b9b669df473d46cb8f9c8cfa2b4eb6786ffd3;hb=cf005e61579cc0848b2f76524db84aa7a65de178;hp=cc24e6845f852df17b52e604cc113f7e34a4a6d8;hpb=9d1222a1f001c9249f4a6b3dba6b067c65de5b4a;p=controller.git diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index cc24e6845f..126b9b669d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -11,11 +11,11 @@ package org.opendaylight.controller.remote.rpc.registry.gossip; import akka.actor.ActorRef; import akka.actor.ActorRefProvider; import akka.actor.Address; -import akka.actor.Props; import akka.cluster.ClusterActorRefProvider; import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; @@ -25,10 +25,9 @@ import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketSto import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket; import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; import org.opendaylight.controller.utils.ConditionalProbe; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A store that syncs its data across nodes in the cluster. @@ -40,18 +39,13 @@ import org.slf4j.LoggerFactory; * */ public class BucketStore> extends AbstractUntypedActorWithMetering { - - private static final Long NO_VERSION = -1L; - - protected final Logger log = LoggerFactory.getLogger(getClass()); - /** * Bucket owned by the node */ - private final BucketImpl localBucket = new BucketImpl<>(); + private final BucketImpl localBucket; /** - * Buckets ownded by other known nodes in the cluster + * Buckets owned by other known nodes in the cluster. */ private final Map> remoteBuckets = new HashMap<>(); @@ -65,12 +59,14 @@ public class BucketStore> extends AbstractUntypedActorWithMe */ private Address selfAddress; + // FIXME: should be part of test-specific subclass private ConditionalProbe probe; private final RemoteRpcProviderConfig config; - public BucketStore(RemoteRpcProviderConfig config){ + public BucketStore(final RemoteRpcProviderConfig config, final T initialData) { this.config = Preconditions.checkNotNull(config); + this.localBucket = new BucketImpl<>(initialData); } @Override @@ -78,35 +74,36 @@ public class BucketStore> extends AbstractUntypedActorWithMe ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - if ( provider instanceof ClusterActorRefProvider) { - getContext().actorOf(Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()), "gossiper"); + if (provider instanceof ClusterActorRefProvider) { + getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper"); } } @Override - protected void handleReceive(Object message) throws Exception { + protected void handleReceive(final Object message) throws Exception { if (probe != null) { probe.tell(message, getSelf()); } - if (message instanceof ConditionalProbe) { - // The ConditionalProbe is only used for unit tests. - log.info("Received probe {} {}", getSelf(), message); - probe = (ConditionalProbe) message; - // Send back any message to tell the caller we got the probe. - getSender().tell("Got it", getSelf()); - } else if (message instanceof GetAllBuckets) { - receiveGetAllBuckets(); - } else if (message instanceof GetBucketsByMembers) { + if (message instanceof GetBucketsByMembers) { receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); } else if (message instanceof GetBucketVersions) { receiveGetBucketVersions(); } else if (message instanceof UpdateRemoteBuckets) { receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); + } else if (message instanceof RemoveRemoteBucket) { + removeBucket(((RemoveRemoteBucket) message).getAddress()); + } else if (message instanceof GetAllBuckets) { + // GetAllBuckets is used only for unit tests. + receiveGetAllBuckets(); + } else if (message instanceof ConditionalProbe) { + // The ConditionalProbe is only used for unit tests. + LOG.info("Received probe {} {}", getSelf(), message); + probe = (ConditionalProbe) message; + // Send back any message to tell the caller we got the probe. + getSender().tell("Got it", getSelf()); } else { - if(log.isDebugEnabled()) { - log.debug("Unhandled message [{}]", message); - } + LOG.debug("Unhandled message [{}]", message); unhandled(message); } } @@ -145,7 +142,7 @@ public class BucketStore> extends AbstractUntypedActorWithMe * * @param members requested members */ - void receiveGetBucketsByMembers(Set
members){ + void receiveGetBucketsByMembers(final Set
members) { final ActorRef sender = getSender(); Map> buckets = getBucketsByMembers(members); sender.tell(new GetBucketsByMembersReply(buckets), getSelf()); @@ -157,7 +154,7 @@ public class BucketStore> extends AbstractUntypedActorWithMe * @param members requested members * @return buckets for requested memebers */ - Map> getBucketsByMembers(Set
members) { + Map> getBucketsByMembers(final Set
members) { Map> buckets = new HashMap<>(); //first add the local bucket if asked @@ -190,56 +187,77 @@ public class BucketStore> extends AbstractUntypedActorWithMe * @param receivedBuckets buckets sent by remote * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} */ - void receiveUpdateRemoteBuckets(Map> receivedBuckets){ - log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); - if (receivedBuckets == null || receivedBuckets.isEmpty()) - { - return; //nothing to do + void receiveUpdateRemoteBuckets(final Map> receivedBuckets) { + LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); + if (receivedBuckets == null || receivedBuckets.isEmpty()) { + //nothing to do + return; } - //Remote cant update self's bucket - receivedBuckets.remove(selfAddress); - - for (Map.Entry> entry : receivedBuckets.entrySet()){ - - Long localVersion = versions.get(entry.getKey()); - if (localVersion == null) { - localVersion = NO_VERSION; + final Map> newBuckets = new HashMap<>(receivedBuckets.size()); + for (Entry> entry : receivedBuckets.entrySet()) { + if (selfAddress.equals(entry.getKey())) { + // Remote cannot update our bucket + continue; } - Bucket receivedBucket = entry.getValue(); - + final Bucket receivedBucket = entry.getValue(); if (receivedBucket == null) { + LOG.debug("Ignoring null bucket from {}", entry.getKey()); continue; } - Long remoteVersion = receivedBucket.getVersion(); - if (remoteVersion == null) { - remoteVersion = NO_VERSION; + // update only if remote version is newer + final long remoteVersion = receivedBucket.getVersion(); + final Long localVersion = versions.get(entry.getKey()); + if (localVersion != null && remoteVersion <= localVersion.longValue()) { + LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion, + remoteVersion); + continue; } - //update only if remote version is newer - if ( remoteVersion.longValue() > localVersion.longValue() ) { - remoteBuckets.put(entry.getKey(), receivedBucket); - versions.put(entry.getKey(), remoteVersion); - } + newBuckets.put(entry.getKey(), receivedBucket); + remoteBuckets.put(entry.getKey(), receivedBucket); + versions.put(entry.getKey(), remoteVersion); + LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion); } - if(log.isDebugEnabled()) { - log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + + onBucketsUpdated(newBuckets); + } + + private void removeBucket(final Address address) { + final Bucket bucket = remoteBuckets.remove(address); + if (bucket != null) { + onBucketRemoved(address, bucket); } + } - onBucketsUpdated(); + /** + * Callback to subclasses invoked when a bucket is removed. + * + * @param address Remote address + * @param bucket Bucket removed + */ + protected void onBucketRemoved(final Address address, final Bucket bucket) { + // Default noop } - protected void onBucketsUpdated() { + /** + * Callback to subclasses invoked when the set of remote buckets is updated. + * + * @param newBuckets Map of address to new bucket. Never null, but can be empty. + */ + protected void onBucketsUpdated(final Map> newBuckets) { + // Default noop } public BucketImpl getLocalBucket() { return localBucket; } - protected void updateLocalBucket(T data) { + protected void updateLocalBucket(final T data) { localBucket.setData(data); versions.put(selfAddress, localBucket.getVersion()); }