X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=c24156f506211c1ae7a61786da8809e71cd966b6;hb=refs%2Fchanges%2F01%2F26801%2F1;hp=5f59672ed987b4f8cb8b47cb4e82da67ca4b4f69;hpb=bbc8a16efdc6bfa0d742b73af3374a33a12e2a1c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 5f59672ed9..c24156f506 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -29,6 +29,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -37,6 +38,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; @@ -45,6 +47,8 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; +import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; @@ -113,6 +117,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final PrimaryShardInfoFutureCache primaryShardInfoCache; + private SchemaContext schemaContext; + /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, @@ -188,12 +194,53 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { onLeaderStateChanged((ShardLeaderStateChanged) message); } else if(message instanceof SwitchShardBehavior){ onSwitchShardBehavior((SwitchShardBehavior) message); + } else if(message instanceof CreateShard) { + onCreateShard((CreateShard)message); } else { unknownMessage(message); } } + private void onCreateShard(CreateShard createShard) { + Object reply; + try { + if(localShards.containsKey(createShard.getShardName())) { + throw new IllegalStateException(String.format("Shard with name %s already exists", + createShard.getShardName())); + } + + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), createShard.getShardName()); + Map peerAddresses = getPeerAddresses(createShard.getShardName(), createShard.getMemberNames()); + + LOG.debug("onCreateShard: shardId: {}, peerAddresses: {}", shardId, peerAddresses); + + DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); + if(shardDatastoreContext == null) { + shardDatastoreContext = datastoreContext; + } + + ShardInformation info = new ShardInformation(createShard.getShardName(), shardId, peerAddresses, + shardDatastoreContext, createShard.getShardPropsCreator()); + localShards.put(createShard.getShardName(), info); + + mBean.addLocalShard(shardId.toString()); + + if(schemaContext != null) { + info.setActor(newShardActor(schemaContext, info)); + } + + reply = new CreateShardReply(); + } catch (Exception e) { + LOG.error("onCreateShard failed", e); + reply = new akka.actor.Status.Failure(e); + } + + if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { + getSender().tell(reply, getSelf()); + } + } + private void checkReady(){ if (isReadyWithLeaderId()) { LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", @@ -502,7 +549,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param message */ private void updateSchemaContext(final Object message) { - final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); + schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size()); @@ -523,8 +570,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @VisibleForTesting protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { - return getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext) + return getContext().actorOf(info.newProps(schemaContext) .withDispatcher(shardDispatcherPath), info.getShardId().toString()); } @@ -611,15 +657,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { */ private void createLocalShards() { String memberName = this.cluster.getCurrentMemberName(); - List memberShardNames = - this.configuration.getMemberShardNames(memberName); + Collection memberShardNames = this.configuration.getMemberShardNames(memberName); + ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator(); List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); - localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); + localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext, + shardPropsCreator)); } mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, @@ -634,16 +681,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName * @return */ - private Map getPeerAddresses(String shardName){ + private Map getPeerAddresses(String shardName) { + return getPeerAddresses(shardName, configuration.getMembersFromShardName(shardName)); + } - Map peerAddresses = new HashMap<>(); + private Map getPeerAddresses(String shardName, Collection members) { - List members = this.configuration.getMembersFromShardName(shardName); + Map peerAddresses = new HashMap<>(); String currentMemberName = this.cluster.getCurrentMemberName(); - for(String memberName : members){ - if(!currentMemberName.equals(memberName)){ + for(String memberName : members) { + if(!currentMemberName.equals(memberName)) { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); String path = getShardActorPath(shardName, currentMemberName); peerAddresses.put(shardId.toString(), path); @@ -697,11 +746,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private String leaderId; private short leaderVersion; + private final DatastoreContext datastoreContext; + private final ShardPropsCreator shardPropsCreator; + private ShardInformation(String shardName, ShardIdentifier shardId, - Map peerAddresses) { + Map peerAddresses, DatastoreContext datastoreContext, + ShardPropsCreator shardPropsCreator) { this.shardName = shardName; this.shardId = shardId; this.peerAddresses = peerAddresses; + this.datastoreContext = datastoreContext; + this.shardPropsCreator = shardPropsCreator; + } + + Props newProps(SchemaContext schemaContext) { + return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext); } String getShardName() {