package org.opendaylight.controller.cluster.datastore.shardmanager;
-import static akka.actor.ActorRef.noSender;
import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.Member;
-import akka.cluster.ddata.DistributedData;
-import akka.cluster.ddata.ORMap;
-import akka.cluster.ddata.Replicator.Changed;
-import akka.cluster.ddata.Replicator.Subscribe;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.collect.Sets.SetView;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
-import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
private final String persistenceId;
+ private final AbstractDataStore dataStore;
- private final ActorRef replicator;
+ private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
+ private PrefixedShardConfigUpdateHandler configUpdateHandler;
ShardManager(AbstractShardManagerCreator<?> builder) {
this.cluster = builder.getCluster();
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
shardManagerMBean.registerMBean();
- replicator = DistributedData.get(context().system()).replicator();
-
+ dataStore = builder.getDistributedDataStore();
}
+ @Override
public void preStart() {
- LOG.info("Starting Shardmanager {}", persistenceId);
-
- final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
- new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
- replicator.tell(subscribe, noSender());
+ LOG.info("Starting ShardManager {}", persistenceId);
}
@Override
LOG.info("Stopping ShardManager {}", persistenceId());
shardManagerMBean.unregisterMBean();
+
+ if (configListenerReg != null) {
+ configListenerReg.close();
+ configListenerReg = null;
+ }
}
@Override
onCreateShard((CreateShard)message);
} else if (message instanceof AddShardReplica) {
onAddShardReplica((AddShardReplica) message);
- } else if (message instanceof CreatePrefixedShard) {
- onCreatePrefixedShard((CreatePrefixedShard) message);
} else if (message instanceof AddPrefixShardReplica) {
onAddPrefixShardReplica((AddPrefixShardReplica) message);
+ } else if (message instanceof PrefixShardCreated) {
+ onPrefixShardCreated((PrefixShardCreated) message);
+ } else if (message instanceof PrefixShardRemoved) {
+ onPrefixShardRemoved((PrefixShardRemoved) message);
+ } else if (message instanceof InitConfigListener) {
+ onInitConfigListener();
} else if (message instanceof ForwardedAddServerReply) {
ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
} else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
+ } else if (message instanceof RemovePrefixShardReplica) {
+ onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
} else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
} else if (message instanceof GetSnapshot) {
onGetLocalShardIds();
} else if (message instanceof RunnableMessage) {
((RunnableMessage)message).run();
- } else if (message instanceof Changed) {
- onConfigChanged((Changed) message);
} else {
unknownMessage(message);
}
}
+ private void onInitConfigListener() {
+ LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
+
+ final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+ org.opendaylight.mdsal.common.api.LogicalDatastoreType
+ .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
+
+ if (configUpdateHandler != null) {
+ configUpdateHandler.close();
+ }
+
+ configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
+ configUpdateHandler.initListener(dataStore, type);
+ }
+
private void onShutDown() {
List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
for (ShardInformation info : localShards.values()) {
}
}
- private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
- LOG.debug("{}, ShardManager {} received config changed {}",
- cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries());
-
- final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
-
- final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
- changedConfig.values().stream().collect(
- Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity()));
-
- resolveConfig(newConfig);
- }
-
- private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
- LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}",
- cluster.getCurrentMemberName(), persistenceId, newConfig);
-
- newConfig.forEach((prefix, config) ->
- LOG.debug("{} ShardManager : {}, received shard config "
- + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config));
-
- final SetView<DOMDataTreeIdentifier> removedConfigs =
- Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet());
-
- // resolve removals
-
- resolveRemovals(removedConfigs);
-
- final SetView<DOMDataTreeIdentifier> addedConfigs =
- Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet());
- // resolve additions
-
- resolveAdditions(addedConfigs, newConfig);
- // iter through to update existing shards, either start/stop replicas or update the shard
- // to check for more peers
- resolveUpdates(Collections.emptySet());
- }
-
- private void resolveRemovals(final Set<DOMDataTreeIdentifier> removedConfigs) {
- LOG.debug("{} ShardManager : {}, resolving removed configs : {}",
- cluster.getCurrentMemberName(), persistenceId, removedConfigs);
-
- removedConfigs.forEach(id -> doRemovePrefixedShard(id));
- }
-
- private void resolveAdditions(final Set<DOMDataTreeIdentifier> addedConfigs,
- final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> configs) {
- LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs);
-
- addedConfigs.stream().filter(identifier
- -> identifier
- .getDatastoreType().equals(
- ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType())))
- .forEach(id -> doCreatePrefixedShard(configs.get(id)));
- }
-
- private void resolveUpdates(Set<DOMDataTreeIdentifier> maybeUpdatedConfigs) {
- LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs);
- }
-
- private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) {
- LOG.debug("{} ShardManager : {}, removing prefix shard: {}",
- cluster.getCurrentMemberName(), persistenceId, prefix);
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
- final ShardInformation shard = localShards.remove(shardId.getShardName());
-
- configuration.removePrefixShardConfiguration(prefix);
-
- if (shard == null) {
- LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix);
- return;
- }
-
- if (shard.getActor() != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
- shard.getActor().tell(Shutdown.INSTANCE, self());
- }
- LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(),
- persistenceId(), shardId.getShardName());
- persistShardList();
- }
-
private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
String leaderPath) {
shardReplicaOperationsInProgress.remove(shardId.getShardName());
}
}
+ private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
+ final String primaryPath, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+ final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+ //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ primaryPath, shardId);
+
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
+ Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+ new RemoveServer(shardId.toString()), removeServerTimeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+ primaryPath, shardName);
+
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
+
+ // FAILURE
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ // SUCCESS
+ self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
- LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
-
- Object reply;
- try {
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
- createPrefixedShard.getConfig().getPrefix());
- if (localShards.containsKey(shardId.getShardName())) {
- LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
- reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
- } else {
- doCreatePrefixedShard(createPrefixedShard);
- reply = new Status.Success(null);
- }
- } catch (final Exception e) {
- LOG.error("{}: onCreateShard failed", persistenceId(), e);
- reply = new Status.Failure(e);
- }
-
- if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
- getSender().tell(reply, getSelf());
- }
- }
-
@SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(CreateShard createShard) {
LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
}
}
- private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
- doCreatePrefixedShard(createPrefixedShard.getConfig());
- // do not replicate on this level
- }
+ private void onPrefixShardCreated(final PrefixShardCreated message) {
+ LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
- private void doCreatePrefixedShard(final PrefixShardConfiguration config) {
- LOG.debug("doCreatePrefixShard : {}", config.getPrefix());
+ final PrefixShardConfiguration config = message.getConfiguration();
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
- config.getPrefix());
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
final String shardName = shardId.getShardName();
if (localShards.containsKey(shardName)) {
}
}
+ doCreatePrefixShard(config, shardId, shardName);
+ }
+
+ private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
configuration.addPrefixShardConfiguration(config);
final Builder builder = newShardDatastoreContextBuilder(shardName);
.storeRoot(config.getPrefix().getRootIdentifier());
DatastoreContext shardDatastoreContext = builder.build();
- final Map<String, String> peerAddresses = Collections.emptyMap();
+ final Map<String, String> peerAddresses = getPeerAddresses(shardName);
final boolean isActiveMember = true;
- LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: "
- + "{}, peerAddresses: {}, isActiveMember: {}",
- shardId, persistenceId(), config.getShardMemberNames(),
- peerAddresses, isActiveMember);
+
+ LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
shardDatastoreContext, Shard.builder(), peerAddressResolver);
if (schemaContext != null) {
info.setActor(newShardActor(schemaContext, info));
}
+ }
+
+ private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+ LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
+
+ final DOMDataTreeIdentifier prefix = message.getPrefix();
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+ final ShardInformation shard = localShards.remove(shardId.getShardName());
+
+ configuration.removePrefixShardConfiguration(prefix);
+
+ if (shard == null) {
+ LOG.warn("{}: Received removal for unconfigured shard: {}, ignoring.. ", persistenceId(), prefix);
+ return;
+ }
+ if (shard.getActor() != null) {
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
+ shard.getActor().tell(Shutdown.INSTANCE, self());
+ }
+
+ LOG.debug("{}: Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
persistShardList();
}
return false;
}
+ private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
+ LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
- // With this message the shard does NOT have to be preconfigured
- // do a dynamic lookup if the shard exists somewhere and replicate
- private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
- final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
-
- LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(message.getShardPrefix()));
+ final String shardName = shardId.getShardName();
+ // Create the localShard
if (schemaContext == null) {
- final String msg = String.format(
+ String msg = String.format(
"No SchemaContext is available in order to create a local shard instance for %s", shardName);
LOG.debug("{}: {}", persistenceId(), msg);
getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return;
}
- findPrimary(shardName,
- new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
- @Override
- public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
- getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
- getTargetActor());
- }
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+ getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ getSelf().tell((RunnableMessage) () -> addPrefixShard(getShardName(), message.getShardPrefix(),
+ response, getSender()), getTargetActor());
+ }
- @Override
- public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
- sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
- }
- }
- );
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+ });
}
private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
}
+ private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
+ final RemotePrimaryShardFound response, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardInformation shardInfo;
+ final boolean removeShardOnFailure;
+ ShardInformation existingShardInfo = localShards.get(shardName);
+ if (existingShardInfo == null) {
+ removeShardOnFailure = true;
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ DatastoreContext datastoreContext = builder.build();
+
+ shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+ Shard.builder(), peerAddressResolver);
+ shardInfo.setActiveMember(false);
+ localShards.put(shardName, shardInfo);
+ shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ } else {
+ removeShardOnFailure = false;
+ shardInfo = existingShardInfo;
+ }
+
+ execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+ }
+
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
shardInfo = existingShardInfo;
}
- String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+ execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+ }
+
+ private void execAddShard(final String shardName,
+ final ShardInformation shardInfo,
+ final RemotePrimaryShardFound response,
+ final boolean removeShardOnFailure,
+ final ActorRef sender) {
+
+ final String localShardAddress =
+ peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
response.getPrimaryPath(), shardInfo.getShardId());
- Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
+ final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
.getShardLeaderElectionTimeout().duration());
- Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
- new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
+ final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+ new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
- String msg = String.format("AddServer request to leader %s for shard %s failed",
+ final String msg = String.format("AddServer request to leader %s for shard %s failed",
response.getPrimaryPath(), shardName);
self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
} else {
});
}
+ private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
+ LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
+
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(message.getShardPrefix()));
+ final String shardName = shardId.getShardName();
+
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
+ shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ private void doRemoveShardReplicaAsync(final String primaryPath) {
+ getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
+ primaryPath, getSender()), getTargetActor());
+ }
+ });
+ }
+
private void persistShardList() {
List<String> shardList = new ArrayList<>(localShards.keySet());
for (ShardInformation shardInfo : localShards.values()) {
}
private void findLocalShard(FindLocalShard message) {
+ LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
+
final ShardInformation shardInformation = localShards.get(message.getShardName());
if (shardInformation == null) {
+ LOG.debug("{}: Local shard {} not found - shards present: {}",
+ persistenceId(), message.getShardName(), localShards.keySet());
+
getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
return;
}