+ private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
+ final String primaryPath, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+ final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+ //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ primaryPath, shardId);
+
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
+ Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+ new RemoveServer(shardId.toString()), removeServerTimeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
+ shardName, failure);
+
+ // FAILURE
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
+ failure)), self());
+ } else {
+ // SUCCESS
+ self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ private void onShardReplicaRemoved(final ServerRemoved message) {
+ removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void removeShard(final ShardIdentifier shardId) {
+ final String shardName = shardId.getShardName();
+ final ShardInformation shardInformation = localShards.remove(shardName);