+ private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
+ final String shardName = shardReplicaMsg.getShardName();
+
+ LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+ // verify the shard with the specified name is present in the cluster configuration
+ if (!(this.configuration.isShardConfigured(shardName))) {
+ String msg = String.format("No module configuration exists for shard %s", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ return;
+ }
+
+ // Create the localShard
+ if (schemaContext == null) {
+ String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+
+ });
+ }
+
+ private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+ String msg = String.format("Local shard %s already exists", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ }
+
+ /**
+  * Asks the remote primary of {@code shardName} to add this member's shard as a replica.
+  *
+  * <p>Returns immediately when {@code isShardReplicaOperationInProgress} reports true for the shard.
+  * Otherwise the shard is marked as in progress and, if no local {@link ShardInformation} exists yet,
+  * one is created (flagged as a non-active member and using the {@link DisableElectionsRaftPolicy}
+  * raft policy) together with its shard actor. An {@link AddServer} message is then sent to the
+  * primary; the outcome is forwarded back to this actor as either a {@code ForwardedAddServerReply}
+  * or a {@code ForwardedAddServerFailure}, with the original {@code sender} kept as the message sender.
+  *
+  * @param shardName name of the shard to add locally
+  * @param response the primary lookup result providing the primary's path
+  * @param sender the actor that requested the replica
+  */
+ private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
+     if (isShardReplicaOperationInProgress(shardName, sender)) {
+         return;
+     }
+
+     shardReplicaOperationsInProgress.add(shardName);
+
+     final ShardInformation knownShardInfo = localShards.get(shardName);
+     // Only a shard created by this call is passed on as removable on failure.
+     final boolean removeShardOnFailure = knownShardInfo == null;
+     final ShardInformation shardInfo;
+     if (removeShardOnFailure) {
+         final ShardIdentifier newShardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+         final DatastoreContext initialContext = newShardDatastoreContextBuilder(shardName)
+             .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
+             .build();
+
+         shardInfo = new ShardInformation(shardName, newShardId, getPeerAddresses(shardName), initialContext,
+             Shard.builder(), peerAddressResolver);
+         shardInfo.setActiveMember(false);
+         localShards.put(shardName, shardInfo);
+         shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+     } else {
+         shardInfo = knownShardInfo;
+     }
+
+     final String localShardAddress =
+         peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+     final String leaderPath = response.getPrimaryPath();
+
+     // Inform the shard leader to add this shard as a replica by sending an AddServer message.
+     LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+         leaderPath, shardInfo.getShardId());
+
+     final Timeout addServerTimeout = new Timeout(
+         shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().duration());
+     final AddServer addServerMsg = new AddServer(shardInfo.getShardId().toString(), localShardAddress, true);
+     final Future<Object> addServerFuture =
+         ask(getContext().actorSelection(leaderPath), addServerMsg, addServerTimeout);
+
+     addServerFuture.onComplete(new OnComplete<Object>() {
+         @Override
+         public void onComplete(Throwable failure, Object addServerResponse) {
+             if (failure == null) {
+                 self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
+                     leaderPath, removeShardOnFailure), sender);
+                 return;
+             }
+
+             LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+                 leaderPath, shardName, failure);
+
+             final String failureMsg = String.format("AddServer request to leader %s for shard %s failed",
+                 leaderPath, shardName);
+             self().tell(new ForwardedAddServerFailure(shardName, failureMsg, failure, removeShardOnFailure),
+                 sender);
+         }
+     }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ /**
+  * Cleans up after a failed AddServer attempt and reports the failure to {@code sender}.
+  *
+  * <p>The shard is removed from the in-progress set. When {@code removeShardOnFailure} is set, the
+  * shard is also removed from {@code localShards} and its actor, if any, is sent a PoisonPill.
+  *
+  * @param shardName name of the shard whose add operation failed
+  * @param message optional context message; when {@code null} the {@code failure} is sent as-is,
+  *                otherwise it is wrapped in a {@link RuntimeException} carrying this message
+  * @param failure the underlying cause
+  * @param sender the actor that receives the {@code Status.Failure} reply
+  * @param removeShardOnFailure whether the local shard entry should be discarded
+  */
+ private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
+         boolean removeShardOnFailure) {
+     shardReplicaOperationsInProgress.remove(shardName);
+
+     if (removeShardOnFailure) {
+         ShardInformation shardInfo = localShards.remove(shardName);
+         // The entry may already be gone; without this guard an NPE here would stop the
+         // failure reply below from ever being sent.
+         if (shardInfo != null && shardInfo.getActor() != null) {
+             shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+         }
+     }
+
+     sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+         new RuntimeException(message, failure)), getSelf());
+ }
+
+ private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
+ String leaderPath, boolean removeShardOnFailure) {
+ String shardName = shardInfo.getShardName();
+ shardReplicaOperationsInProgress.remove(shardName);
+
+ LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
+ // Make the local shard voting capable
+ shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+ shardInfo.setActiveMember(true);
+ persistShardList();
+
+ mBean.addLocalShard(shardInfo.getShardId().toString());
+ sender.tell(new akka.actor.Status.Success(null), getSelf());
+ } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+ sendLocalReplicaAlreadyExistsReply(shardName, sender);
+ } else {
+ LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
+ persistenceId(), shardName, replyMsg.getStatus());
+
+ Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
+
+ onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
+ }
+ }
+
+ /**
+  * Maps a non-successful {@link ServerChangeStatus} to the exception reported to the requester.
+  *
+  * @param serverChange the request class; its simple name is used in the messages
+  * @param serverChangeStatus the status returned by the leader
+  * @param leaderPath path of the leader the request was sent to
+  * @param shardId identifier of the shard the request was about
+  * @return a {@link TimeoutException} for {@code TIMEOUT}, the result of
+  *         {@code createNoShardLeaderException} for {@code NO_LEADER}, an
+  *         {@link UnsupportedOperationException} for {@code NOT_SUPPORTED}, and a generic
+  *         {@link RuntimeException} naming the status for everything else
+  */
+ private Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
+         String leaderPath, ShardIdentifier shardId) {
+     switch (serverChangeStatus) {
+         case TIMEOUT:
+             // The trailing space keeps the two sentences from running together in the message.
+             return new TimeoutException(String.format(
+                 "The shard leader %s timed out trying to replicate the initial data to the new shard %s. " +
+                 "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                 leaderPath, shardId.getShardName()));
+         case NO_LEADER:
+             return createNoShardLeaderException(shardId);
+         case NOT_SUPPORTED:
+             return new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
+                 serverChange.getSimpleName(), shardId.getShardName()));
+         default :
+             return new RuntimeException(String.format(
+                 "%s request to leader %s for shard %s failed with status %s",
+                 serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
+     }
+ }
+
+ /**
+  * Handles a {@link RemoveShardReplica} request by looking up the shard's primary.
+  *
+  * <p>Whether the primary turns out to be remote or local, the lookup result is forwarded back to
+  * this actor wrapped in a {@link PrimaryShardFoundForContext} together with the original request.
+  *
+  * @param shardReplicaMsg the request carrying the name of the shard
+  */
+ private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+     LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+     final String requestedShard = shardReplicaMsg.getShardName();
+
+     findPrimary(requestedShard,
+         new AutoFindPrimaryFailureResponseHandler(getSender(), requestedShard, persistenceId(), getSelf()) {
+             @Override
+             public void onRemotePrimaryShardFound(RemotePrimaryShardFound remotePrimary) {
+                 final PrimaryShardFoundForContext forwarded =
+                     new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, remotePrimary);
+                 getSelf().tell(forwarded, getTargetActor());
+             }
+
+             @Override
+             public void onLocalPrimaryFound(LocalPrimaryShardFound localPrimary) {
+                 final PrimaryShardFoundForContext forwarded =
+                     new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, localPrimary);
+                 getSelf().tell(forwarded, getTargetActor());
+             }
+         });
+ }
+
+ private void persistShardList() {
+ List<String> shardList = new ArrayList<>(localShards.keySet());
+ for (ShardInformation shardInfo : localShards.values()) {
+ if (!shardInfo.isActiveMember()) {
+ shardList.remove(shardInfo.getShardName());
+ }
+ }
+ LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+ saveSnapshot(updateShardManagerSnapshot(shardList));
+ }
+
+ private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
+ currentSnapshot = new ShardManagerSnapshot(shardList);
+ return currentSnapshot;
+ }
+
+ private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
+ currentSnapshot = snapshot;
+
+ LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+
+ String currentMember = cluster.getCurrentMemberName();
+ Set<String> configuredShardList =
+ new HashSet<>(configuration.getMemberShardNames(currentMember));
+ for (String shard : currentSnapshot.getShardList()) {
+ if (!configuredShardList.contains(shard)) {
+ // add the current member as a replica for the shard
+ LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+ configuration.addMemberReplicaForShard(shard, currentMember);
+ } else {
+ configuredShardList.remove(shard);
+ }
+ }
+ for (String shard : configuredShardList) {
+ // remove the member as a replica for the shard
+ LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+ configuration.removeMemberReplicaForShard(shard, currentMember);
+ }
+ }
+
+ private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
+ LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+ persistenceId());
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+ }
+
+ /**
+  * Message this actor sends to itself from the AddServer future callback when the leader
+  * returned a reply, so that the reply can be handled as a regular actor message.
+  *
+  * <p>All fields are assigned once in the constructor and are therefore declared {@code final};
+  * the instance is created on the future's dispatcher and read by the actor.
+  */
+ private static class ForwardedAddServerReply {
+     /** Information about the local shard the AddServer request was issued for. */
+     final ShardInformation shardInfo;
+     /** The reply received from the leader. */
+     final AddServerReply addServerReply;
+     /** Path of the leader the request was sent to. */
+     final String leaderPath;
+     /** Whether the local shard should be removed if the reply indicates a failure. */
+     final boolean removeShardOnFailure;
+
+     ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+             boolean removeShardOnFailure) {
+         this.shardInfo = shardInfo;
+         this.addServerReply = addServerReply;
+         this.leaderPath = leaderPath;
+         this.removeShardOnFailure = removeShardOnFailure;
+     }
+ }
+
+ private static class ForwardedAddServerFailure {
+ String shardName;
+ String failureMessage;
+ Throwable failure;
+ boolean removeShardOnFailure;
+
+ ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+ boolean removeShardOnFailure) {
+ this.shardName = shardName;
+ this.failureMessage = failureMessage;
+ this.failure = failure;
+ this.removeShardOnFailure = removeShardOnFailure;
+ }