onCreateShard((CreateShard)message);
} else if(message instanceof AddShardReplica){
onAddShardReplica((AddShardReplica)message);
+ } else if(message instanceof ForwardedAddServerReply) {
+ ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
+ onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
+ msg.removeShardOnFailure);
+ } else if(message instanceof ForwardedAddServerFailure) {
+ ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
+ onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
+ } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
+ ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
+ addShard(msg.shardName, msg.primaryFound, getSender());
} else if(message instanceof RemoveShardReplica){
onRemoveShardReplica((RemoveShardReplica)message);
} else if(message instanceof GetSnapshot) {
} else {
unknownMessage(message);
}
-
}
private void onGetSnapshot() {
private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
- LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
+ LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
// verify the shard with the specified name is present in the cluster configuration
if (!(this.configuration.isShardConfigured(shardName))) {
String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
} else {
if(response instanceof RemotePrimaryShardFound) {
- RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
- addShard (shardName, message, sender);
+ self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
+ (RemotePrimaryShardFound)response), sender);
} else if(response instanceof LocalPrimaryShardFound) {
sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
futureObj.onComplete(new OnComplete<Object>() {
@Override
public void onComplete(Throwable failure, Object addServerResponse) {
- shardReplicaOperationsInProgress.remove(shardName);
if (failure != null) {
LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
- onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed",
- response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure);
+ String msg = String.format("AddServer request to leader %s for shard %s failed",
+ response.getPrimaryPath(), shardName);
+ self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
} else {
- AddServerReply reply = (AddServerReply)addServerResponse;
- onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure);
+ self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
+ response.getPrimaryPath(), removeShardOnFailure), sender);
}
}
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
boolean removeShardOnFailure) {
+ shardReplicaOperationsInProgress.remove(shardName);
+
if(removeShardOnFailure) {
ShardInformation shardInfo = localShards.remove(shardName);
if (shardInfo.getActor() != null) {
private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
String leaderPath, boolean removeShardOnFailure) {
String shardName = shardInfo.getShardName();
+ shardReplicaOperationsInProgress.remove(shardName);
+
LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
}
}
+ private static class ForwardedAddServerPrimaryShardFound {
+ String shardName;
+ RemotePrimaryShardFound primaryFound;
+
+ ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
+ this.shardName = shardName;
+ this.primaryFound = primaryFound;
+ }
+ }
+
+ private static class ForwardedAddServerReply {
+ ShardInformation shardInfo;
+ AddServerReply addServerReply;
+ String leaderPath;
+ boolean removeShardOnFailure;
+
+ ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+ boolean removeShardOnFailure) {
+ this.shardInfo = shardInfo;
+ this.addServerReply = addServerReply;
+ this.leaderPath = leaderPath;
+ this.removeShardOnFailure = removeShardOnFailure;
+ }
+ }
+
+ private static class ForwardedAddServerFailure {
+ String shardName;
+ String failureMessage;
+ Throwable failure;
+ boolean removeShardOnFailure;
+
+ ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+ boolean removeShardOnFailure) {
+ this.shardName = shardName;
+ this.failureMessage = failureMessage;
+ this.failure = failure;
+ this.removeShardOnFailure = removeShardOnFailure;
+ }
+ }
+
@VisibleForTesting
protected static class ShardInformation {
private final ShardIdentifier shardId;