Bug 2210: Fixed initial DCL notification on registration
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 157f1cb3771cd71ddd1ddf14d2541bef3a0aefc3..e861165c6ba2592d7d5d14d98e5fb34591344d85 100644 (file)
@@ -25,6 +25,7 @@ import akka.persistence.RecoveryFailure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
@@ -163,7 +164,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("Initializing shard [{}]", shardName);
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
-            shardInformation.setShardInitialized(true);
+            shardInformation.setActorInitialized();
         }
     }
 
@@ -192,7 +193,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        sendResponse(shardInformation, new Supplier<Object>() {
+        sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
             @Override
             public Object get() {
                 return new LocalShardFound(shardInformation.getActor());
@@ -200,9 +201,22 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         });
     }
 
-    private void sendResponse(ShardInformation shardInformation,  Supplier<Object> messageSupplier) {
-        if (shardInformation.getActor() == null || !shardInformation.isShardInitialized()) {
-            getSender().tell(new ActorNotInitialized(), getSelf());
+    private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
+            final Supplier<Object> messageSupplier) {
+        if (!shardInformation.isShardInitialized()) {
+            if(waitUntilInitialized) {
+                final ActorRef sender = getSender();
+                final ActorRef self = self();
+                shardInformation.addRunnableOnInitialized(new Runnable() {
+                    @Override
+                    public void run() {
+                        sender.tell(messageSupplier.get(), self);
+                    }
+                });
+            } else {
+                getSender().tell(new ActorNotInitialized(), getSelf());
+            }
+
             return;
         }
 
@@ -277,7 +291,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         // First see if the there is a local replica for the shard
         final ShardInformation info = localShards.get(shardName);
         if (info != null) {
-            sendResponse(info, new Supplier<Object>() {
+            sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
                 @Override
                 public Object get() {
                     return new PrimaryFound(info.getActorPath().toString()).toSerializable();
@@ -422,7 +436,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private ActorRef actor;
         private ActorPath actorPath;
         private final Map<ShardIdentifier, String> peerAddresses;
-        private boolean shardInitialized = false; // flag that determines if the actor is ready for business
+
+        // flag that determines if the actor is ready for business
+        private boolean actorInitialized = false;
+
+        private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<ShardIdentifier, String> peerAddresses) {
@@ -474,11 +492,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         boolean isShardInitialized() {
-            return shardInitialized;
+            return getActor() != null && actorInitialized;
+        }
+
+        void setActorInitialized() {
+            this.actorInitialized = true;
+
+            for(Runnable runnable: runnablesOnInitialized) {
+                runnable.run();
+            }
+
+            runnablesOnInitialized.clear();
         }
 
-        void setShardInitialized(boolean shardInitialized) {
-            this.shardInitialized = shardInitialized;
+        void addRunnableOnInitialized(Runnable runnable) {
+            runnablesOnInitialized.add(runnable);
         }
     }
 
@@ -505,8 +533,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     static class SchemaContextModules implements Serializable {
-        private static final long serialVersionUID = 1L;
-
         private final Set<String> modules;
 
         SchemaContextModules(Set<String> modules){