X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=c215321d0e9b74d31f2a0ebd76dd5866b1ff324c;hb=635b5b19764c8c99267f35690ca68b02cf1aea3a;hp=a46bb3dae8e5887202d66ad0ddfda51e5774537b;hpb=5c5c980e564d2b5f6cd26821ffd26997f59af260;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index a46bb3dae8..c215321d0e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -29,6 +29,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Sets; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -45,6 +46,8 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIde import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; +import org.opendaylight.controller.cluster.datastore.messages.CreateShard; +import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; @@ -54,6 +57,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; +import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; @@ -61,6 +65,7 @@ import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListe import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; +import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -111,6 +116,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final PrimaryShardInfoFutureCache primaryShardInfoCache; + private SchemaContext schemaContext; + /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, @@ -183,13 +190,56 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ShardNotInitializedTimeout) { onShardNotInitializedTimeout((ShardNotInitializedTimeout)message); } else if(message instanceof ShardLeaderStateChanged) { - onLeaderStateChanged((ShardLeaderStateChanged)message); + onLeaderStateChanged((ShardLeaderStateChanged) message); + } else if(message instanceof SwitchShardBehavior){ + onSwitchShardBehavior((SwitchShardBehavior) message); + } else if(message instanceof CreateShard) { + onCreateShard((CreateShard)message); } else { unknownMessage(message); } } + private void onCreateShard(CreateShard createShard) { + Object reply; + try { + if(localShards.containsKey(createShard.getShardName())) { + throw new IllegalStateException(String.format("Shard with name %s already exists", + createShard.getShardName())); + } + + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), createShard.getShardName()); + Map peerAddresses = getPeerAddresses(createShard.getShardName(), createShard.getMemberNames()); + + LOG.debug("onCreateShard: shardId: {}, peerAddresses: {}", shardId, peerAddresses); + + DatastoreContext shardDatastoreContext = createShard.getDatastoreContext(); + if(shardDatastoreContext == null) { + shardDatastoreContext = datastoreContext; + } + + ShardInformation info = new ShardInformation(createShard.getShardName(), shardId, peerAddresses, + shardDatastoreContext, createShard.getShardPropsCreator()); + localShards.put(createShard.getShardName(), info); + + mBean.addLocalShard(shardId.toString()); + + if(schemaContext != null) { + info.setActor(newShardActor(schemaContext, info)); + } + + reply = new CreateShardReply(); + } catch (Exception e) { + LOG.error("onCreateShard failed", e); + reply = new akka.actor.Status.Failure(e); + } + + if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) { + getSender().tell(reply, getSelf()); + } + } + private void checkReady(){ if (isReadyWithLeaderId()) { LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", @@ -478,13 +528,27 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onSwitchShardBehavior(SwitchShardBehavior message) { + ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build(); + + ShardInformation shardInformation = localShards.get(identifier.getShardName()); + + if(shardInformation != null && shardInformation.getActor() != null) { + shardInformation.getActor().tell( + new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf()); + } else { + LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available", + message.getShardName(), message.getNewState()); + } + } + /** * Notifies all the local shards of a change in the schema context * * @param message */ private void updateSchemaContext(final Object message) { - final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); + schemaContext = ((UpdateSchemaContext) message).getSchemaContext(); LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size()); @@ -505,8 +569,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { @VisibleForTesting protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { - return getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext) + return getContext().actorOf(info.newProps(schemaContext) .withDispatcher(shardDispatcherPath), info.getShardId().toString()); } @@ -596,16 +659,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { List memberShardNames = this.configuration.getMemberShardNames(memberName); + ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator(); List localShardActorNames = new ArrayList<>(); for(String shardName : memberShardNames){ ShardIdentifier shardId = getShardIdentifier(memberName, shardName); Map peerAddresses = getPeerAddresses(shardName); localShardActorNames.add(shardId.toString()); - localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses)); + localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext, + shardPropsCreator)); } - mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type, + mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type, datastoreContext.getDataStoreMXBeanType(), localShardActorNames); + + mBean.setShardManager(this); } /** @@ -614,16 +681,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { * @param shardName * @return */ - private Map getPeerAddresses(String shardName){ + private Map getPeerAddresses(String shardName) { + return getPeerAddresses(shardName, configuration.getMembersFromShardName(shardName)); + } - Map peerAddresses = new HashMap<>(); + private Map getPeerAddresses(String shardName, Collection members) { - List members = this.configuration.getMembersFromShardName(shardName); + Map peerAddresses = new HashMap<>(); String currentMemberName = this.cluster.getCurrentMemberName(); - for(String memberName : members){ - if(!currentMemberName.equals(memberName)){ + for(String memberName : members) { + if(!currentMemberName.equals(memberName)) { ShardIdentifier shardId = getShardIdentifier(memberName, shardName); String path = getShardActorPath(shardName, currentMemberName); peerAddresses.put(shardId.toString(), path); @@ -677,11 +746,21 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private String leaderId; private short leaderVersion; + private final DatastoreContext datastoreContext; + private final ShardPropsCreator shardPropsCreator; + private ShardInformation(String shardName, ShardIdentifier shardId, - Map peerAddresses) { + Map peerAddresses, DatastoreContext datastoreContext, + ShardPropsCreator shardPropsCreator) { this.shardName = shardName; this.shardId = shardId; this.peerAddresses = peerAddresses; + this.datastoreContext = datastoreContext; + this.shardPropsCreator = shardPropsCreator; + } + + Props newProps(SchemaContext schemaContext) { + return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext); } String getShardName() {