X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=e861165c6ba2592d7d5d14d98e5fb34591344d85;hb=36f3397f35d771f687173108597c5c76feba667f;hp=157f1cb3771cd71ddd1ddf14d2541bef3a0aefc3;hpb=73e969cf365dd78772596c71e940ae44fe2f22d3;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 157f1cb377..e861165c6b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -25,6 +25,7 @@ import akka.persistence.RecoveryFailure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; +import com.google.common.collect.Lists; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; @@ -163,7 +164,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Initializing shard [{}]", shardName); ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { - shardInformation.setShardInitialized(true); + shardInformation.setActorInitialized(); } } @@ -192,7 +193,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return; } - sendResponse(shardInformation, new Supplier() { + sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier() { @Override public Object get() { return new LocalShardFound(shardInformation.getActor()); @@ -200,9 +201,22 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void sendResponse(ShardInformation shardInformation, Supplier messageSupplier) { - if (shardInformation.getActor() == null || !shardInformation.isShardInitialized()) { - getSender().tell(new ActorNotInitialized(), getSelf()); + private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized, + final Supplier messageSupplier) { + if (!shardInformation.isShardInitialized()) { + if(waitUntilInitialized) { + final ActorRef sender = getSender(); + final ActorRef self = self(); + shardInformation.addRunnableOnInitialized(new Runnable() { + @Override + public void run() { + sender.tell(messageSupplier.get(), self); + } + }); + } else { + getSender().tell(new ActorNotInitialized(), getSelf()); + } + return; } @@ -277,7 +291,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // First see if the there is a local replica for the shard final ShardInformation info = localShards.get(shardName); if (info != null) { - sendResponse(info, new Supplier() { + sendResponse(info, message.isWaitUntilInitialized(), new Supplier() { @Override public Object get() { return new PrimaryFound(info.getActorPath().toString()).toSerializable(); @@ -422,7 +436,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private ActorRef actor; private ActorPath actorPath; private final Map peerAddresses; - private boolean shardInitialized = false; // flag that determines if the actor is ready for business + + // flag that determines if the actor is ready for business + private boolean actorInitialized = false; + + private final List runnablesOnInitialized = Lists.newArrayList(); private ShardInformation(String shardName, ShardIdentifier shardId, Map peerAddresses) { @@ -474,11 +492,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } boolean isShardInitialized() { - return shardInitialized; + return getActor() != null && actorInitialized; + } + + void setActorInitialized() { + this.actorInitialized = true; + + for(Runnable runnable: runnablesOnInitialized) { + runnable.run(); + } + + runnablesOnInitialized.clear(); } - void setShardInitialized(boolean shardInitialized) { - this.shardInitialized = shardInitialized; + void addRunnableOnInitialized(Runnable runnable) { + runnablesOnInitialized.add(runnable); } } @@ -505,8 +533,6 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } static class SchemaContextModules implements Serializable { - private static final long serialVersionUID = 1L; - private final Set modules; SchemaContextModules(Set modules){