package org.opendaylight.controller.cluster.datastore.shardmanager;
import static akka.pattern.Patterns.ask;
+
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
import java.util.function.Supplier;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
/**
- * The ShardManager has the following jobs,
+ * Manages the shards for a data store. The ShardManager has the following jobs:
* <ul>
* <li> Create all the local shard replicas that belong on this cluster member
* <li> Find the address of the local shard
* <li> Find the primary replica for any given shard
* <li> Monitor the cluster members and store their addresses
- * <ul>
+ * </ul>
*/
class ShardManager extends AbstractUntypedPersistentActorWithMetering {
-
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
// Stores a mapping between a shard name and it's corresponding information
private final String shardDispatcherPath;
- private final ShardManagerInfo mBean;
+ private final ShardManagerInfo shardManagerMBean;
private DatastoreContextFactory datastoreContextFactory;
this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
+ this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+ shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
+ "shard-manager-" + this.type,
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
- mBean.registerMBean();
+ shardManagerMBean.registerMBean();
}
@Override
public void postStop() {
LOG.info("Stopping ShardManager {}", persistenceId());
- mBean.unregisterMBean();
+ shardManagerMBean.unregisterMBean(); // withdraw this manager's MBean registration when the actor stops
}
@Override
public void handleCommand(Object message) throws Exception {
if (message instanceof FindPrimary) {
findPrimary((FindPrimary)message);
- } else if(message instanceof FindLocalShard){
+ } else if (message instanceof FindLocalShard) {
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext(message);
- } else if(message instanceof ActorInitialized) {
+ } else if (message instanceof ActorInitialized) {
onActorInitialized(message);
- } else if (message instanceof ClusterEvent.MemberUp){
+ } else if (message instanceof ClusterEvent.MemberUp) {
memberUp((ClusterEvent.MemberUp) message);
- } else if (message instanceof ClusterEvent.MemberExited){
+ } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
+ memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
+ } else if (message instanceof ClusterEvent.MemberExited) {
memberExited((ClusterEvent.MemberExited) message);
- } else if(message instanceof ClusterEvent.MemberRemoved) {
+ } else if (message instanceof ClusterEvent.MemberRemoved) {
memberRemoved((ClusterEvent.MemberRemoved) message);
- } else if(message instanceof ClusterEvent.UnreachableMember) {
- memberUnreachable((ClusterEvent.UnreachableMember)message);
- } else if(message instanceof ClusterEvent.ReachableMember) {
+ } else if (message instanceof ClusterEvent.UnreachableMember) {
+ memberUnreachable((ClusterEvent.UnreachableMember) message);
+ } else if (message instanceof ClusterEvent.ReachableMember) {
memberReachable((ClusterEvent.ReachableMember) message);
- } else if(message instanceof DatastoreContextFactory) {
- onDatastoreContextFactory((DatastoreContextFactory)message);
- } else if(message instanceof RoleChangeNotification) {
+ } else if (message instanceof DatastoreContextFactory) {
+ onDatastoreContextFactory((DatastoreContextFactory) message);
+ } else if (message instanceof RoleChangeNotification) {
onRoleChangeNotification((RoleChangeNotification) message);
- } else if(message instanceof FollowerInitialSyncUpStatus){
+ } else if (message instanceof FollowerInitialSyncUpStatus) {
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
- } else if(message instanceof ShardNotInitializedTimeout) {
- onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
- } else if(message instanceof ShardLeaderStateChanged) {
+ } else if (message instanceof ShardNotInitializedTimeout) {
+ onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
+ } else if (message instanceof ShardLeaderStateChanged) {
onLeaderStateChanged((ShardLeaderStateChanged) message);
- } else if(message instanceof SwitchShardBehavior){
+ } else if (message instanceof SwitchShardBehavior) {
onSwitchShardBehavior((SwitchShardBehavior) message);
- } else if(message instanceof CreateShard) {
+ } else if (message instanceof CreateShard) {
onCreateShard((CreateShard)message);
- } else if(message instanceof AddShardReplica){
- onAddShardReplica((AddShardReplica)message);
- } else if(message instanceof ForwardedAddServerReply) {
+ } else if (message instanceof AddShardReplica) {
+ onAddShardReplica((AddShardReplica) message);
+ } else if (message instanceof CreatePrefixedShard) {
+ onCreatePrefixedShard((CreatePrefixedShard) message);
+ } else if (message instanceof AddPrefixShardReplica) {
+ onAddPrefixShardReplica((AddPrefixShardReplica) message);
+ } else if (message instanceof ForwardedAddServerReply) {
ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
msg.removeShardOnFailure);
- } else if(message instanceof ForwardedAddServerFailure) {
+ } else if (message instanceof ForwardedAddServerFailure) {
ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof PrimaryShardFoundForContext) {
- PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
- onPrimaryShardFoundContext(primaryShardFoundContext);
- } else if(message instanceof RemoveShardReplica) {
+ } else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
- } else if(message instanceof WrappedShardResponse){
+ } else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
- } else if(message instanceof GetSnapshot) {
+ } else if (message instanceof GetSnapshot) {
onGetSnapshot();
- } else if(message instanceof ServerRemoved){
+ } else if (message instanceof ServerRemoved) {
onShardReplicaRemoved((ServerRemoved) message);
- } else if(message instanceof SaveSnapshotSuccess) {
- onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
- } else if(message instanceof SaveSnapshotFailure) {
- LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
- persistenceId(), ((SaveSnapshotFailure) message).cause());
- } else if(message instanceof Shutdown) {
+ } else if (message instanceof ChangeShardMembersVotingStatus) {
+ onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+ } else if (message instanceof FlipShardMembersVotingStatus) {
+ onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
+ } else if (message instanceof SaveSnapshotSuccess) {
+ onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+ } else if (message instanceof SaveSnapshotFailure) {
+ LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
+ ((SaveSnapshotFailure) message).cause());
+ } else if (message instanceof Shutdown) {
onShutDown();
} else if (message instanceof GetLocalShardIds) {
onGetLocalShardIds();
+ } else if (message instanceof RunnableMessage) {
+ ((RunnableMessage)message).run();
} else {
unknownMessage(message);
}
if (info.getActor() != null) {
LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
- FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+ FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(2);
stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
}
}
LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
- ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+ ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
+ .getDispatcher(Dispatchers.DispatcherType.Client);
Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
self().tell(PoisonPill.getInstance(), self());
- if(failure != null) {
+ if (failure != null) {
LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
} else {
int nfailed = 0;
- for(Boolean r: results) {
- if(!r) {
+ for (Boolean result : results) {
+ if (!result) {
nfailed++;
}
}
- if(nfailed > 0) {
+ if (nfailed > 0) {
LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
}
}
private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
String leaderPath) {
- shardReplicaOperationsInProgress.remove(shardId);
+ shardReplicaOperationsInProgress.remove(shardId.getShardName());
- LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+ LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+ LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
shardId.getShardName());
originalSender.tell(new Status.Success(null), getSelf());
} else {
- LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+ LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
persistenceId(), shardId, replyMsg.getStatus());
Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
}
}
- private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
- if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
- addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
- getSender());
- } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
- removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
- primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
- }
- }
-
private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
final ActorRef sender) {
- if(isShardReplicaOperationInProgress(shardName, sender)) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
}
final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
//inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
- LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
primaryPath, shardId);
- Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
- duration());
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
new RemoveServer(shardId.toString()), removeServerTimeout);
@Override
public void onComplete(Throwable failure, Object response) {
if (failure != null) {
+ shardReplicaOperationsInProgress.remove(shardName);
String msg = String.format("RemoveServer request to leader %s for shard %s failed",
primaryPath, shardName);
- LOG.debug ("{}: {}", persistenceId(), msg, failure);
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
// FAILURE
sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
private void onShardReplicaRemoved(ServerRemoved message) {
final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
- if(shardInformation == null) {
+ if (shardInformation == null) {
LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
return;
- } else if(shardInformation.getActor() != null) {
+ } else if (shardInformation.getActor() != null) {
LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
shardInformation.getActor().tell(Shutdown.INSTANCE, self());
}
LOG.debug("{}: onGetSnapshot", persistenceId());
List<String> notInitialized = null;
- for(ShardInformation shardInfo: localShards.values()) {
- if(!shardInfo.isShardInitialized()) {
- if(notInitialized == null) {
+ for (ShardInformation shardInfo : localShards.values()) {
+ if (!shardInfo.isShardInitialized()) {
+ if (notInitialized == null) {
notInitialized = new ArrayList<>();
}
}
}
- if(notInitialized != null) {
+ if (notInitialized != null) {
getSender().tell(new Status.Failure(new IllegalStateException(String.format(
"%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
return;
}
byte[] shardManagerSnapshot = null;
- if(currentSnapshot != null) {
+ if (currentSnapshot != null) {
shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
}
new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
- for(ShardInformation shardInfo: localShards.values()) {
+ for (ShardInformation shardInfo: localShards.values()) {
shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
}
}
+    /**
+     * Handles a {@link CreatePrefixedShard} request. Creates the local prefix-based shard unless a shard with
+     * the same name is already present, then replies to the sender with a {@code Status.Success}, or with a
+     * {@code Status.Failure} wrapping the exception if creation threw. No reply is sent when the sender is
+     * null or is the dead-letters actor.
+     *
+     * @param createPrefixedShard the request carrying the prefix shard configuration
+     */
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+        LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
+
+        Object reply;
+        try {
+            final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+                    createPrefixedShard.getConfig().getPrefix());
+            if (localShards.containsKey(shardId.getShardName())) {
+                // An existing shard is reported as success so that repeated requests are harmless.
+                LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
+                reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
+            } else {
+                doCreatePrefixedShard(createPrefixedShard);
+                reply = new Status.Success(null);
+            }
+        } catch (final Exception e) {
+            // Log under this handler's own name so failures are not attributed to onCreateShard.
+            LOG.error("{}: onCreatePrefixedShard failed", persistenceId(), e);
+            reply = new Status.Failure(e);
+        }
+
+        if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+            getSender().tell(reply, getSelf());
+        }
+    }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(CreateShard createShard) {
LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
Object reply;
try {
String shardName = createShard.getModuleShardConfig().getShardName();
- if(localShards.containsKey(shardName)) {
+ if (localShards.containsKey(shardName)) {
LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
reply = new Status.Failure(e);
}
- if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
getSender().tell(reply, getSelf());
}
}
- private void doCreateShard(CreateShard createShard) {
- ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
- String shardName = moduleShardConfig.getShardName();
+    /**
+     * Creates the local shard described by a {@link CreatePrefixedShard} request and registers it in
+     * {@code localShards}. The prefix shard configuration is added to the shared configuration, and the shard
+     * actor is started immediately when a schema context is already available.
+     *
+     * @param createPrefixedShard the request carrying the prefix configuration, an optional datastore context
+     *        and the shard builder
+     */
+    private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
+        final PrefixShardConfiguration config = createPrefixedShard.getConfig();
+
+        final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
+                config.getPrefix());
+        final String shardName = shardId.getShardName();
+
+        configuration.addPrefixShardConfiguration(config);
+
+        DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
+
+        if (shardDatastoreContext == null) {
+            // No context supplied: derive one for this shard and set the prefix's datastore type and root.
+            final Builder builder = newShardDatastoreContextBuilder(shardName);
+            builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
+                    .storeRoot(config.getPrefix().getRootIdentifier());
+            shardDatastoreContext = builder.build();
+        } else {
+            // Caller-supplied context: copy it, but install this manager's peer address resolver.
+            shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
+                    .shardPeerAddressResolver(peerAddressResolver).build();
+        }
+
+        // The shard is created with no peer addresses and is always marked as an active member.
+        final Map<String, String> peerAddresses = Collections.emptyMap();
+        final boolean isActiveMember = true;
+        // One placeholder per argument; a surplus placeholder would shift every value under the wrong label.
+        LOG.debug("{} doCreatePrefixedShard: shardId: {}, peerAddresses: {}, isActiveMember: {}",
+                persistenceId(), shardId, peerAddresses, isActiveMember);
+
+        final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+                shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
+        info.setActiveMember(isActiveMember);
+        localShards.put(info.getShardName(), info);
+
+        if (schemaContext != null) {
+            info.setActor(newShardActor(schemaContext, info));
+        }
+    }
+
+ private void doCreateShard(final CreateShard createShard) {
+ final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ final String shardName = moduleShardConfig.getShardName();
configuration.addModuleShardConfiguration(moduleShardConfig);
DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
- if(shardDatastoreContext == null) {
+ if (shardDatastoreContext == null) {
shardDatastoreContext = newShardDatastoreContext(shardName);
} else {
shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
- currentSnapshot.getShardList().contains(shardName);
+ boolean shardWasInRecoveredSnapshot = currentSnapshot != null
+ && currentSnapshot.getShardList().contains(shardName);
Map<String, String> peerAddresses;
boolean isActiveMember;
- if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
- contains(cluster.getCurrentMemberName())) {
+ if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
+ .contains(cluster.getCurrentMemberName())) {
peerAddresses = getPeerAddresses(shardName);
isActiveMember = true;
} else {
// subsequent AddServer request will be needed to make it an active member.
isActiveMember = false;
peerAddresses = Collections.emptyMap();
- shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
- customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
}
LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
info.setActiveMember(isActiveMember);
localShards.put(info.getShardName(), info);
- if(schemaContext != null) {
+ if (schemaContext != null) {
info.setActor(newShardActor(schemaContext, info));
}
}
private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
- return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
- shardPeerAddressResolver(peerAddressResolver);
+ return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
+ .shardPeerAddressResolver(peerAddressResolver); // builder seeded from the factory's context for shardName
}
private DatastoreContext newShardDatastoreContext(String shardName) {
return newShardDatastoreContextBuilder(shardName).build();
}
- private void checkReady(){
+ private void checkReady() {
if (isReadyWithLeaderId()) {
LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
persistenceId(), type, waitTillReadyCountdownLatch.getCount());
LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
- if(shardInformation != null) {
+ if (shardInformation != null) {
shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
- if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+ if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
}
shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
- if(!shardInfo.isShardInitialized()) {
+ if (!shardInfo.isShardInitialized()) {
LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
} else {
ShardInformation shardInformation = findShardInformation(status.getName());
- if(shardInformation != null) {
+ if (shardInformation != null) {
shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
- mBean.setSyncStatus(isInSync());
+ shardManagerMBean.setSyncStatus(isInSync());
}
}
roleChanged.getOldRole(), roleChanged.getNewRole());
ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
- if(shardInformation != null) {
+ if (shardInformation != null) {
shardInformation.setRole(roleChanged.getNewRole());
checkReady();
- mBean.setSyncStatus(isInSync());
+ shardManagerMBean.setSyncStatus(isInSync());
}
}
private ShardInformation findShardInformation(String memberId) {
- for(ShardInformation info : localShards.values()){
- if(info.getShardId().toString().equals(memberId)){
+ for (ShardInformation info : localShards.values()) {
+ if (info.getShardId().toString().equals(memberId)) {
return info;
}
}
private boolean isReadyWithLeaderId() {
boolean isReady = true;
for (ShardInformation info : localShards.values()) {
- if(!info.isShardReadyWithLeaderId()){
+ if (!info.isShardReadyWithLeaderId()) {
isReady = false;
break;
}
return isReady;
}
- private boolean isInSync(){
+ private boolean isInSync() {
for (ShardInformation info : localShards.values()) {
- if(!info.isInSync()){
+ if (!info.isInSync()) {
return false;
}
}
String actorName = sender.path().name();
//find shard name from actor name; actor name is stringified shardId
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
- if (shardId.getShardName() == null) {
+ final ShardIdentifier shardId;
+ try {
+ shardId = ShardIdentifier.fromShardIdString(actorName);
+ } catch (IllegalArgumentException e) {
+ LOG.debug("{}: ignoring actor {}", actorName, e);
return;
}
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void onRecoveryCompleted() {
LOG.info("Recovery complete : {}", persistenceId());
// journal on upgrade from Helium.
deleteMessages(lastSequenceNr());
- if(currentSnapshot == null && restoreFromSnapshot != null &&
- restoreFromSnapshot.getShardManagerSnapshot() != null) {
- try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
+ if (currentSnapshot == null && restoreFromSnapshot != null
+ && restoreFromSnapshot.getShardManagerSnapshot() != null) {
+ try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
restoreFromSnapshot.getShardManagerSnapshot()))) {
ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
applyShardManagerSnapshot(snapshot);
- } catch(Exception e) {
+ } catch (ClassNotFoundException | IOException e) {
LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
}
}
createLocalShards();
}
- private void findLocalShard(FindLocalShard message) {
- final ShardInformation shardInformation = localShards.get(message.getShardName());
-
- if(shardInformation == null){
- getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
- return;
- }
-
- sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
- }
-
private void sendResponse(ShardInformation shardInformation, boolean doWait,
boolean wantShardReady, final Supplier<Object> messageSupplier) {
- if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
- if(doWait) {
+ if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
+ if (doWait) {
final ActorRef sender = getSender();
final ActorRef self = self();
shardInformation.addOnShardInitialized(onShardInitialized);
- FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
- if(shardInformation.isShardInitialized()) {
+ FiniteDuration timeout = shardInformation.getDatastoreContext()
+ .getShardInitializationTimeout().duration();
+ if (shardInformation.isShardInitialized()) {
// If the shard is already initialized then we'll wait enough time for the shard to
// elect a leader, ie 2 times the election timeout.
timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
private void memberRemoved(ClusterEvent.MemberRemoved message) {
MemberName memberName = memberToName(message.member());
- LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName, // INFO level
message.member().address());
peerAddressResolver.removePeerAddress(memberName);
- for(ShardInformation info : localShards.values()){
+ for (ShardInformation info : localShards.values()) { // tell every local shard that its peer on this member is down
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
}
}
private void memberExited(ClusterEvent.MemberExited message) {
MemberName memberName = memberToName(message.member());
- LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
peerAddressResolver.removePeerAddress(memberName);
- for(ShardInformation info : localShards.values()){
+ for (ShardInformation info : localShards.values()) {
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
}
}
private void memberUp(ClusterEvent.MemberUp message) {
MemberName memberName = memberToName(message.member());
- LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
- addPeerAddress(memberName, message.member().address());
+ memberUp(memberName, message.member().address());
+ }
+ private void memberUp(MemberName memberName, Address address) {
+ addPeerAddress(memberName, address);
checkReady();
}
+ private void memberWeaklyUp(MemberWeaklyUp message) {
+ MemberName memberName = memberToName(message.member());
+
+ LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+ message.member().address());
+
+ memberUp(memberName, message.member().address());
+ }
+
private void addPeerAddress(MemberName memberName, Address address) {
peerAddressResolver.addPeerAddress(memberName, address);
- for(ShardInformation info : localShards.values()){
+ for (ShardInformation info : localShards.values()) {
String shardName = info.getShardName();
String peerId = getShardIdentifier(memberName, shardName).toString();
info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
private void memberReachable(ClusterEvent.ReachableMember message) {
MemberName memberName = memberToName(message.member());
- LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+ LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
addPeerAddress(memberName, message.member().address());
private void memberUnreachable(ClusterEvent.UnreachableMember message) {
MemberName memberName = memberToName(message.member());
- LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+ LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
markMemberUnavailable(memberName);
}
final ActorRef actor = info.getActor();
if (actor != null) {
actor.tell(switchBehavior, getSelf());
- } else {
+ } else {
LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
info.getShardName(), switchBehavior.getNewState());
}
}
/**
- * Notifies all the local shards of a change in the schema context
+ * Notifies all the local shards of a change in the schema context.
*
- * @param message
+ * @param message the message to send
*/
private void updateSchemaContext(final Object message) {
schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
if (info != null && info.isActiveMember()) {
sendResponse(info, message.isWaitUntilReady(), true, () -> {
String primaryPath = info.getSerializedLeaderActor();
- Object found = canReturnLocalShardState && info.isLeader() ?
- new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+ Object found = canReturnLocalShardState && info.isLeader()
+ ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
- }
-
- return found;
+ LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+ return found;
});
return;
}
final Collection<String> visitedAddresses;
- if(message instanceof RemoteFindPrimary) {
+ if (message instanceof RemoteFindPrimary) {
visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
} else {
visitedAddresses = new ArrayList<>(1);
visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
- for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
- if(visitedAddresses.contains(address)) {
+ for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+ if (visitedAddresses.contains(address)) {
continue;
}
String.format("No primary shard found for %s.", shardName)), getSelf());
}
+ private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+ Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+ .getShardInitializationTimeout().duration().$times(2));
+
+ Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ handler.onFailure(failure);
+ } else {
+ if (response instanceof RemotePrimaryShardFound) {
+ handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+ } else if (response instanceof LocalPrimaryShardFound) {
+ handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+ } else {
+ handler.onUnknownResponse(response);
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
/**
* Construct the name of the shard actor given the name of the member on
- * which the shard resides and the name of the shard
+ * which the shard resides and the name of the shard.
*
- * @param memberName
- * @param shardName
- * @return
+ * @param memberName the member name
+ * @param shardName the shard name
+ * @return the ShardIdentifier for the given member and shard
*/
- private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
+ private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
return peerAddressResolver.getShardIdentifier(memberName, shardName);
}
/**
- * Create shards that are local to the member on which the ShardManager
- * runs
- *
+ * Create shards that are local to the member on which the ShardManager runs.
*/
private void createLocalShards() {
MemberName memberName = this.cluster.getCurrentMemberName();
Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
- if(restoreFromSnapshot != null)
- {
- for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+ if (restoreFromSnapshot != null) {
+ for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
shardSnapshots.put(snapshot.getName(), snapshot);
}
}
restoreFromSnapshot = null; // null out to GC
- for(String shardName : memberShardNames){
+ for (String shardName : memberShardNames) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
}
/**
- * Given the name of the shard find the addresses of all it's peers
+ * Given the name of the shard, find the addresses of all its peers.
*
- * @param shardName
+ * @param shardName the shard name
*/
private Map<String, String> getPeerAddresses(String shardName) {
Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
(Function<Throwable, Directive>) t -> {
LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
return SupervisorStrategy.resume();
- }
- );
-
+ });
}
@Override
}
@VisibleForTesting
- ShardManagerInfoMBean getMBean(){
- return mBean;
+ ShardManagerInfoMBean getMBean() {
+ return shardManagerMBean;
}
private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
if (shardReplicaOperationsInProgress.contains(shardName)) {
String msg = String.format("A shard replica operation for %s is already in progress", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
+ LOG.debug("{}: {}", persistenceId(), msg);
sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return true;
}
return false;
}
- private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
+
+ // With this message the shard does NOT have to be preconfigured;
+ // dynamically look up whether the shard exists somewhere and, if so, replicate it.
+ private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
+ final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
+
+ LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+ if (schemaContext == null) {
+ final String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug("{}: {}", persistenceId(), msg);
+ getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ findPrimary(shardName,
+ new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+ getSelf().tell(new RunnableMessage() {
+ @Override
+ public void run() {
+ addShard(getShardName(), response, getSender());
+ }
+ }, getTargetActor());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+ }
+ );
+ }
+
+ private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
// verify the shard with the specified name is present in the cluster configuration
- if (!(this.configuration.isShardConfigured(shardName))) {
+ if (!this.configuration.isShardConfigured(shardName)) {
String msg = String.format("No module configuration exists for shard %s", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
+ LOG.debug("{}: {}", persistenceId(), msg);
getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
return;
}
if (schemaContext == null) {
String msg = String.format(
"No SchemaContext is available in order to create a local shard instance for %s", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
+ LOG.debug("{}: {}", persistenceId(), msg);
getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return;
}
- findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+ getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
+ getTargetActor());
}
@Override
private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
String msg = String.format("Local shard %s already exists", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
+ LOG.debug("{}: {}", persistenceId(), msg);
sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
}
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
- if(isShardReplicaOperationInProgress(shardName, sender)) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
}
final ShardInformation shardInfo;
final boolean removeShardOnFailure;
ShardInformation existingShardInfo = localShards.get(shardName);
- if(existingShardInfo == null) {
+ if (existingShardInfo == null) {
removeShardOnFailure = true;
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
- DisableElectionsRaftPolicy.class.getName()).build();
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
Shard.builder(), peerAddressResolver);
String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
- LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+ LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
response.getPrimaryPath(), shardInfo.getShardId());
- Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
- duration());
+ Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
+ .getShardLeaderElectionTimeout().duration());
Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
@Override
public void onComplete(Throwable failure, Object addServerResponse) {
if (failure != null) {
- LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+ LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
String msg = String.format("AddServer request to leader %s for shard %s failed",
boolean removeShardOnFailure) {
shardReplicaOperationsInProgress.remove(shardName);
- if(removeShardOnFailure) {
+ if (removeShardOnFailure) {
ShardInformation shardInfo = localShards.remove(shardName);
if (shardInfo.getActor() != null) {
shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
String shardName = shardInfo.getShardName();
shardReplicaOperationsInProgress.remove(shardName);
- LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+ LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+ LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
// Make the local shard voting capable
shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
persistShardList();
sender.tell(new Status.Success(null), getSelf());
- } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+ } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
- LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
+ LOG.warn("{}: Leader failed to add shard replica {} with status {}",
persistenceId(), shardName, replyMsg.getStatus());
- Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
+ Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
+ shardInfo.getShardId());
onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
}
switch (serverChangeStatus) {
case TIMEOUT:
failure = new TimeoutException(String.format(
- "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
- "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
- leaderPath, shardId.getShardName()));
+ "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
+ + "Possible causes - there was a problem replicating the data or shard leadership changed "
+ + "while replicating the shard data", leaderPath, shardId.getShardName()));
break;
case NO_LEADER:
failure = createNoShardLeaderException(shardId);
return failure;
}
- private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+ private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
@Override
public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
}
@Override
public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ private void doRemoveShardReplicaAsync(final String primaryPath) {
+ getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
+ primaryPath, getSender()), getTargetActor());
}
});
}
shardList.remove(shardInfo.getShardName());
}
}
- LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+ LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
saveSnapshot(updateShardManagerSnapshot(shardList));
}
private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
currentSnapshot = snapshot;
- LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+ LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
final MemberName currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
for (String shard : currentSnapshot.getShardList()) {
if (!configuredShardList.contains(shard)) {
// add the current member as a replica for the shard
- LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+ LOG.debug("{}: adding shard {}", persistenceId(), shard);
configuration.addMemberReplicaForShard(shard, currentMember);
} else {
configuredShardList.remove(shard);
}
for (String shard : configuredShardList) {
// remove the member as a replica for the shard
- LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+ LOG.debug("{}: removing shard {}", persistenceId(), shard);
configuration.removeMemberReplicaForShard(shard, currentMember);
}
}
- private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
- LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+ private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
+ LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
persistenceId());
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
0, 0));
}
+ private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+ LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
+
+ String shardName = changeMembersVotingStatus.getShardName();
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+ serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
+ e.getValue());
+ }
+
+ ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
+
+ findLocalShard(shardName, getSender(),
+ localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+ localShardFound.getPath(), getSender()));
+ }
+
+ private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
+ LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
+
+ ActorRef sender = getSender();
+ final String shardName = flipMembersVotingStatus.getShardName();
+ findLocalShard(shardName, sender, localShardFound -> {
+ Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+ Timeout.apply(30, TimeUnit.SECONDS));
+
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("Failed to access local shard %s", shardName), failure)), self());
+ return;
+ }
+
+ OnDemandRaftState raftState = (OnDemandRaftState) response;
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for ( Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ serverVotingStatusMap.put(e.getKey(), !e.getValue());
+ }
+
+ serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
+ .toString(), !raftState.isVoting());
+
+ changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+ shardName, localShardFound.getPath(), sender);
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ });
+
+ }
+
+ private void findLocalShard(FindLocalShard message) {
+ final ShardInformation shardInformation = localShards.get(message.getShardName());
+
+ if (shardInformation == null) {
+ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+ return;
+ }
+
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
+ () -> new LocalShardFound(shardInformation.getActor()));
+ }
+
+ private void findLocalShard(final String shardName, final ActorRef sender,
+ final Consumer<LocalShardFound> onLocalShardFound) {
+ Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+ .getShardInitializationTimeout().duration().$times(2));
+
+ Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
+ failure);
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("Failed to find local shard %s", shardName), failure)), self());
+ } else {
+ if (response instanceof LocalShardFound) {
+ getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
+ sender);
+ } else if (response instanceof LocalShardNotFound) {
+ String msg = String.format("Local shard %s does not exist", shardName);
+ LOG.debug("{}: {}", persistenceId, msg);
+ sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+ } else {
+ String msg = String.format("Failed to find local shard %s: received response: %s",
+ shardName, response);
+ LOG.debug("{}: {}", persistenceId, msg);
+ sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
+ new RuntimeException(msg)), self());
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
+ final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+ changeServersVotingStatus, shardActorRef.path());
+
+ Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
+ Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ if (failure != null) {
+ String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
+ shardActorRef.path());
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+ ServerChangeReply replyMsg = (ServerChangeReply) response;
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+ sender.tell(new Status.Success(null), getSelf());
+ } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+ sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+ "The requested voting state change for shard %s is invalid. At least one member "
+ + "must be voting", shardId.getShardName()))), getSelf());
+ } else {
+ LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+ persistenceId(), shardName, replyMsg.getStatus());
+
+ Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+ replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+ sender.tell(new Status.Failure(error), getSelf());
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
private static final class ForwardedAddServerReply {
ShardInformation shardInfo;
AddServerReply addServerReply;
}
}
- private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
- Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
- getShardInitializationTimeout().duration().$times(2));
-
-
- Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
- futureObj.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object response) {
- if (failure != null) {
- handler.onFailure(failure);
- } else {
- if(response instanceof RemotePrimaryShardFound) {
- handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
- } else if(response instanceof LocalPrimaryShardFound) {
- handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
- } else {
- handler.onUnknownResponse(response);
- }
- }
- }
- }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ private interface RunnableMessage extends Runnable {
}
/**
* The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
- * a remote or local find primary message is processed
+ * remote or local find primary message is processed.
*/
- private static interface FindPrimaryResponseHandler {
+ private interface FindPrimaryResponseHandler {
/**
- * Invoked when a Failure message is received as a response
+ * Invoked when a Failure message is received as a response.
*
- * @param failure
+ * @param failure the failure exception
*/
void onFailure(Throwable failure);
/**
- * Invoked when a RemotePrimaryShardFound response is received
+ * Invoked when a RemotePrimaryShardFound response is received.
*
- * @param response
+ * @param response the response
*/
void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
/**
- * Invoked when a LocalPrimaryShardFound response is received
- * @param response
+ * Invoked when a LocalPrimaryShardFound response is received.
+ *
+ * @param response the response
*/
void onLocalPrimaryFound(LocalPrimaryShardFound response);
/**
* Invoked when an unknown response is received. This is another type of failure.
*
- * @param response
+ * @param response the response
*/
void onUnknownResponse(Object response);
}
/**
* The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
- * replica and sends a wrapped Failure response to some targetActor
+ * replica and sends a wrapped Status.Failure response to the supplied targetActor.
*/
- private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+ private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
private final ActorRef targetActor;
private final String shardName;
private final String persistenceId;
private final ActorRef shardManagerActor;
/**
+ * Constructs an instance; targetActor, shardName and persistenceId are rejected if null.
+ *
* @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
* @param shardName The name of the shard for which the primary replica had to be found
* @param persistenceId The persistenceId for the ShardManager
* @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
*/
- protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
+ protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
+ ActorRef shardManagerActor) {
this.targetActor = Preconditions.checkNotNull(targetActor);
this.shardName = Preconditions.checkNotNull(shardName);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
@Override
public void onFailure(Throwable failure) {
- LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+ LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
targetActor.tell(new Status.Failure(new RuntimeException(
String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
}
public void onUnknownResponse(Object response) {
String msg = String.format("Failed to find leader for shard %s: received response: %s",
shardName, response);
- LOG.debug ("{}: {}", persistenceId, msg);
+ LOG.debug("{}: {}", persistenceId, msg);
targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
new RuntimeException(msg)), shardManagerActor);
}
}
- /**
- * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
- * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
- * as a successful response to find primary.
- */
- private static class PrimaryShardFoundForContext {
- private final String shardName;
- private final Object contextMessage;
- private final RemotePrimaryShardFound remotePrimaryShardFound;
- private final LocalPrimaryShardFound localPrimaryShardFound;
-
- public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
- @Nonnull Object primaryFoundMessage) {
- this.shardName = Preconditions.checkNotNull(shardName);
- this.contextMessage = Preconditions.checkNotNull(contextMessage);
- Preconditions.checkNotNull(primaryFoundMessage);
- this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
- (RemotePrimaryShardFound) primaryFoundMessage : null;
- this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
- (LocalPrimaryShardFound) primaryFoundMessage : null;
- }
-
- @Nonnull
- String getPrimaryPath(){
- if(remotePrimaryShardFound != null) {
- return remotePrimaryShardFound.getPrimaryPath();
- }
- return localPrimaryShardFound.getPrimaryPath();
- }
-
- @Nonnull
- Object getContextMessage() {
- return contextMessage;
- }
-
- @Nullable
- RemotePrimaryShardFound getRemotePrimaryShardFound() {
- return remotePrimaryShardFound;
- }
-
- @Nonnull
- String getShardName() {
- return shardName;
- }
- }
-
/**
* The WrappedShardResponse class wraps a response from a Shard.
*/