+ /**
+  * Creates a local replica of {@code shardName} and asks the remote shard leader to add it by
+  * sending an {@code AddServer} message.
+  *
+  * <p>The local shard is registered in {@code localShards} as a non-active member and started with
+  * the {@code DisableElectionsRaftPolicy}. When the leader answers with an {@code AddServerReply} the
+  * reply is handed to {@code onAddServerReply}. If the request fails, or the leader answers with
+  * anything other than an {@code AddServerReply}, the local shard is removed again, its actor is
+  * sent a {@code PoisonPill} and {@code sender} receives a {@code Status.Failure}.
+  *
+  * @param shardName name of the shard to replicate locally
+  * @param response  lookup result carrying the path of the remote primary (leader) shard
+  * @param sender    actor that requested the replica and that receives the final outcome
+  */
+ private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
+     // NOTE(review): any result of checkLocalShardExists is not used here, so execution always
+     // continues with creating the shard - confirm that this is intended.
+     checkLocalShardExists(shardName, sender);
+
+     ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+     String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+
+     // The replica starts with a custom raft policy; onAddServerReply installs the regular
+     // datastore context once the leader has accepted the new server.
+     DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+             DisableElectionsRaftPolicy.class.getName()).build();
+
+     final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
+             getPeerAddresses(shardName), datastoreContext,
+             Shard.builder(), peerAddressResolver);
+     shardInfo.setShardActiveMember(false);
+     localShards.put(shardName, shardInfo);
+     shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+
+     //inform ShardLeader to add this shard as a replica by sending an AddServer message
+     LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+             response.getPrimaryPath(), shardId);
+
+     // Wait up to four times the shard leader election timeout for the leader's answer.
+     Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
+     Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+             new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+
+     // NOTE(review): this callback is executed on the Client dispatcher and touches localShards
+     // (directly and via onAddServerReply) - presumably outside the actor's own message
+     // processing; consider forwarding the outcome to getSelf() as a message instead.
+     futureObj.onComplete(new OnComplete<Object>() {
+         @Override
+         public void onComplete(Throwable failure, Object addServerResponse) {
+             if (failure == null && addServerResponse instanceof AddServerReply) {
+                 onAddServerReply(shardName, shardInfo, (AddServerReply) addServerResponse, sender,
+                         response.getPrimaryPath());
+                 return;
+             }
+
+             // Either the ask failed or the leader answered with an unexpected message type.
+             // Both are handled as a failed request so that the requester always gets an answer
+             // and the half-created shard does not linger in localShards.
+             final Throwable cause = failure != null ? failure : new IllegalStateException(
+                     String.format("Unexpected response %s to AddServer request", addServerResponse));
+
+             LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+                     response.getPrimaryPath(), shardName, cause);
+
+             // Remove the shard
+             localShards.remove(shardName);
+             if (shardInfo.getActor() != null) {
+                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+             }
+
+             sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                     String.format("AddServer request to leader %s for shard %s failed",
+                             response.getPrimaryPath(), shardName), cause)), getSelf());
+         }
+     }, new Dispatchers(context().system().dispatchers()).
+             getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ private void onAddServerReply (String shardName, ShardInformation shardInfo,
+ AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+ LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
+ // Make the local shard voting capable
+ shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+ shardInfo.setShardActiveMember(true);
+ persistShardList();
+
+ mBean.addLocalShard(shardInfo.getShardId().toString());
+ sender.tell(new akka.actor.Status.Success(true), getSelf());
+ } else {
+ LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+ persistenceId(), shardName, replyMsg.getStatus());
+
+ //remove the local replica created
+ localShards.remove(shardName);
+ if (shardInfo.getActor() != null) {
+ shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+ }
+ switch (replyMsg.getStatus()) {
+ case TIMEOUT:
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+ String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+ leaderPath, shardName))), getSelf());
+ break;
+ case NO_LEADER:
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+ "There is no shard leader available for shard %s", shardName))), getSelf());
+ break;
+ default :
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+ "AddServer request to leader %s for shard %s failed with status %s",
+ leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+ }
+ }
+ }
+
+ /**
+  * Handles a request to remove the local replica of a shard.
+  *
+  * <p>If no local shard with the requested name is registered, the sender receives a
+  * {@code Status.Failure} wrapping an {@code IllegalArgumentException}. Otherwise the sender
+  * receives a {@code Status.Success}; the removal itself is not implemented in this method yet.
+  *
+  * @param shardReplicaMsg request naming the shard whose local replica should be removed
+  */
+ private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
+     String shardName = shardReplicaMsg.getShardName();
+
+     // verify the local shard replica is available in the controller node
+     if (!localShards.containsKey(shardName)) {
+         String msg = String.format("Local shard %s does not exist", shardName);
+         LOG.debug ("{}: {}", persistenceId(), msg);
+         getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+         return;
+     }
+
+     // TODO: call RemoveShard for the shardName - success is currently reported without
+     // removing anything.
+     getSender().tell(new akka.actor.Status.Success(true), getSelf());
+ }
+
+ private void persistShardList() {
+ List<String> shardList = new ArrayList(localShards.keySet());
+ for (ShardInformation shardInfo : localShards.values()) {
+ if (!shardInfo.isShardActiveMember()) {
+ shardList.remove(shardInfo.getShardName());
+ }
+ }
+ LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+ saveSnapshot(new ShardManagerSnapshot(shardList));
+ }
+
+ private void handleShardRecovery(SnapshotOffer offer) {
+ LOG.debug ("{}: in handleShardRecovery", persistenceId());
+ ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+ String currentMember = cluster.getCurrentMemberName();
+ Set<String> configuredShardList =
+ new HashSet<>(configuration.getMemberShardNames(currentMember));
+ for (String shard : snapshot.getShardList()) {
+ if (!configuredShardList.contains(shard)) {
+ // add the current member as a replica for the shard
+ LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+ configuration.addMemberReplicaForShard(shard, currentMember);
+ } else {
+ configuredShardList.remove(shard);
+ }
+ }
+ for (String shard : configuredShardList) {
+ // remove the member as a replica for the shard
+ LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+ configuration.removeMemberReplicaForShard(shard, currentMember);
+ }
+ }
+
+ @VisibleForTesting
+ protected static class ShardInformation {