- private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
- LOG.debug("{}, ShardManager {} received config changed {}",
- cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries());
-
- final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
-
- final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
- changedConfig.values().stream().collect(
- Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity()));
-
- resolveConfig(newConfig);
- }
-
- private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
- LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}",
- cluster.getCurrentMemberName(), persistenceId, newConfig);
-
- newConfig.forEach((prefix, config) ->
- LOG.debug("{} ShardManager : {}, received shard config "
- + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config));
-
- final SetView<DOMDataTreeIdentifier> removedConfigs =
- Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet());
-
- // resolve removals
-
- resolveRemovals(removedConfigs);
-
- final SetView<DOMDataTreeIdentifier> addedConfigs =
- Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet());
- // resolve additions
-
- resolveAdditions(addedConfigs, newConfig);
- // iter through to update existing shards, either start/stop replicas or update the shard
- // to check for more peers
- resolveUpdates(Collections.emptySet());
- }
-
- private void resolveRemovals(final Set<DOMDataTreeIdentifier> removedConfigs) {
- LOG.debug("{} ShardManager : {}, resolving removed configs : {}",
- cluster.getCurrentMemberName(), persistenceId, removedConfigs);
-
- removedConfigs.forEach(id -> doRemovePrefixedShard(id));
- }
-
- private void resolveAdditions(final Set<DOMDataTreeIdentifier> addedConfigs,
- final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
- LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs);
-
- addedConfigs.forEach(id -> doCreatePrefixedShard(configs.get(id)));
- }
-
- private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
- LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs);
- }
-
- private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) {
- LOG.debug("{} ShardManager : {}, removing prefix shard: {}",
- cluster.getCurrentMemberName(), persistenceId, prefix);
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
- final ShardInformation shard = localShards.remove(shardId.getShardName());
-
- configuration.removePrefixShardConfiguration(prefix);
-
- if (shard == null) {
- LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix);
- return;
- }
-
- if (shard.getActor() != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
- shard.getActor().tell(Shutdown.INSTANCE, self());
- }
- LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(),
- persistenceId(), shardId.getShardName());
- persistShardList();
- }
-
- private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
- String leaderPath) {