+ private void onPrefixShardCreated(final PrefixShardCreated message) {
+ LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
+
+ final PrefixShardConfiguration config = message.getConfiguration();
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
+ final String shardName = shardId.getShardName();
+
+ if (isPreviousShardActorStopInProgress(shardName, message)) {
+ return;
+ }
+
+ if (localShards.containsKey(shardName)) {
+ LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
+ final PrefixShardConfiguration existing =
+ configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
+
+ if (existing != null && existing.equals(config)) {
+ // we don't have to do nothing here
+ return;
+ }
+ }
+
+ doCreatePrefixShard(config, shardId, shardName);
+ }
+
+ private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
+ final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
+ if (stopOnComplete == null) {
+ return false;
+ }
+
+ LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
+ shardName, messageToDefer);
+ final ActorRef sender = getSender();
+ stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(final Throwable failure, final Boolean result) {
+ LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
+ self().tell(messageToDefer, sender);
+ }
+ });
+
+ return true;
+ }
+
+ private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId,
+ final String shardName) {
+ configuration.addPrefixShardConfiguration(config);
+
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.logicalStoreType(config.getPrefix().getDatastoreType())
+ .storeRoot(config.getPrefix().getRootIdentifier());
+ DatastoreContext shardDatastoreContext = builder.build();
+
+ final Map<String, String> peerAddresses = getPeerAddresses(shardName);
+ final boolean isActiveMember = true;
+
+ LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
+
+ final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+ shardDatastoreContext, Shard.builder(), peerAddressResolver);
+ info.setActiveMember(isActiveMember);
+ localShards.put(info.getShardName(), info);
+
+ if (schemaContext != null) {
+ info.setSchemaContext(schemaContext);
+ info.setActor(newShardActor(info));
+ }
+ }
+
+ private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+ LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
+
+ final DOMDataTreeIdentifier prefix = message.getPrefix();
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+
+ configuration.removePrefixShardConfiguration(prefix);
+ removeShard(shardId);
+ }
+
+ private void doCreateShard(final CreateShard createShard) {
+ final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ final String shardName = moduleShardConfig.getShardName();