import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
-import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
+ private final Map<String, Future<Boolean>> shardActorStoppingFutures = new HashMap<>();
+
private final String persistenceId;
+ private final AbstractDataStore dataStore;
+
+ private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
+ private PrefixedShardConfigUpdateHandler configUpdateHandler;
ShardManager(AbstractShardManagerCreator<?> builder) {
this.cluster = builder.getCluster();
"shard-manager-" + this.type,
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
shardManagerMBean.registerMBean();
+
+ dataStore = builder.getDistributedDataStore();
+ }
+
+ @Override
+ public void preStart() {
+ LOG.info("Starting ShardManager {}", persistenceId);
}
@Override
LOG.info("Stopping ShardManager {}", persistenceId());
shardManagerMBean.unregisterMBean();
+
+ if (configListenerReg != null) {
+ configListenerReg.close();
+ configListenerReg = null;
+ }
}
@Override
onCreateShard((CreateShard)message);
} else if (message instanceof AddShardReplica) {
onAddShardReplica((AddShardReplica) message);
- } else if (message instanceof CreatePrefixedShard) {
- onCreatePrefixedShard((CreatePrefixedShard) message);
} else if (message instanceof AddPrefixShardReplica) {
onAddPrefixShardReplica((AddPrefixShardReplica) message);
+ } else if (message instanceof PrefixShardCreated) {
+ onPrefixShardCreated((PrefixShardCreated) message);
+ } else if (message instanceof PrefixShardRemoved) {
+ onPrefixShardRemoved((PrefixShardRemoved) message);
+ } else if (message instanceof InitConfigListener) {
+ onInitConfigListener();
} else if (message instanceof ForwardedAddServerReply) {
ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
} else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
+ } else if (message instanceof RemovePrefixShardReplica) {
+ onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
} else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
} else if (message instanceof GetSnapshot) {
}
}
+ private void onInitConfigListener() {
+ LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
+
+ final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+ org.opendaylight.mdsal.common.api.LogicalDatastoreType
+ .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
+
+ if (configUpdateHandler != null) {
+ configUpdateHandler.close();
+ }
+
+ configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
+ configUpdateHandler.initListener(dataStore, type);
+ }
+
private void onShutDown() {
List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
for (ShardInformation info : localShards.values()) {
}
}
+ private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
+ final String primaryPath, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+ final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+ //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ primaryPath, shardId);
+
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
+ Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+ new RemoveServer(shardId.toString()), removeServerTimeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+ primaryPath, shardName);
+
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
+
+ // FAILURE
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ // SUCCESS
+ self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
}
private void onShardReplicaRemoved(ServerRemoved message) {
- final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
- final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+ removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void removeShard(final ShardIdentifier shardId) {
+ final String shardName = shardId.getShardName();
+ final ShardInformation shardInformation = localShards.remove(shardName);
if (shardInformation == null) {
LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
return;
- } else if (shardInformation.getActor() != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
- shardInformation.getActor().tell(Shutdown.INSTANCE, self());
}
- LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+
+ final ActorRef shardActor = shardInformation.getActor();
+ if (shardActor != null) {
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor);
+ FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(3);
+ final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE);
+ shardActorStoppingFutures.put(shardName, stopFuture);
+ stopFuture.onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(Throwable failure, Boolean result) {
+ if (failure == null) {
+ LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
+ } else {
+ LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
+ }
+
+ self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName),
+ ActorRef.noSender());
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
persistShardList();
}
return;
}
- byte[] shardManagerSnapshot = null;
- if (currentSnapshot != null) {
- shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
- }
-
ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
- new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
+ new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
for (ShardInformation shardInfo: localShards.values()) {
}
}
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
- LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
-
- Object reply;
- try {
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
- createPrefixedShard.getConfig().getPrefix());
- if (localShards.containsKey(shardId.getShardName())) {
- LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
- reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
- } else {
- doCreatePrefixedShard(createPrefixedShard);
- reply = new Status.Success(null);
- }
- } catch (final Exception e) {
- LOG.error("{}: onCreateShard failed", persistenceId(), e);
- reply = new Status.Failure(e);
- }
-
- if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
- getSender().tell(reply, getSelf());
- }
- }
-
@SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(CreateShard createShard) {
LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
}
}
- private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
- final PrefixShardConfiguration config = createPrefixedShard.getConfig();
+ private void onPrefixShardCreated(final PrefixShardCreated message) {
+ LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
- final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
- createPrefixedShard.getConfig().getPrefix());
+ final PrefixShardConfiguration config = message.getConfiguration();
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
final String shardName = shardId.getShardName();
- configuration.addPrefixShardConfiguration(config);
+ if (isPreviousShardActorStopInProgress(shardName, message)) {
+ return;
+ }
- DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
+ if (localShards.containsKey(shardName)) {
+ LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
+ final PrefixShardConfiguration existing =
+ configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
- if (shardDatastoreContext == null) {
- final Builder builder = newShardDatastoreContextBuilder(shardName);
- builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
- .storeRoot(config.getPrefix().getRootIdentifier());
- shardDatastoreContext = builder.build();
- } else {
- shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
- peerAddressResolver).build();
+ if (existing != null && existing.equals(config)) {
+ // we don't have to do nothing here
+ return;
+ }
}
- final boolean shardWasInRecoveredSnapshot = currentSnapshot != null
- && currentSnapshot.getShardList().contains(shardName);
+ doCreatePrefixShard(config, shardId, shardName);
+ }
- final Map<String, String> peerAddresses = Collections.emptyMap();
+ private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
+ final Future<Boolean> stopFuture = shardActorStoppingFutures.get(shardName);
+ if (stopFuture == null) {
+ return false;
+ }
+
+ LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(),
+ shardName, messageToDefer);
+ final ActorRef sender = getSender();
+ stopFuture.onComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(Throwable failure, Boolean result) {
+ LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
+ self().tell(messageToDefer, sender);
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+
+ return true;
+ }
+
+ private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
+ configuration.addPrefixShardConfiguration(config);
+
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+ .storeRoot(config.getPrefix().getRootIdentifier());
+ DatastoreContext shardDatastoreContext = builder.build();
+
+ final Map<String, String> peerAddresses = getPeerAddresses(shardName);
final boolean isActiveMember = true;
- LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
- persistenceId(), shardId, peerAddresses, isActiveMember);
+
+ LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
- shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
+ shardDatastoreContext, Shard.builder(), peerAddressResolver);
info.setActiveMember(isActiveMember);
localShards.put(info.getShardName(), info);
}
}
+ private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+ LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
+
+ final DOMDataTreeIdentifier prefix = message.getPrefix();
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+
+ configuration.removePrefixShardConfiguration(prefix);
+ removeShard(shardId);
+ }
+
private void doCreateShard(final CreateShard createShard) {
final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
final String shardName = moduleShardConfig.getShardName();
if (currentSnapshot == null && restoreFromSnapshot != null
&& restoreFromSnapshot.getShardManagerSnapshot() != null) {
- try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
- restoreFromSnapshot.getShardManagerSnapshot()))) {
- ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
+ ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
- LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
+ LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
- applyShardManagerSnapshot(snapshot);
- } catch (ClassNotFoundException | IOException e) {
- LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
- }
+ applyShardManagerSnapshot(snapshot);
}
createLocalShards();
}
@VisibleForTesting
- protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
- return getContext().actorOf(info.newProps(schemaContext)
- .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+ protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
+ return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
+ info.getShardId().toString());
}
private void findPrimary(FindPrimary message) {
* @param shardName the shard name
*/
private Map<String, String> getPeerAddresses(String shardName) {
- Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
- Map<String, String> peerAddresses = new HashMap<>();
+ final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
+ return getPeerAddresses(shardName, members);
+ }
+ private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
+ Map<String, String> peerAddresses = new HashMap<>();
MemberName currentMemberName = this.cluster.getCurrentMemberName();
for (MemberName memberName : members) {
return false;
}
+ private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
+ LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
- // With this message the shard does NOT have to be preconfigured
- // do a dynamic lookup if the shard exists somewhere and replicate
- private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
- final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
-
- LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(message.getShardPrefix()));
+ final String shardName = shardId.getShardName();
+ // Create the localShard
if (schemaContext == null) {
- final String msg = String.format(
+ String msg = String.format(
"No SchemaContext is available in order to create a local shard instance for %s", shardName);
LOG.debug("{}: {}", persistenceId(), msg);
getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return;
}
- findPrimary(shardName,
- new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
- @Override
- public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
- getSelf().tell(new RunnableMessage() {
- @Override
- public void run() {
- addShard(getShardName(), response, getSender());
- }
- }, getTargetActor());
- }
-
- @Override
- public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
- sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
- }
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+ getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
+ message.getShardPrefix(), response, getSender());
+ if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+ getSelf().tell(runnable, getTargetActor());
}
- );
+ }
+
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+ });
}
private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
- getTargetActor());
+ final RunnableMessage runnable = (RunnableMessage) () ->
+ addShard(getShardName(), response, getSender());
+ if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+ getSelf().tell(runnable, getTargetActor());
+ }
}
@Override
public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
}
-
});
}
sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
}
+ private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
+ final RemotePrimaryShardFound response, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardInformation shardInfo;
+ final boolean removeShardOnFailure;
+ ShardInformation existingShardInfo = localShards.get(shardName);
+ if (existingShardInfo == null) {
+ removeShardOnFailure = true;
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ DatastoreContext datastoreContext = builder.build();
+
+ shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+ Shard.builder(), peerAddressResolver);
+ shardInfo.setActiveMember(false);
+ localShards.put(shardName, shardInfo);
+ shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ } else {
+ removeShardOnFailure = false;
+ shardInfo = existingShardInfo;
+ }
+
+ execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+ }
+
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
shardInfo = existingShardInfo;
}
- String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+ execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+ }
+
+ private void execAddShard(final String shardName,
+ final ShardInformation shardInfo,
+ final RemotePrimaryShardFound response,
+ final boolean removeShardOnFailure,
+ final ActorRef sender) {
+
+ final String localShardAddress =
+ peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
response.getPrimaryPath(), shardInfo.getShardId());
- Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
+ final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
.getShardLeaderElectionTimeout().duration());
- Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
- new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
+ final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+ new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
- String msg = String.format("AddServer request to leader %s for shard %s failed",
+ final String msg = String.format("AddServer request to leader %s for shard %s failed",
response.getPrimaryPath(), shardName);
self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
} else {
});
}
+ private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
+ LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
+
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(message.getShardPrefix()));
+ final String shardName = shardId.getShardName();
+
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
+ shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ private void doRemoveShardReplicaAsync(final String primaryPath) {
+ getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
+ primaryPath, getSender()), getTargetActor());
+ }
+ });
+ }
+
private void persistShardList() {
List<String> shardList = new ArrayList<>(localShards.keySet());
for (ShardInformation shardInfo : localShards.values()) {
}
}
LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
- saveSnapshot(updateShardManagerSnapshot(shardList));
+ saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
}
- private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
- currentSnapshot = new ShardManagerSnapshot(shardList);
+ private ShardManagerSnapshot updateShardManagerSnapshot(
+ final List<String> shardList,
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
+ currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
return currentSnapshot;
}
OnDemandRaftState raftState = (OnDemandRaftState) response;
Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
- for ( Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
serverVotingStatusMap.put(e.getKey(), !e.getValue());
}
}
private void findLocalShard(FindLocalShard message) {
+ LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
+
final ShardInformation shardInformation = localShards.get(message.getShardName());
if (shardInformation == null) {
+ LOG.debug("{}: Local shard {} not found - shards present: {}",
+ persistenceId(), message.getShardName(), localShards.keySet());
+
getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
return;
}