Merge "Handle more specific BindException and IOException"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
index 23cbaca32f483f6af6ba8046e34d9233d326d6c5..6ffe147e71384b66ece31c4e701294c8630456c8 100644 (file)
@@ -9,30 +9,30 @@
 package org.opendaylight.controller.remote.rpc.registry.gossip;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorRefProvider;
 import akka.actor.Address;
 import akka.actor.Props;
-import akka.actor.UntypedActor;
-import akka.cluster.Cluster;
+import akka.cluster.ClusterActorRefProvider;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
-import org.opendaylight.controller.utils.ConditionalProbe;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.utils.ConditionalProbe;
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -43,14 +43,14 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Bu
  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
  *
  */
-public class BucketStore extends UntypedActor {
+public class BucketStore extends AbstractUntypedActorWithMetering {
 
     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
 
     /**
      * Bucket owned by the node
      */
-    private BucketImpl localBucket = new BucketImpl();;
+    private BucketImpl localBucket = new BucketImpl();
 
     /**
      * Buckets ownded by other known nodes in the cluster
@@ -65,43 +65,39 @@ public class BucketStore extends UntypedActor {
     /**
      * Cluster address for this node
      */
-    private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
-
-    /**
-     * Our private gossiper
-     */
-    private ActorRef gossiper;
+    private Address selfAddress;
 
     private ConditionalProbe probe;
 
-    public BucketStore(){
-        gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
-    }
+    private final RemoteRpcProviderConfig config;
 
-    /**
-     * This constructor is useful for testing.
-     * TODO: Pass Props instead of ActorRef
-     *
-     * @param gossiper
-     */
-    public BucketStore(ActorRef gossiper){
-        this.gossiper = gossiper;
+    public BucketStore(){
+        config = new RemoteRpcProviderConfig(getContext().system().settings().config());
     }
 
     @Override
-    public void onReceive(Object message) throws Exception {
+    public void preStart(){
+        ActorRefProvider provider = getContext().provider();
+        selfAddress = provider.getDefaultAddress();
 
-        log.debug("Received message: node[{}], message[{}]", selfAddress,
-            message);
+        if ( provider instanceof ClusterActorRefProvider) {
+            getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
+        }
+    }
 
-        if (probe != null) {
 
+    @Override
+    protected void handleReceive(Object message) throws Exception {
+        if (probe != null) {
             probe.tell(message, getSelf());
         }
 
         if (message instanceof ConditionalProbe) {
+            // The ConditionalProbe is only used for unit tests.
             log.info("Received probe {} {}", getSelf(), message);
             probe = (ConditionalProbe) message;
+            // Send back any message to tell the caller we got the probe.
+            getSender().tell("Got it", getSelf());
         } else if (message instanceof UpdateBucket) {
             receiveUpdateBucket(((UpdateBucket) message).getBucket());
         } else if (message instanceof GetAllBuckets) {
@@ -110,17 +106,18 @@ public class BucketStore extends UntypedActor {
             receiveGetLocalBucket();
         } else if (message instanceof GetBucketsByMembers) {
             receiveGetBucketsByMembers(
-                ((GetBucketsByMembers) message).getMembers());
+                    ((GetBucketsByMembers) message).getMembers());
         } else if (message instanceof GetBucketVersions) {
             receiveGetBucketVersions();
         } else if (message instanceof UpdateRemoteBuckets) {
             receiveUpdateRemoteBuckets(
-                ((UpdateRemoteBuckets) message).getBuckets());
+                    ((UpdateRemoteBuckets) message).getBuckets());
         } else {
-            log.debug("Unhandled message [{}]", message);
+            if(log.isDebugEnabled()) {
+                log.debug("Unhandled message [{}]", message);
+            }
             unhandled(message);
         }
-
     }
 
     /**
@@ -189,13 +186,15 @@ public class BucketStore extends UntypedActor {
         Map<Address, Bucket> buckets = new HashMap<>();
 
         //first add the local bucket if asked
-        if (members.contains(selfAddress))
+        if (members.contains(selfAddress)) {
             buckets.put(selfAddress, localBucket);
+        }
 
         //then get buckets for requested remote nodes
         for (Address address : members){
-            if (remoteBuckets.containsKey(address))
+            if (remoteBuckets.containsKey(address)) {
                 buckets.put(address, remoteBuckets.get(address));
+            }
         }
 
         return buckets;
@@ -219,7 +218,9 @@ public class BucketStore extends UntypedActor {
     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
 
         if (receivedBuckets == null || receivedBuckets.isEmpty())
+         {
             return; //nothing to do
+        }
 
         //Remote cant update self's bucket
         receivedBuckets.remove(selfAddress);
@@ -227,24 +228,30 @@ public class BucketStore extends UntypedActor {
         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
 
             Long localVersion = versions.get(entry.getKey());
-            if (localVersion == null) localVersion = -1L;
+            if (localVersion == null) {
+                localVersion = -1L;
+            }
 
             Bucket receivedBucket = entry.getValue();
 
-            if (receivedBucket == null)
+            if (receivedBucket == null) {
                 continue;
+            }
 
             Long remoteVersion = receivedBucket.getVersion();
-            if (remoteVersion == null) remoteVersion = -1L;
+            if (remoteVersion == null) {
+                remoteVersion = -1L;
+            }
 
             //update only if remote version is newer
-            if ( remoteVersion > localVersion ) {
+            if ( remoteVersion.longValue() > localVersion.longValue() ) {
                 remoteBuckets.put(entry.getKey(), receivedBucket);
                 versions.put(entry.getKey(), remoteVersion);
             }
         }
-
-        log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+        if(log.isDebugEnabled()) {
+            log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
+        }
     }
 
     ///
@@ -278,5 +285,4 @@ public class BucketStore extends UntypedActor {
     Address getSelfAddress() {
         return selfAddress;
     }
-
 }