+ private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
+ final ActorRef sender) {
+ if(isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+ final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+ //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+ LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ primaryPath, shardId);
+
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
+ duration());
+ Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+ new RemoveServer(shardId.toString()), removeServerTimeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+ primaryPath, shardName);
+
+ LOG.debug ("{}: {}", persistenceId(), msg, failure);
+
+ // FAILURE
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ // SUCCESS
+ self().tell(new WrappedShardResponse(shardName, response), sender);
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+