X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=e861165c6ba2592d7d5d14d98e5fb34591344d85;hb=36f3397f35d771f687173108597c5c76feba667f;hp=e68628dbf5c1fc992afb30ae986fff8ed8f6eef1;hpb=8de365b86ee7b65ee201166be85142b27ffd7295;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index e68628dbf5..e861165c6b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,6 +24,8 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; @@ -42,7 +44,6 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; - import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -94,7 +95,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be * configuration or operational */ - private ShardManager(String type, ClusterWrapper cluster, Configuration configuration, + protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { this.type = Preconditions.checkNotNull(type, "type should not be null"); @@ -105,7 +106,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); - //createLocalShards(null); + createLocalShards(); } public static Props props(final String type, @@ -123,8 +124,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(Object message) throws Exception { if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { - findPrimary( - FindPrimary.fromSerializable(message)); + findPrimary(FindPrimary.fromSerializable(message)); } else if(message instanceof FindLocalShard){ findLocalShard((FindLocalShard) message); } else if (message instanceof UpdateSchemaContext) { @@ -160,16 +160,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { markShardAsInitialized(shardId.getShardName()); } - @VisibleForTesting protected void markShardAsInitialized(String shardName) { + private void markShardAsInitialized(String shardName) { LOG.debug("Initializing shard [{}]", shardName); ShardInformation shardInformation = localShards.get(shardName); if (shardInformation != null) { - shardInformation.setShardInitialized(true); + shardInformation.setActorInitialized(); } } - @Override protected void handleRecover(Object message) throws Exception { - + @Override + protected void handleRecover(Object message) throws Exception { if(message instanceof SchemaContextModules){ SchemaContextModules msg = (SchemaContextModules) message; knownModules.clear(); @@ -186,23 +186,41 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void findLocalShard(FindLocalShard message) { - ShardInformation shardInformation = localShards.get(message.getShardName()); + final ShardInformation shardInformation = localShards.get(message.getShardName()); if(shardInformation == null){ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf()); return; } - sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor())); + sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier() { + @Override + public Object get() { + return new LocalShardFound(shardInformation.getActor()); + } + }); } - private void sendResponse(ShardInformation shardInformation, Object message) { + private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized, + final Supplier messageSupplier) { if (!shardInformation.isShardInitialized()) { - getSender().tell(new ActorNotInitialized(), getSelf()); + if(waitUntilInitialized) { + final ActorRef sender = getSender(); + final ActorRef self = self(); + shardInformation.addRunnableOnInitialized(new Runnable() { + @Override + public void run() { + sender.tell(messageSupplier.get(), self); + } + }); + } else { + getSender().tell(new ActorNotInitialized(), getSelf()); + } + return; } - getSender().tell(message, getSelf()); + getSender().tell(messageSupplier.get(), getSelf()); } private void memberRemoved(ClusterEvent.MemberRemoved message) { @@ -246,12 +264,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { persist(new SchemaContextModules(newModules), new Procedure() { - @Override public void apply(SchemaContextModules param) throws Exception { + @Override + public void apply(SchemaContextModules param) throws Exception { LOG.info("Sending new SchemaContext to Shards"); - if (localShards.size() == 0) { - createLocalShards(schemaContext); - } else { - for (ShardInformation info : localShards.values()) { + for (ShardInformation info : localShards.values()) { + if(info.getActor() == null) { + info.setActor(getContext().actorOf(Shard.props(info.getShardId(), + info.getPeerAddresses(), datastoreContext, schemaContext), + info.getShardId().toString())); + } else { info.getActor().tell(message, getSelf()); } } @@ -265,14 +286,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void findPrimary(FindPrimary message) { - final ActorRef sender = getSender(); String shardName = message.getShardName(); // First see if the there is a local replica for the shard - ShardInformation info = localShards.get(shardName); + final ShardInformation info = localShards.get(shardName); if (info != null) { - ActorPath shardPath = info.getActorPath(); - sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable()); + sendResponse(info, message.isWaitUntilInitialized(), new Supplier() { + @Override + public Object get() { + return new PrimaryFound(info.getActorPath().toString()).toSerializable(); + } + }); + return; } @@ -331,7 +356,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * runs * */ - private void createLocalShards(SchemaContext schemaContext) { + private void createLocalShards() { String memberName = this.cluster.getCurrentMemberName(); List memberShardNames = this.configuration.getMemberShardNames(memberName); @@ -340,11 +365,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); - ActorRef actor = getContext() - .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext), - shardId.toString()); localShardActorNames.add(shardId.toString()); - localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses)); + localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); } mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type, @@ -398,64 +420,93 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } - @Override public String persistenceId() { + @Override + public String persistenceId() { return "shard-manager-" + type; } - @VisibleForTesting public Collection getKnownModules() { + @VisibleForTesting + Collection getKnownModules() { return knownModules; } private class ShardInformation { + private final ShardIdentifier shardId; private final String shardName; - private final ActorRef actor; - private final ActorPath actorPath; + private ActorRef actor; + private ActorPath actorPath; private final Map peerAddresses; - private boolean shardInitialized = false; //flag that determines if the actor is ready for business - private ShardInformation(String shardName, ActorRef actor, - Map peerAddresses) { + // flag that determines if the actor is ready for business + private boolean actorInitialized = false; + + private final List runnablesOnInitialized = Lists.newArrayList(); + + private ShardInformation(String shardName, ShardIdentifier shardId, + Map peerAddresses) { this.shardName = shardName; - this.actor = actor; - this.actorPath = actor.path(); + this.shardId = shardId; this.peerAddresses = peerAddresses; } - public String getShardName() { + String getShardName() { return shardName; } - public ActorRef getActor(){ + ActorRef getActor(){ return actor; } - public ActorPath getActorPath() { + ActorPath getActorPath() { return actorPath; } - public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ + void setActor(ActorRef actor) { + this.actor = actor; + this.actorPath = actor.path(); + } + + ShardIdentifier getShardId() { + return shardId; + } + + Map getPeerAddresses() { + return peerAddresses; + } + + void updatePeerAddress(ShardIdentifier peerId, String peerAddress){ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); if(peerAddresses.containsKey(peerId)){ peerAddresses.put(peerId, peerAddress); - if(LOG.isDebugEnabled()) { - LOG.debug( - "Sending PeerAddressResolved for peer {} with address {} to {}", - peerId, peerAddress, actor.path()); - } - actor - .tell(new PeerAddressResolved(peerId, peerAddress), - getSelf()); + if(actor != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}", + peerId, peerAddress, actor.path()); + } + + actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf()); + } } } - public boolean isShardInitialized() { - return shardInitialized; + boolean isShardInitialized() { + return getActor() != null && actorInitialized; + } + + void setActorInitialized() { + this.actorInitialized = true; + + for(Runnable runnable: runnablesOnInitialized) { + runnable.run(); + } + + runnablesOnInitialized.clear(); } - public void setShardInitialized(boolean shardInitialized) { - this.shardInitialized = shardInitialized; + void addRunnableOnInitialized(Runnable runnable) { + runnablesOnInitialized.add(runnable); } }