package org.opendaylight.controller.remote.rpc.registry.gossip;
import akka.actor.ActorRef;
+import akka.actor.ActorRefProvider;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.cluster.Cluster;
+import akka.cluster.ClusterActorRefProvider;
import akka.event.Logging;
import akka.event.LoggingAdapter;
-import org.opendaylight.controller.utils.ConditionalProbe;
-
+import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.utils.ConditionalProbe;
/**
* A store that syncs its data across nodes in the cluster.
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
-public class BucketStore extends UntypedActor {
+public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
+
+ private static final Long NO_VERSION = -1L;
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
/**
* Bucket owned by the node
*/
- private BucketImpl localBucket = new BucketImpl();;
+ private final BucketImpl<T> localBucket = new BucketImpl<>();
/**
* Buckets ownded by other known nodes in the cluster
*/
- private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
+ private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
/**
* Bucket version for every known node in the cluster including this node
*/
- private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
+ private final Map<Address, Long> versions = new HashMap<>();
/**
* Cluster address for this node
*/
- private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
-
- /**
- * Our private gossiper
- */
- private ActorRef gossiper;
+ private Address selfAddress;
private ConditionalProbe probe;
- public BucketStore(){
- gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
- }
+ private final RemoteRpcProviderConfig config;
- /**
- * This constructor is useful for testing.
- * TODO: Pass Props instead of ActorRef
- *
- * @param gossiper
- */
- public BucketStore(ActorRef gossiper){
- this.gossiper = gossiper;
+ public BucketStore(){
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
}
@Override
- public void onReceive(Object message) throws Exception {
+ public void preStart(){
+ ActorRefProvider provider = getContext().provider();
+ selfAddress = provider.getDefaultAddress();
- log.debug("Received message: node[{}], message[{}]", selfAddress,
- message);
+ if ( provider instanceof ClusterActorRefProvider) {
+ getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
+ }
+ }
+ @Override
+ protected void handleReceive(Object message) throws Exception {
if (probe != null) {
-
probe.tell(message, getSelf());
}
if (message instanceof ConditionalProbe) {
+ // The ConditionalProbe is only used for unit tests.
log.info("Received probe {} {}", getSelf(), message);
probe = (ConditionalProbe) message;
- } else if (message instanceof UpdateBucket) {
- receiveUpdateBucket(((UpdateBucket) message).getBucket());
+ // Send back any message to tell the caller we got the probe.
+ getSender().tell("Got it", getSelf());
} else if (message instanceof GetAllBuckets) {
- receiveGetAllBucket();
- } else if (message instanceof GetLocalBucket) {
- receiveGetLocalBucket();
+ receiveGetAllBuckets();
} else if (message instanceof GetBucketsByMembers) {
- receiveGetBucketsByMembers(
- ((GetBucketsByMembers) message).getMembers());
+ receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
} else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
} else if (message instanceof UpdateRemoteBuckets) {
- receiveUpdateRemoteBuckets(
- ((UpdateRemoteBuckets) message).getBuckets());
+ receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
} else {
- log.debug("Unhandled message [{}]", message);
+ if(log.isDebugEnabled()) {
+ log.debug("Unhandled message [{}]", message);
+ }
unhandled(message);
}
-
- }
-
- /**
- * Returns a copy of bucket owned by this node
- */
- private void receiveGetLocalBucket() {
- final ActorRef sender = getSender();
- GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
- sender.tell(reply, getSelf());
- }
-
- /**
- * Updates the bucket owned by this node
- *
- * @param updatedBucket
- */
- void receiveUpdateBucket(Bucket updatedBucket){
-
- localBucket = (BucketImpl) updatedBucket;
- versions.put(selfAddress, localBucket.getVersion());
}
/**
* Returns all the buckets the this node knows about, self owned + remote
*/
- void receiveGetAllBucket(){
+ void receiveGetAllBuckets(){
final ActorRef sender = getSender();
sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
}
*
* @return self owned + remote buckets
*/
+ @SuppressWarnings("rawtypes")
Map<Address, Bucket> getAllBuckets(){
Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
- all.put(selfAddress, localBucket);
+ all.put(selfAddress, new BucketImpl<>(localBucket));
//then get all remote buckets
all.putAll(remoteBuckets);
*
* @param members requested members
*/
+ @SuppressWarnings("rawtypes")
void receiveGetBucketsByMembers(Set<Address> members){
final ActorRef sender = getSender();
Map<Address, Bucket> buckets = getBucketsByMembers(members);
* @param members requested members
* @return buckets for requested memebers
*/
+ @SuppressWarnings("rawtypes")
Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
Map<Address, Bucket> buckets = new HashMap<>();
//first add the local bucket if asked
- if (members.contains(selfAddress))
- buckets.put(selfAddress, localBucket);
+ if (members.contains(selfAddress)) {
+ buckets.put(selfAddress, new BucketImpl<>(localBucket));
+ }
//then get buckets for requested remote nodes
for (Address address : members){
- if (remoteBuckets.containsKey(address))
+ if (remoteBuckets.containsKey(address)) {
buckets.put(address, remoteBuckets.get(address));
+ }
}
return buckets;
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
+ @SuppressWarnings({ "rawtypes", "unchecked" })
void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
-
+ log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
if (receivedBuckets == null || receivedBuckets.isEmpty())
+ {
return; //nothing to do
+ }
//Remote cant update self's bucket
receivedBuckets.remove(selfAddress);
for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
Long localVersion = versions.get(entry.getKey());
- if (localVersion == null) localVersion = -1L;
+ if (localVersion == null) {
+ localVersion = NO_VERSION;
+ }
- Bucket receivedBucket = entry.getValue();
+ Bucket<T> receivedBucket = entry.getValue();
- if (receivedBucket == null)
+ if (receivedBucket == null) {
continue;
+ }
Long remoteVersion = receivedBucket.getVersion();
- if (remoteVersion == null) remoteVersion = -1L;
+ if (remoteVersion == null) {
+ remoteVersion = NO_VERSION;
+ }
//update only if remote version is newer
- if ( remoteVersion > localVersion ) {
+ if ( remoteVersion.longValue() > localVersion.longValue() ) {
remoteBuckets.put(entry.getKey(), receivedBucket);
versions.put(entry.getKey(), remoteVersion);
}
}
- log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ if(log.isDebugEnabled()) {
+ log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ }
}
- ///
- ///Getter Setters
- ///
-
- BucketImpl getLocalBucket() {
+ protected BucketImpl<T> getLocalBucket() {
return localBucket;
}
- void setLocalBucket(BucketImpl localBucket) {
- this.localBucket = localBucket;
+ protected void updateLocalBucket(T data) {
+ localBucket.setData(data);
+ versions.put(selfAddress, localBucket.getVersion());
}
- ConcurrentMap<Address, Bucket> getRemoteBuckets() {
+ protected Map<Address, Bucket<T>> getRemoteBuckets() {
return remoteBuckets;
}
- void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
- this.remoteBuckets = remoteBuckets;
- }
-
- ConcurrentMap<Address, Long> getVersions() {
+ @VisibleForTesting
+ Map<Address, Long> getVersions() {
return versions;
}
-
- void setVersions(ConcurrentMap<Address, Long> versions) {
- this.versions = versions;
- }
-
- Address getSelfAddress() {
- return selfAddress;
- }
-
}