+ private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
+ boolean removeShardOnFailure) {
+ shardReplicaOperationsInProgress.remove(shardName);
+
+ if(removeShardOnFailure) {
+ ShardInformation shardInfo = localShards.remove(shardName);
+ if (shardInfo.getActor() != null) {
+ shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+ new RuntimeException(message, failure)), getSelf());
+ }
+
+ private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
+ String leaderPath, boolean removeShardOnFailure) {
+ String shardName = shardInfo.getShardName();
+ shardReplicaOperationsInProgress.remove(shardName);
+
+ LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
+ // Make the local shard voting capable
+ shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+ shardInfo.setActiveMember(true);
+ persistShardList();
+
+ mBean.addLocalShard(shardInfo.getShardId().toString());
+ sender.tell(new akka.actor.Status.Success(null), getSelf());
+ } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+ sendLocalReplicaAlreadyExistsReply(shardName, sender);
+ } else {
+ LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
+ persistenceId(), shardName, replyMsg.getStatus());
+
+ Exception failure;
+ switch (replyMsg.getStatus()) {
+ case TIMEOUT:
+ failure = new TimeoutException(String.format(
+ "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
+ "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+ leaderPath, shardName));
+ break;
+ case NO_LEADER:
+ failure = createNoShardLeaderException(shardInfo.getShardId());
+ break;
+ default :
+ failure = new RuntimeException(String.format(
+ "AddServer request to leader %s for shard %s failed with status %s",
+ leaderPath, shardName, replyMsg.getStatus()));
+ }
+
+ onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
+ }
+ }
+
+ private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
+ String shardName = shardReplicaMsg.getShardName();
+
+ // verify the local shard replica is available in the controller node
+ if (!localShards.containsKey(shardName)) {
+ String msg = String.format("Local shard %s does not", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ return;
+ }
+ // call RemoveShard for the shardName
+ getSender().tell(new akka.actor.Status.Success(true), getSelf());
+ return;
+ }
+
+ private void persistShardList() {
+ List<String> shardList = new ArrayList<>(localShards.keySet());
+ for (ShardInformation shardInfo : localShards.values()) {
+ if (!shardInfo.isActiveMember()) {
+ shardList.remove(shardInfo.getShardName());
+ }
+ }
+ LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+ saveSnapshot(new ShardManagerSnapshot(shardList));
+ }
+
+ private void handleShardRecovery(SnapshotOffer offer) {
+ LOG.debug ("{}: in handleShardRecovery", persistenceId());
+ ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+ String currentMember = cluster.getCurrentMemberName();
+ Set<String> configuredShardList =
+ new HashSet<>(configuration.getMemberShardNames(currentMember));
+ for (String shard : snapshot.getShardList()) {
+ if (!configuredShardList.contains(shard)) {
+ // add the current member as a replica for the shard
+ LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+ configuration.addMemberReplicaForShard(shard, currentMember);
+ } else {
+ configuredShardList.remove(shard);
+ }
+ }
+ for (String shard : configuredShardList) {
+ // remove the member as a replica for the shard
+ LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+ configuration.removeMemberReplicaForShard(shard, currentMember);
+ }
+ }
+
+ private static class ForwardedAddServerPrimaryShardFound {
+ String shardName;
+ RemotePrimaryShardFound primaryFound;
+
+ ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
+ this.shardName = shardName;
+ this.primaryFound = primaryFound;
+ }
+ }
+
+ private static class ForwardedAddServerReply {
+ ShardInformation shardInfo;
+ AddServerReply addServerReply;
+ String leaderPath;
+ boolean removeShardOnFailure;
+
+ ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+ boolean removeShardOnFailure) {
+ this.shardInfo = shardInfo;
+ this.addServerReply = addServerReply;
+ this.leaderPath = leaderPath;
+ this.removeShardOnFailure = removeShardOnFailure;
+ }
+ }
+
+ private static class ForwardedAddServerFailure {
+ String shardName;
+ String failureMessage;
+ Throwable failure;
+ boolean removeShardOnFailure;
+
+ ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+ boolean removeShardOnFailure) {
+ this.shardName = shardName;
+ this.failureMessage = failureMessage;
+ this.failure = failure;
+ this.removeShardOnFailure = removeShardOnFailure;
+ }
+ }
+
+ @VisibleForTesting
+ protected static class ShardInformation {