package org.opendaylight.controller.remote.rpc.registry.gossip;
import akka.actor.ActorRef;
+import akka.actor.ActorRefProvider;
import akka.actor.Address;
import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.cluster.Cluster;
+import akka.cluster.ClusterActorRefProvider;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.utils.ConditionalProbe;
import java.util.HashMap;
import java.util.Map;
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
-public class BucketStore extends UntypedActor {
+public class BucketStore extends AbstractUntypedActorWithMetering {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
/**
* Bucket owned by the node
*/
- private BucketImpl localBucket = new BucketImpl();;
+ private BucketImpl localBucket = new BucketImpl();
/**
* Buckets ownded by other known nodes in the cluster
/**
* Cluster address for this node
*/
- private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
+ private Address selfAddress;
- /**
- * Our private gossiper
- */
- private ActorRef gossiper;
+ private ConditionalProbe probe;
+
+ private final RemoteRpcProviderConfig config;
public BucketStore(){
- gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
}
- /**
- * This constructor is useful for testing.
- * TODO: Pass Props instead of ActorRef
- *
- * @param gossiper
- */
- public BucketStore(ActorRef gossiper){
- this.gossiper = gossiper;
+ @Override
+ public void preStart(){
+ ActorRefProvider provider = getContext().provider();
+ selfAddress = provider.getDefaultAddress();
+
+ if ( provider instanceof ClusterActorRefProvider)
+ getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
}
- @Override
- public void onReceive(Object message) throws Exception {
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ if (probe != null) {
+ probe.tell(message, getSelf());
+ }
- if (message instanceof UpdateBucket)
+ if (message instanceof ConditionalProbe) {
+ log.info("Received probe {} {}", getSelf(), message);
+ probe = (ConditionalProbe) message;
+ } else if (message instanceof UpdateBucket) {
receiveUpdateBucket(((UpdateBucket) message).getBucket());
-
- else if (message instanceof GetAllBuckets)
+ } else if (message instanceof GetAllBuckets) {
receiveGetAllBucket();
-
- else if (message instanceof GetLocalBucket)
+ } else if (message instanceof GetLocalBucket) {
receiveGetLocalBucket();
-
- else if (message instanceof GetBucketsByMembers)
- receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
-
- else if (message instanceof GetBucketVersions)
+ } else if (message instanceof GetBucketsByMembers) {
+ receiveGetBucketsByMembers(
+ ((GetBucketsByMembers) message).getMembers());
+ } else if (message instanceof GetBucketVersions) {
receiveGetBucketVersions();
-
- else if (message instanceof UpdateRemoteBuckets)
- receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
-
- else {
- log.debug("Unhandled message [{}]", message);
+ } else if (message instanceof UpdateRemoteBuckets) {
+ receiveUpdateRemoteBuckets(
+ ((UpdateRemoteBuckets) message).getBuckets());
+ } else {
+ if(log.isDebugEnabled()) {
+ log.debug("Unhandled message [{}]", message);
+ }
unhandled(message);
}
-
}
/**
if (remoteVersion == null) remoteVersion = -1L;
//update only if remote version is newer
- if ( remoteVersion > localVersion ) {
+ if ( remoteVersion.longValue() > localVersion.longValue() ) {
remoteBuckets.put(entry.getKey(), receivedBucket);
versions.put(entry.getKey(), remoteVersion);
}
}
-
- log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ if(log.isDebugEnabled()) {
+ log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+ }
}
///