+ private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
+ LOG.debug("{}, ShardManager {} received config changed {}",
+ cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries());
+
+ final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
+
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
+ changedConfig.values().stream().collect(
+ Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity()));
+
+ resolveConfig(newConfig);
+ }
+
+ private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
+ LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}",
+ cluster.getCurrentMemberName(), persistenceId, newConfig);
+
+ newConfig.forEach((prefix, config) ->
+ LOG.debug("{} ShardManager : {}, received shard config "
+ + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config));
+
+ final SetView<DOMDataTreeIdentifier> removedConfigs =
+ Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet());
+
+ // resolve removals
+
+ resolveRemovals(removedConfigs);
+
+ final SetView<DOMDataTreeIdentifier> addedConfigs =
+ Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet());
+ // resolve additions
+
+ resolveAdditions(addedConfigs, newConfig);
+ // iter through to update existing shards, either start/stop replicas or update the shard
+ // to check for more peers
+ resolveUpdates(Collections.emptySet());
+ }
+
+ private void resolveRemovals(final Set<DOMDataTreeIdentifier> removedConfigs) {
+ LOG.debug("{} ShardManager : {}, resolving removed configs : {}",
+ cluster.getCurrentMemberName(), persistenceId, removedConfigs);
+
+ removedConfigs.forEach(id -> doRemovePrefixedShard(id));
+ }
+
+ private void resolveAdditions(final Set<DOMDataTreeIdentifier> addedConfigs,
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
+ LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs);
+
+ addedConfigs.stream().filter(identifier
+ -> identifier
+ .getDatastoreType().equals(
+ ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType())))
+ .forEach(id -> doCreatePrefixedShard(configs.get(id)));
+ }
+
+ private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
+ LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs);
+ }
+
+ private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) {
+ LOG.debug("{} ShardManager : {}, removing prefix shard: {}",
+ cluster.getCurrentMemberName(), persistenceId, prefix);
+ final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
+ final ShardInformation shard = localShards.remove(shardId.getShardName());
+
+ configuration.removePrefixShardConfiguration(prefix);
+
+ if (shard == null) {
+ LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix);
+ return;
+ }
+
+ if (shard.getActor() != null) {
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
+ shard.getActor().tell(Shutdown.INSTANCE, self());
+ }
+ LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(),
+ persistenceId(), shardId.getShardName());
+ persistShardList();
+ }
+