X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FBucketStore.java;h=628deb4311cebe1da5ff1b44deb715d933b8b8b2;hp=3b078aa062fdf1fc3feaca7fa10147de8c3503cc;hb=c31509c7a6630e54a9f9749a643fed5e1a1ad380;hpb=be88c930f435d3a5f0f012cb0588022a216237bd diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java index 3b078aa062..628deb4311 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java @@ -12,28 +12,23 @@ import akka.actor.ActorRef; import akka.actor.ActorRefProvider; import akka.actor.Address; import akka.actor.Props; -import akka.actor.UntypedActor; import akka.cluster.ClusterActorRefProvider; -import akka.event.Logging; -import akka.event.LoggingAdapter; -import org.opendaylight.controller.utils.ConditionalProbe; - +import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import org.opendaylight.controller.utils.ConditionalProbe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A store that syncs its data across nodes in the cluster. @@ -44,24 +39,26 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}. * */ -public class BucketStore extends UntypedActor { +public class BucketStore> extends AbstractUntypedActorWithMetering { - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + private static final Long NO_VERSION = -1L; + + protected final Logger log = LoggerFactory.getLogger(getClass()); /** * Bucket owned by the node */ - private BucketImpl localBucket = new BucketImpl();; + private final BucketImpl localBucket = new BucketImpl<>(); /** * Buckets ownded by other known nodes in the cluster */ - private ConcurrentMap remoteBuckets = new ConcurrentHashMap<>(); + private final Map> remoteBuckets = new HashMap<>(); /** * Bucket version for every known node in the cluster including this node */ - private ConcurrentMap versions = new ConcurrentHashMap<>(); + private final Map versions = new HashMap<>(); /** * Cluster address for this node @@ -70,72 +67,54 @@ public class BucketStore extends UntypedActor { private ConditionalProbe probe; + private final RemoteRpcProviderConfig config; + + public BucketStore(){ + config = new RemoteRpcProviderConfig(getContext().system().settings().config()); + } + @Override public void preStart(){ ActorRefProvider provider = getContext().provider(); selfAddress = provider.getDefaultAddress(); - if ( provider instanceof ClusterActorRefProvider) - getContext().actorOf(Props.create(Gossiper.class), "gossiper"); + if ( provider instanceof ClusterActorRefProvider) { + getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper"); + } } @Override - public void onReceive(Object message) throws Exception { - - log.debug("Received message: node[{}], message[{}]", selfAddress, message); - + protected void handleReceive(Object message) throws Exception { if (probe != null) { probe.tell(message, getSelf()); } if (message instanceof ConditionalProbe) { + // The ConditionalProbe is only used for unit tests. log.info("Received probe {} {}", getSelf(), message); probe = (ConditionalProbe) message; - } else if (message instanceof UpdateBucket) { - receiveUpdateBucket(((UpdateBucket) message).getBucket()); + // Send back any message to tell the caller we got the probe. + getSender().tell("Got it", getSelf()); } else if (message instanceof GetAllBuckets) { - receiveGetAllBucket(); - } else if (message instanceof GetLocalBucket) { - receiveGetLocalBucket(); + receiveGetAllBuckets(); } else if (message instanceof GetBucketsByMembers) { - receiveGetBucketsByMembers( - ((GetBucketsByMembers) message).getMembers()); + receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers()); } else if (message instanceof GetBucketVersions) { receiveGetBucketVersions(); } else if (message instanceof UpdateRemoteBuckets) { - receiveUpdateRemoteBuckets( - ((UpdateRemoteBuckets) message).getBuckets()); + receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets()); } else { - log.debug("Unhandled message [{}]", message); + if(log.isDebugEnabled()) { + log.debug("Unhandled message [{}]", message); + } unhandled(message); } - - } - - /** - * Returns a copy of bucket owned by this node - */ - private void receiveGetLocalBucket() { - final ActorRef sender = getSender(); - GetLocalBucketReply reply = new GetLocalBucketReply(localBucket); - sender.tell(reply, getSelf()); - } - - /** - * Updates the bucket owned by this node - * - * @param updatedBucket - */ - void receiveUpdateBucket(Bucket updatedBucket){ - - localBucket = (BucketImpl) updatedBucket; - versions.put(selfAddress, localBucket.getVersion()); } /** * Returns all the buckets the this node knows about, self owned + remote */ - void receiveGetAllBucket(){ + void receiveGetAllBuckets(){ final ActorRef sender = getSender(); sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf()); } @@ -145,11 +124,12 @@ public class BucketStore extends UntypedActor { * * @return self owned + remote buckets */ + @SuppressWarnings("rawtypes") Map getAllBuckets(){ Map all = new HashMap<>(remoteBuckets.size() + 1); //first add the local bucket - all.put(selfAddress, localBucket); + all.put(selfAddress, new BucketImpl<>(localBucket)); //then get all remote buckets all.putAll(remoteBuckets); @@ -162,6 +142,7 @@ public class BucketStore extends UntypedActor { * * @param members requested members */ + @SuppressWarnings("rawtypes") void receiveGetBucketsByMembers(Set
members){ final ActorRef sender = getSender(); Map buckets = getBucketsByMembers(members); @@ -174,17 +155,20 @@ public class BucketStore extends UntypedActor { * @param members requested members * @return buckets for requested memebers */ + @SuppressWarnings("rawtypes") Map getBucketsByMembers(Set
members) { Map buckets = new HashMap<>(); //first add the local bucket if asked - if (members.contains(selfAddress)) - buckets.put(selfAddress, localBucket); + if (members.contains(selfAddress)) { + buckets.put(selfAddress, new BucketImpl<>(localBucket)); + } //then get buckets for requested remote nodes for (Address address : members){ - if (remoteBuckets.containsKey(address)) + if (remoteBuckets.containsKey(address)) { buckets.put(address, remoteBuckets.get(address)); + } } return buckets; @@ -205,10 +189,13 @@ public class BucketStore extends UntypedActor { * @param receivedBuckets buckets sent by remote * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper} */ + @SuppressWarnings({ "rawtypes", "unchecked" }) void receiveUpdateRemoteBuckets(Map receivedBuckets){ - + log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets); if (receivedBuckets == null || receivedBuckets.isEmpty()) + { return; //nothing to do + } //Remote cant update self's bucket receivedBuckets.remove(selfAddress); @@ -216,15 +203,20 @@ public class BucketStore extends UntypedActor { for (Map.Entry entry : receivedBuckets.entrySet()){ Long localVersion = versions.get(entry.getKey()); - if (localVersion == null) localVersion = -1L; + if (localVersion == null) { + localVersion = NO_VERSION; + } - Bucket receivedBucket = entry.getValue(); + Bucket receivedBucket = entry.getValue(); - if (receivedBucket == null) + if (receivedBucket == null) { continue; + } Long remoteVersion = receivedBucket.getVersion(); - if (remoteVersion == null) remoteVersion = -1L; + if (remoteVersion == null) { + remoteVersion = NO_VERSION; + } //update only if remote version is newer if ( remoteVersion.longValue() > localVersion.longValue() ) { @@ -233,38 +225,26 @@ public class BucketStore extends UntypedActor { } } - log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + if(log.isDebugEnabled()) { + log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets); + } } - /// - ///Getter Setters - /// - - BucketImpl getLocalBucket() { + protected BucketImpl getLocalBucket() { return localBucket; } - void setLocalBucket(BucketImpl localBucket) { - this.localBucket = localBucket; + protected void updateLocalBucket(T data) { + localBucket.setData(data); + versions.put(selfAddress, localBucket.getVersion()); } - ConcurrentMap getRemoteBuckets() { + protected Map> getRemoteBuckets() { return remoteBuckets; } - void setRemoteBuckets(ConcurrentMap remoteBuckets) { - this.remoteBuckets = remoteBuckets; - } - - ConcurrentMap getVersions() { + @VisibleForTesting + Map getVersions() { return versions; } - - void setVersions(ConcurrentMap versions) { - this.versions = versions; - } - - Address getSelfAddress() { - return selfAddress; - } }