import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
} else if (message instanceof SwitchShardBehavior) {
onSwitchShardBehavior((SwitchShardBehavior) message);
} else if (message instanceof CreateShard) {
- onCreateShard((CreateShard) message);
+ onCreateShard((CreateShard)message);
} else if (message instanceof AddShardReplica) {
onAddShardReplica((AddShardReplica) message);
+ } else if (message instanceof CreatePrefixedShard) {
+ onCreatePrefixedShard((CreatePrefixedShard) message);
+ } else if (message instanceof AddPrefixShardReplica) {
+ onAddPrefixShardReplica((AddPrefixShardReplica) message);
} else if (message instanceof ForwardedAddServerReply) {
- ForwardedAddServerReply msg = (ForwardedAddServerReply) message;
- onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
+ ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+ onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+ msg.removeShardOnFailure);
} else if (message instanceof ForwardedAddServerFailure) {
- ForwardedAddServerFailure msg = (ForwardedAddServerFailure) message;
+ ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
} else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
} else {
int nfailed = 0;
- for (Boolean result: results) {
+ for (Boolean result : results) {
if (!result) {
nfailed++;
}
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+ LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
+
+ Object reply;
+ try {
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+ createPrefixedShard.getConfig().getPrefix());
+ if (localShards.containsKey(shardId.getShardName())) {
+ LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
+ reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
+ } else {
+ doCreatePrefixedShard(createPrefixedShard);
+ reply = new Status.Success(null);
+ }
+ } catch (final Exception e) {
+ LOG.error("{}: onCreateShard failed", persistenceId(), e);
+ reply = new Status.Failure(e);
+ }
+
+ if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(reply, getSelf());
+ }
+ }
+
@SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(CreateShard createShard) {
LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
}
}
- private void doCreateShard(CreateShard createShard) {
- ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
- String shardName = moduleShardConfig.getShardName();
+ private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+ final PrefixShardConfiguration config = createPrefixedShard.getConfig();
+
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+ createPrefixedShard.getConfig().getPrefix());
+ final String shardName = shardId.getShardName();
+
+ configuration.addPrefixShardConfiguration(config);
+
+ DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
+
+ if (shardDatastoreContext == null) {
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+ .storeRoot(config.getPrefix().getRootIdentifier());
+ shardDatastoreContext = builder.build();
+ } else {
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+ peerAddressResolver).build();
+ }
+
+ final boolean shardWasInRecoveredSnapshot = currentSnapshot != null
+ && currentSnapshot.getShardList().contains(shardName);
+
+ final Map<String, String> peerAddresses = Collections.emptyMap();
+ final boolean isActiveMember = true;
+ LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, peerAddresses, isActiveMember);
+
+ final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+ shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
+ info.setActiveMember(isActiveMember);
+ localShards.put(info.getShardName(), info);
+
+ if (schemaContext != null) {
+ info.setActor(newShardActor(schemaContext, info));
+ }
+ }
+
+ private void doCreateShard(final CreateShard createShard) {
+ final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ final String shardName = moduleShardConfig.getShardName();
configuration.addModuleShardConfiguration(moduleShardConfig);
return false;
}
+
+ // With this message the shard does NOT have to be preconfigured
+ // do a dynamic lookup if the shard exists somewhere and replicate
+ private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
+ final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
+
+ LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+ if (schemaContext == null) {
+ final String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug("{}: {}", persistenceId(), msg);
+ getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ findPrimary(shardName,
+ new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ addShard(getShardName(), response, getSender());
+ }
+ }, getTargetActor());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+ }
+ );
+ }
+
private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();