+ private void persistShardList() {
+ List<String> shardList = new ArrayList(localShards.keySet());
+ for (ShardInformation shardInfo : localShards.values()) {
+ if (!shardInfo.isShardActiveMember()) {
+ shardList.remove(shardInfo.getShardName());
+ }
+ }
+ LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+ saveSnapshot(new ShardManagerSnapshot(shardList));
+ }
+
+ private void handleShardRecovery(SnapshotOffer offer) {
+ LOG.debug ("{}: in handleShardRecovery", persistenceId());
+ ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+ String currentMember = cluster.getCurrentMemberName();
+ Set<String> configuredShardList =
+ new HashSet<>(configuration.getMemberShardNames(currentMember));
+ for (String shard : snapshot.getShardList()) {
+ if (!configuredShardList.contains(shard)) {
+ // add the current member as a replica for the shard
+ LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+ configuration.addMemberReplicaForShard(shard, currentMember);
+ } else {
+ configuredShardList.remove(shard);
+ }
+ }
+ for (String shard : configuredShardList) {
+ // remove the member as a replica for the shard
+ LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+ configuration.removeMemberReplicaForShard(shard, currentMember);
+ }
+ }
+