+ private void onShutDown() {
+ List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
+ for (ShardInformation info : localShards.values()) {
+ if (info.getActor() != null) {
+ LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
+
+ FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+ stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
+ }
+ }
+
+ LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
+
+ ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+ Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
+
+ combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Boolean> results) {
+ LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
+
+ self().tell(PoisonPill.getInstance(), self());
+
+ if(failure != null) {
+ LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
+ } else {
+ int nfailed = 0;
+ for(Boolean r: results) {
+ if(!r) {
+ nfailed++;
+ }
+ }
+
+ if(nfailed > 0) {
+ LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
+ }
+ }
+ }
+ }, dispatcher);
+ }
+
+ private void onWrappedShardResponse(WrappedShardResponse message) {
+ if (message.getResponse() instanceof RemoveServerReply) {
+ onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
+ message.getLeaderPath());
+ }
+ }
+
+ private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
+ String leaderPath) {
+ shardReplicaOperationsInProgress.remove(shardId);
+
+ LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+ shardId.getShardName());
+ originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+ } else {
+ LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+ persistenceId(), shardId, replyMsg.getStatus());
+
+ Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
+ leaderPath, shardId);
+ originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+ }
+ }
+