private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
+ private final Map<String, Future<Boolean>> shardActorStoppingFutures = new HashMap<>();
+
private final String persistenceId;
private final AbstractDataStore dataStore;
}
private void onShardReplicaRemoved(ServerRemoved message) {
- final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
- final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+ removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void removeShard(final ShardIdentifier shardId) {
+ final String shardName = shardId.getShardName();
+ final ShardInformation shardInformation = localShards.remove(shardName);
if (shardInformation == null) {
LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
return;
- } else if (shardInformation.getActor() != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
- shardInformation.getActor().tell(Shutdown.INSTANCE, self());
}
- LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+
+ final ActorRef shardActor = shardInformation.getActor();
+ if (shardActor != null) {
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor);
+ FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(3);
+ final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE);
+ shardActorStoppingFutures.put(shardName, stopFuture);
+ stopFuture.onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(Throwable failure, Boolean result) {
+ if (failure == null) {
+ LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
+ } else {
+ LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
+ }
+
+ self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName),
+ ActorRef.noSender());
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
persistShardList();
}
LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
final PrefixShardConfiguration config = message.getConfiguration();
-
final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
final String shardName = shardId.getShardName();
+ if (isPreviousShardActorStopInProgress(shardName, message)) {
+ return;
+ }
+
if (localShards.containsKey(shardName)) {
LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
final PrefixShardConfiguration existing =
doCreatePrefixShard(config, shardId, shardName);
}
+ private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
+ final Future<Boolean> stopFuture = shardActorStoppingFutures.get(shardName);
+ if (stopFuture == null) {
+ return false;
+ }
+
+ LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(),
+ shardName, messageToDefer);
+ final ActorRef sender = getSender();
+ stopFuture.onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(Throwable failure, Boolean result) {
+ LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
+ self().tell(messageToDefer, sender);
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+
+ return true;
+ }
+
private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
configuration.addPrefixShardConfiguration(config);
.storeRoot(config.getPrefix().getRootIdentifier());
DatastoreContext shardDatastoreContext = builder.build();
- final Map<String, String> peerAddresses = Collections.emptyMap();
+ final Map<String, String> peerAddresses = getPeerAddresses(shardName);
final boolean isActiveMember = true;
LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
final DOMDataTreeIdentifier prefix = message.getPrefix();
final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
- final ShardInformation shard = localShards.remove(shardId.getShardName());
configuration.removePrefixShardConfiguration(prefix);
-
- if (shard == null) {
- LOG.warn("{}: Received removal for unconfigured shard: {}, ignoring.. ", persistenceId(), prefix);
- return;
- }
-
- if (shard.getActor() != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
- shard.getActor().tell(Shutdown.INSTANCE, self());
- }
-
- LOG.debug("{}: Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
- persistShardList();
+ removeShard(shardId);
}
private void doCreateShard(final CreateShard createShard) {
}
@VisibleForTesting
- protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
- return getContext().actorOf(info.newProps(schemaContext)
- .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+ protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
+ return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
+ info.getShardId().toString());
}
private void findPrimary(FindPrimary message) {
getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell((RunnableMessage) () -> addPrefixShard(getShardName(), message.getShardPrefix(),
- response, getSender()), getTargetActor());
+ final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
+ message.getShardPrefix(), response, getSender());
+ if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+ getSelf().tell(runnable, getTargetActor());
+ }
}
@Override
getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
- getTargetActor());
+ final RunnableMessage runnable = (RunnableMessage) () ->
+ addShard(getShardName(), response, getSender());
+ if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+ getSelf().tell(runnable, getTargetActor());
+ }
}
@Override
public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
}
-
});
}