import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
LOG.debug("Initializing shard [{}]", shardName);
ShardInformation shardInformation = localShards.get(shardName);
if (shardInformation != null) {
- shardInformation.setShardInitialized(true);
+ shardInformation.setActorInitialized();
}
}
return;
}
- sendResponse(shardInformation, new Supplier<Object>() {
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
@Override
public Object get() {
return new LocalShardFound(shardInformation.getActor());
});
}
- private void sendResponse(ShardInformation shardInformation, Supplier<Object> messageSupplier) {
- if (shardInformation.getActor() == null || !shardInformation.isShardInitialized()) {
- getSender().tell(new ActorNotInitialized(), getSelf());
+ private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
+ final Supplier<Object> messageSupplier) {
+ if (!shardInformation.isShardInitialized()) {
+ if(waitUntilInitialized) {
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+ shardInformation.addRunnableOnInitialized(new Runnable() {
+ @Override
+ public void run() {
+ sender.tell(messageSupplier.get(), self);
+ }
+ });
+ } else {
+ getSender().tell(new ActorNotInitialized(), getSelf());
+ }
+
return;
}
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
if (info != null) {
- sendResponse(info, new Supplier<Object>() {
+ sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
@Override
public Object get() {
return new PrimaryFound(info.getActorPath().toString()).toSerializable();
private ActorRef actor;
private ActorPath actorPath;
private final Map<ShardIdentifier, String> peerAddresses;
- private boolean shardInitialized = false; // flag that determines if the actor is ready for business
+
+ // flag that determines if the actor is ready for business
+ private boolean actorInitialized = false;
+
+ private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
private ShardInformation(String shardName, ShardIdentifier shardId,
Map<ShardIdentifier, String> peerAddresses) {
}
boolean isShardInitialized() {
- return shardInitialized;
+ return getActor() != null && actorInitialized;
+ }
+
+ void setActorInitialized() {
+ this.actorInitialized = true;
+
+ for(Runnable runnable: runnablesOnInitialized) {
+ runnable.run();
+ }
+
+ runnablesOnInitialized.clear();
}
- void setShardInitialized(boolean shardInitialized) {
- this.shardInitialized = shardInitialized;
+ void addRunnableOnInitialized(Runnable runnable) {
+ runnablesOnInitialized.add(runnable);
}
}
}
static class SchemaContextModules implements Serializable {
- private static final long serialVersionUID = 1L;
-
private final Set<String> modules;
SchemaContextModules(Set<String> modules){