package org.opendaylight.controller.cluster.datastore.shardmanager;
import static akka.pattern.Patterns.ask;
+
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
+import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.pattern.Patterns;
+import akka.persistence.DeleteSnapshotsFailure;
+import akka.persistence.DeleteSnapshotsSuccess;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
-import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Sets;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.SerializationUtils;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.Shard;
-import org.opendaylight.controller.cluster.datastore.ShardManagerSnapshot;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
-import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
-import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
/**
- * The ShardManager has the following jobs,
+ * Manages the shards for a data store. The ShardManager has the following jobs:
* <ul>
* <li> Create all the local shard replicas that belong on this cluster member
* <li> Find the address of the local shard
* <li> Find the primary replica for any given shard
* <li> Monitor the cluster members and store their addresses
- * <ul>
+ * </ul>
*/
-public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
-
+class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
// Stores a mapping between a shard name and its corresponding information
private final String shardDispatcherPath;
- private final ShardManagerInfo mBean;
+ private final ShardManagerInfo shardManagerMBean;
private DatastoreContextFactory datastoreContextFactory;
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
+ private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
+
private final String persistenceId;
+ private final AbstractDataStore dataStore;
- /**
- */
- protected ShardManager(AbstractBuilder<?> builder) {
+ private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
+ private PrefixedShardConfigUpdateHandler configUpdateHandler;
- this.cluster = builder.cluster;
- this.configuration = builder.configuration;
- this.datastoreContextFactory = builder.datastoreContextFactory;
- this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
// Package-private constructor. Copies the cluster wrapper, configuration, datastore context factory, ready latch,
// primary shard info cache and restore-from-snapshot data out of the creator, derives the persistence id,
// subscribes this actor to cluster member events and registers the shard manager MBean.
+ ShardManager(final AbstractShardManagerCreator<?> builder) {
+ this.cluster = builder.getCluster();
+ this.configuration = builder.getConfiguration();
+ this.datastoreContextFactory = builder.getDatastoreContextFactory();
+ this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
- this.primaryShardInfoCache = builder.primaryShardInfoCache;
- this.restoreFromSnapshot = builder.restoreFromSnapshot;
+ this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
+ this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
+ this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
// Use the explicitly configured persistence id when there is one; otherwise fall back to "shard-manager-" + type.
String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- List<String> localShardActorNames = new ArrayList<>();
- mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
+ shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
"shard-manager-" + this.type,
- datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
- localShardActorNames);
- mBean.setShardManager(this);
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+ shardManagerMBean.registerMBean();
+
+ dataStore = builder.getDistributedDataStore();
+ }
+
// Logs that this ShardManager, identified by its persistence id, is starting.
+ @Override
+ public void preStart() {
+ LOG.info("Starting ShardManager {}", persistenceId);
}
// Logs the stop, unregisters the shard manager MBean and closes the config listener registration if one is held.
// NOTE(review): configUpdateHandler is not closed here, although onInitConfigListener() closes the previous
// handler before replacing it - confirm whether it also needs closing on stop.
@Override
public void postStop() {
LOG.info("Stopping ShardManager {}", persistenceId());
- mBean.unregisterMBean();
+ shardManagerMBean.unregisterMBean();
+
// Null the reference after closing so the registration is only closed once by this method.
+ if (configListenerReg != null) {
+ configListenerReg.close();
+ configListenerReg = null;
+ }
}
// Main command dispatcher: routes each incoming message to its handler according to its runtime type.
// A few message types are only logged (SaveSnapshotFailure, DeleteSnapshotsFailure, DeleteSnapshotsSuccess,
// RegisterRoleChangeListenerReply and any remaining ClusterEvent.MemberEvent); everything else that is not
// recognised is passed to unknownMessage().
@Override
- public void handleCommand(Object message) throws Exception {
+ public void handleCommand(final Object message) throws Exception {
if (message instanceof FindPrimary) {
findPrimary((FindPrimary)message);
- } else if(message instanceof FindLocalShard){
+ } else if (message instanceof FindLocalShard) {
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext(message);
- } else if(message instanceof ActorInitialized) {
+ } else if (message instanceof ActorInitialized) {
onActorInitialized(message);
- } else if (message instanceof ClusterEvent.MemberUp){
+ } else if (message instanceof ClusterEvent.MemberUp) {
memberUp((ClusterEvent.MemberUp) message);
- } else if (message instanceof ClusterEvent.MemberExited){
+ } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
+ memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
+ } else if (message instanceof ClusterEvent.MemberExited) {
memberExited((ClusterEvent.MemberExited) message);
- } else if(message instanceof ClusterEvent.MemberRemoved) {
+ } else if (message instanceof ClusterEvent.MemberRemoved) {
memberRemoved((ClusterEvent.MemberRemoved) message);
- } else if(message instanceof ClusterEvent.UnreachableMember) {
- memberUnreachable((ClusterEvent.UnreachableMember)message);
- } else if(message instanceof ClusterEvent.ReachableMember) {
+ } else if (message instanceof ClusterEvent.UnreachableMember) {
+ memberUnreachable((ClusterEvent.UnreachableMember) message);
+ } else if (message instanceof ClusterEvent.ReachableMember) {
memberReachable((ClusterEvent.ReachableMember) message);
- } else if(message instanceof DatastoreContextFactory) {
- onDatastoreContextFactory((DatastoreContextFactory)message);
- } else if(message instanceof RoleChangeNotification) {
+ } else if (message instanceof DatastoreContextFactory) {
+ onDatastoreContextFactory((DatastoreContextFactory) message);
+ } else if (message instanceof RoleChangeNotification) {
onRoleChangeNotification((RoleChangeNotification) message);
- } else if(message instanceof FollowerInitialSyncUpStatus){
+ } else if (message instanceof FollowerInitialSyncUpStatus) {
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
- } else if(message instanceof ShardNotInitializedTimeout) {
- onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
- } else if(message instanceof ShardLeaderStateChanged) {
+ } else if (message instanceof ShardNotInitializedTimeout) {
+ onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
+ } else if (message instanceof ShardLeaderStateChanged) {
onLeaderStateChanged((ShardLeaderStateChanged) message);
- } else if(message instanceof SwitchShardBehavior){
+ } else if (message instanceof SwitchShardBehavior) {
onSwitchShardBehavior((SwitchShardBehavior) message);
- } else if(message instanceof CreateShard) {
+ } else if (message instanceof CreateShard) {
onCreateShard((CreateShard)message);
- } else if(message instanceof AddShardReplica){
- onAddShardReplica((AddShardReplica)message);
- } else if(message instanceof ForwardedAddServerReply) {
+ } else if (message instanceof AddShardReplica) {
+ onAddShardReplica((AddShardReplica) message);
+ } else if (message instanceof AddPrefixShardReplica) {
+ onAddPrefixShardReplica((AddPrefixShardReplica) message);
+ } else if (message instanceof PrefixShardCreated) {
+ onPrefixShardCreated((PrefixShardCreated) message);
+ } else if (message instanceof PrefixShardRemoved) {
+ onPrefixShardRemoved((PrefixShardRemoved) message);
+ } else if (message instanceof InitConfigListener) {
+ onInitConfigListener();
+ } else if (message instanceof ForwardedAddServerReply) {
ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
msg.removeShardOnFailure);
- } else if(message instanceof ForwardedAddServerFailure) {
+ } else if (message instanceof ForwardedAddServerFailure) {
ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof PrimaryShardFoundForContext) {
- PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
- onPrimaryShardFoundContext(primaryShardFoundContext);
- } else if(message instanceof RemoveShardReplica) {
+ } else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
- } else if(message instanceof WrappedShardResponse){
+ } else if (message instanceof RemovePrefixShardReplica) {
+ onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
+ } else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
- } else if(message instanceof GetSnapshot) {
+ } else if (message instanceof GetSnapshot) {
onGetSnapshot();
- } else if(message instanceof ServerRemoved){
+ } else if (message instanceof ServerRemoved) {
onShardReplicaRemoved((ServerRemoved) message);
- } else if(message instanceof SaveSnapshotSuccess) {
- onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
- } else if(message instanceof SaveSnapshotFailure) {
- LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
- persistenceId(), ((SaveSnapshotFailure) message).cause());
- } else if(message instanceof Shutdown) {
+ } else if (message instanceof ChangeShardMembersVotingStatus) {
+ onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+ } else if (message instanceof FlipShardMembersVotingStatus) {
+ onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
+ } else if (message instanceof SaveSnapshotSuccess) {
+ onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+ } else if (message instanceof SaveSnapshotFailure) {
+ LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
+ ((SaveSnapshotFailure) message).cause());
+ } else if (message instanceof Shutdown) {
onShutDown();
+ } else if (message instanceof GetLocalShardIds) {
+ onGetLocalShardIds();
+ } else if (message instanceof GetShardRole) {
+ onGetShardRole((GetShardRole) message);
+ } else if (message instanceof RunnableMessage) {
// RunnableMessage carries its own logic; it is simply run from within this message handler.
+ ((RunnableMessage)message).run();
+ } else if (message instanceof DeleteSnapshotsFailure) {
+ LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
+ ((DeleteSnapshotsFailure) message).cause());
+ } else if (message instanceof DeleteSnapshotsSuccess) {
// NOTE(review): the format string has one {} placeholder but two arguments are passed, so `message`
// presumably never appears in the log output - confirm whether a second placeholder was intended.
+ LOG.debug("{}: Successfully deleted prior snapshots", persistenceId(), message);
+ } else if (message instanceof RegisterRoleChangeListenerReply) {
+ LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
+ } else if (message instanceof ClusterEvent.MemberEvent) {
// Catch-all for member events without a dedicated branch; it is checked after the specific
// member-event branches above so that those are matched first.
+ LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message);
} else {
unknownMessage(message);
}
}
// Replies to the sender with a GetShardRoleReply carrying the role of the named local shard, or with a
// Status.Failure wrapping an IllegalArgumentException when localShards has no entry for that name.
+ private void onGetShardRole(final GetShardRole message) {
+ LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
+
+ final String name = message.getName();
+
+ final ShardInformation shardInformation = localShards.get(name);
+
+ if (shardInformation == null) {
+ LOG.info("{}: no shard information for {} found", persistenceId(), name);
+ getSender().tell(new Status.Failure(
+ new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
+ return;
+ }
+
+ getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
+ }
+
// (Re)creates the prefix-shard configuration update handler: closes any existing handler, then builds a new one
// for the current member and initialises its listener against the data store.
+ private void onInitConfigListener() {
+ LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
+
// Convert the configured logical store type to the mdsal enum constant of the same name.
// Note: this local `type` shadows the `type` field assigned in the constructor.
+ final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
+ org.opendaylight.mdsal.common.api.LogicalDatastoreType
+ .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
+
// Close the previous handler first so a repeated InitConfigListener message replaces it rather than adding one.
+ if (configUpdateHandler != null) {
+ configUpdateHandler.close();
+ }
+
+ configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
+ configUpdateHandler.initListener(dataStore, type);
+ }
+
// Issues a graceful stop to every local shard that has an actor and, once all stop futures have completed,
// sends a PoisonPill to this actor. The PoisonPill is sent whether or not the stops succeeded; failures and
// shards that did not stop gracefully are only logged.
private void onShutDown() {
List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
for (ShardInformation info : localShards.values()) {
if (info.getActor() != null) {
LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
// Each shard is given twice its election timeout interval to stop.
- FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+ FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(2);
stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
}
}
LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
- ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+ ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
+ .getDispatcher(Dispatchers.DispatcherType.Client);
// Combine the individual stop futures so the completion callback fires once for all shards.
Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
@Override
- public void onComplete(Throwable failure, Iterable<Boolean> results) {
+ public void onComplete(final Throwable failure, final Iterable<Boolean> results) {
LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
self().tell(PoisonPill.getInstance(), self());
- if(failure != null) {
+ if (failure != null) {
LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
} else {
// Count the shards whose graceful stop reported false.
int nfailed = 0;
- for(Boolean r: results) {
- if(!r) {
+ for (Boolean result : results) {
+ if (!result) {
nfailed++;
}
}
- if(nfailed > 0) {
+ if (nfailed > 0) {
LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
}
}
}, dispatcher);
}
// Unwraps a WrappedShardResponse; only a RemoveServerReply payload is handled here (forwarded to
// onRemoveServerReply with the current sender), any other payload is ignored.
- private void onWrappedShardResponse(WrappedShardResponse message) {
+ private void onWrappedShardResponse(final WrappedShardResponse message) {
if (message.getResponse() instanceof RemoveServerReply) {
onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
message.getLeaderPath());
}
}
// Completes a replica removal: clears the in-progress marker for the shard and reports the outcome to the
// original requester - Status.Success on ServerChangeStatus.OK, otherwise a Status.Failure built by
// getServerChangeException.
- private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
- String leaderPath) {
- shardReplicaOperationsInProgress.remove(shardId);
+ private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
+ final RemoveServerReply replyMsg, final String leaderPath) {
// shardReplicaOperationsInProgress is a Set<String> of shard names, so the entry is removed by name.
+ shardReplicaOperationsInProgress.remove(shardId.getShardName());
- LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+ LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+ LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
shardId.getShardName());
- originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+ originalSender.tell(new Status.Success(null), getSelf());
} else {
- LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+ LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
persistenceId(), shardId, replyMsg.getStatus());
- Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
- leaderPath, shardId);
- originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+ Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
+ originalSender.tell(new Status.Failure(failure), getSelf());
}
}
- private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
- if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
- addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
- getSender());
- } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
- removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
- primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
// Handles the removal of a replica requested through RemovePrefixShardReplica: unless a replica operation is
// already in progress for the shard, marks one as in progress, asks the leader at primaryPath to RemoveServer
// for the member's shard id, and then either forwards the reply to self as a WrappedShardResponse or tells the
// sender a Status.Failure if the ask itself failed.
+ private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
+ final String primaryPath, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
}
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+ final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+ //inform ShardLeader to remove this shard as a replica by sending a RemoveServer message
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ primaryPath, shardId);
+
// The ask is bounded by the shard leader election timeout taken from the shard's datastore context.
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
+ Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+ new RemoveServer(shardId.toString()), removeServerTimeout);
+
// NOTE(review): this callback is run on the Client dispatcher, yet on failure it mutates
// shardReplicaOperationsInProgress (a plain HashSet owned by this actor) - confirm this is safe.
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+ primaryPath, shardName);
+
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
+
+ // FAILURE
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ // SUCCESS
+ self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
// Asks the shard leader at primaryPath to remove a replica via a RemoveServer message, unless a replica
// operation is already in progress for the shard. If the ask fails, the in-progress marker is cleared and the
// sender is told a Status.Failure.
// NOTE(review): the lines visible here neither declare `shardId` nor add shardName to
// shardReplicaOperationsInProgress, and show no success branch - presumably those lines are elided from this
// view; confirm against the full file.
- private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
- final ActorRef sender) {
- if(isShardReplicaOperationInProgress(shardName, sender)) {
+ private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
+ final String primaryPath, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
}
final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
//inform ShardLeader to remove this shard as a replica by sending a RemoveServer message
- LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
primaryPath, shardId);
// The ask is bounded by the shard leader election timeout taken from the shard's datastore context.
- Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
- duration());
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
new RemoveServer(shardId.toString()), removeServerTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) {
+ public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
// Clear the in-progress marker so a later request for this shard is not rejected.
+ shardReplicaOperationsInProgress.remove(shardName);
String msg = String.format("RemoveServer request to leader %s for shard %s failed",
primaryPath, shardName);
- LOG.debug ("{}: {}", persistenceId(), msg, failure);
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
// FAILURE
sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
- private void onShardReplicaRemoved(ServerRemoved message) {
- final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
- final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
- if(shardInformation == null) {
+ private void onShardReplicaRemoved(final ServerRemoved message) {
+ removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void removeShard(final ShardIdentifier shardId) {
+ final String shardName = shardId.getShardName();
+ final ShardInformation shardInformation = localShards.remove(shardName);
+ if (shardInformation == null) {
LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
return;
- } else if(shardInformation.getActor() != null) {
- LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
- shardInformation.getActor().tell(Shutdown.INSTANCE, self());
}
- LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+
+ final ActorRef shardActor = shardInformation.getActor();
+ if (shardActor != null) {
+ long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
+
+ LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
+ timeoutInMS);
+
+ final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
+ FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
+
+ final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
+ @Override
+ public void onComplete(final Throwable failure, final Boolean result) {
+ if (failure == null) {
+ LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
+ } else {
+ LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
+ }
+
+ self().tell((RunnableMessage) () -> {
+ shardActorsStopping.remove(shardName);
+ notifyOnCompleteTasks(failure, result);
+ }, ActorRef.noSender());
+ }
+ };
+
+ shardActorsStopping.put(shardName, onComplete);
+ stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
+ .getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
persistShardList();
}
LOG.debug("{}: onGetSnapshot", persistenceId());
List<String> notInitialized = null;
- for(ShardInformation shardInfo: localShards.values()) {
- if(!shardInfo.isShardInitialized()) {
- if(notInitialized == null) {
+ for (ShardInformation shardInfo : localShards.values()) {
+ if (!shardInfo.isShardInitialized()) {
+ if (notInitialized == null) {
notInitialized = new ArrayList<>();
}
}
}
- if(notInitialized != null) {
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+ if (notInitialized != null) {
+ getSender().tell(new Status.Failure(new IllegalStateException(String.format(
"%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
return;
}
- byte[] shardManagerSnapshot = null;
- if(currentSnapshot != null) {
- shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
- }
-
ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
- new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
+ new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
- for(ShardInformation shardInfo: localShards.values()) {
+ for (ShardInformation shardInfo: localShards.values()) {
shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
}
}
- private void onCreateShard(CreateShard createShard) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void onCreateShard(final CreateShard createShard) {
LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
Object reply;
try {
String shardName = createShard.getModuleShardConfig().getShardName();
- if(localShards.containsKey(shardName)) {
+ if (localShards.containsKey(shardName)) {
LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
- reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+ reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
doCreateShard(createShard);
- reply = new akka.actor.Status.Success(null);
+ reply = new Status.Success(null);
}
} catch (Exception e) {
LOG.error("{}: onCreateShard failed", persistenceId(), e);
- reply = new akka.actor.Status.Failure(e);
+ reply = new Status.Failure(e);
}
- if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
getSender().tell(reply, getSelf());
}
}
- private void doCreateShard(CreateShard createShard) {
- ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
- String shardName = moduleShardConfig.getShardName();
+ /**
+ * Handles a {@link PrefixShardCreated} message by creating the local replica of a prefix-based shard.
+ *
+ * <p>The message is deferred while a previous actor with the same shard name is still being stopped, and it
+ * is ignored when the shard already exists locally with an equal configuration.
+ *
+ * @param message carries the configuration of the prefix shard to create
+ */
+ private void onPrefixShardCreated(final PrefixShardCreated message) {
+ LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
+
+ final PrefixShardConfiguration config = message.getConfiguration();
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
+ final String shardName = shardId.getShardName();
+
+ // If the previous incarnation is still stopping, the helper re-queues this message for later.
+ if (isPreviousShardActorStopInProgress(shardName, message)) {
+ return;
+ }
+
+ if (localShards.containsKey(shardName)) {
+ LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
+ final PrefixShardConfiguration existing =
+ configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
+
+ if (existing != null && existing.equals(config)) {
+ // An identical configuration is already in place, so there is nothing to do.
+ return;
+ }
+ // NOTE(review): an existing shard with a different (or no) stored configuration falls through to
+ // doCreatePrefixShard below - confirm that creating over the existing entry is intended.
+ }
+
+ doCreatePrefixShard(config, shardId, shardName);
+ }
+
+ /**
+ * Checks whether a stop of a previous actor for the given shard is still pending and, if so, registers a
+ * callback that sends the given message to this actor again once the stop has completed.
+ *
+ * @param shardName name of the shard to look up among the shard actors currently stopping
+ * @param messageToDefer message to send to self again after the stop completes
+ * @return {@code true} if a stop is in progress and the message was deferred, {@code false} otherwise
+ */
+ private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
+ final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
+ if (stopOnComplete == null) {
+ return false;
+ }
+
+ LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
+ shardName, messageToDefer);
+ // Captured now so the callback can re-send the message on behalf of the current sender.
+ final ActorRef sender = getSender();
+ stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
+ @Override
+ public void onComplete(final Throwable failure, final Boolean result) {
+ // The message is re-sent whether or not the stop reported a failure.
+ LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
+ self().tell(messageToDefer, sender);
+ }
+ });
+
+ return true;
+ }
+
+ /**
+ * Registers the prefix shard configuration, records a new active {@link ShardInformation} for the shard in
+ * the local shard map and, when a schema context is already available, starts the shard actor.
+ *
+ * @param config configuration of the prefix shard being created
+ * @param shardId identifier of the local shard replica
+ * @param shardName name under which the shard is tracked locally
+ */
+ private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId,
+         final String shardName) {
+     configuration.addPrefixShardConfiguration(config);
+
+     // Build the per-shard datastore context, pointing it at the prefix's datastore type and root.
+     final Builder contextBuilder = newShardDatastoreContextBuilder(shardName);
+     final LogicalDatastoreType storeType =
+             LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name());
+     contextBuilder.logicalStoreType(storeType).storeRoot(config.getPrefix().getRootIdentifier());
+     final DatastoreContext shardContext = contextBuilder.build();
+
+     final Map<String, String> peers = getPeerAddresses(shardName);
+     final boolean activeMember = true;
+
+     LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+             persistenceId(), shardId, config.getShardMemberNames(), peers, activeMember);
+
+     final ShardInformation shardInfo = new ShardInformation(shardName, shardId, peers, shardContext,
+             Shard.builder(), peerAddressResolver);
+     shardInfo.setActiveMember(activeMember);
+     localShards.put(shardInfo.getShardName(), shardInfo);
+
+     // Without a schema context the actor cannot be started yet; the entry stays registered without one.
+     if (schemaContext == null) {
+         return;
+     }
+
+     shardInfo.setSchemaContext(schemaContext);
+     shardInfo.setActor(newShardActor(shardInfo));
+ }
+
+ /**
+ * Handles a {@link PrefixShardRemoved} message: drops the prefix shard configuration and removes the
+ * corresponding local shard.
+ *
+ * @param message identifies the prefix whose shard is being removed
+ */
+ private void onPrefixShardRemoved(final PrefixShardRemoved message) {
+ LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
+
+ final DOMDataTreeIdentifier prefix = message.getPrefix();
+ // The local shard id is derived from this member's name and the cleaned-up prefix root.
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
+
+ configuration.removePrefixShardConfiguration(prefix);
+ removeShard(shardId);
+ }
+
+ private void doCreateShard(final CreateShard createShard) {
+ final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ final String shardName = moduleShardConfig.getShardName();
configuration.addModuleShardConfiguration(moduleShardConfig);
DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
- if(shardDatastoreContext == null) {
+ if (shardDatastoreContext == null) {
shardDatastoreContext = newShardDatastoreContext(shardName);
} else {
shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
- currentSnapshot.getShardList().contains(shardName);
+ boolean shardWasInRecoveredSnapshot = currentSnapshot != null
+ && currentSnapshot.getShardList().contains(shardName);
Map<String, String> peerAddresses;
boolean isActiveMember;
- if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
- contains(cluster.getCurrentMemberName())) {
+ if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
+ .contains(cluster.getCurrentMemberName())) {
peerAddresses = getPeerAddresses(shardName);
isActiveMember = true;
} else {
// subsequent AddServer request will be needed to make it an active member.
isActiveMember = false;
peerAddresses = Collections.emptyMap();
- shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
- customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
}
LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
info.setActiveMember(isActiveMember);
localShards.put(info.getShardName(), info);
- mBean.addLocalShard(shardId.toString());
-
- if(schemaContext != null) {
- info.setActor(newShardActor(schemaContext, info));
+ if (schemaContext != null) {
+ info.setSchemaContext(schemaContext);
+ info.setActor(newShardActor(info));
}
}
- private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
- return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
- shardPeerAddressResolver(peerAddressResolver);
+ private DatastoreContext.Builder newShardDatastoreContextBuilder(final String shardName) {
+ return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
+ .shardPeerAddressResolver(peerAddressResolver);
}
- private DatastoreContext newShardDatastoreContext(String shardName) {
+ private DatastoreContext newShardDatastoreContext(final String shardName) {
return newShardDatastoreContextBuilder(shardName).build();
}
- private void checkReady(){
+ private void checkReady() {
if (isReadyWithLeaderId()) {
LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
persistenceId(), type, waitTillReadyCountdownLatch.getCount());
}
}
- private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
+ private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
- if(shardInformation != null) {
+ if (shardInformation != null) {
shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
- if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+ if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
primaryShardInfoCache.remove(shardInformation.getShardName());
}
}
}
- private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+ private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
ShardInformation shardInfo = message.getShardInfo();
LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
- if(!shardInfo.isShardInitialized()) {
+ if (!shardInfo.isShardInitialized()) {
LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
+ message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
- message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+ message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
}
}
- private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+ private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) {
LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
status.getName(), status.isInitialSyncDone());
ShardInformation shardInformation = findShardInformation(status.getName());
- if(shardInformation != null) {
+ if (shardInformation != null) {
shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
- mBean.setSyncStatus(isInSync());
+ shardManagerMBean.setSyncStatus(isInSync());
}
}
- private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
+ private void onRoleChangeNotification(final RoleChangeNotification roleChanged) {
LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
roleChanged.getOldRole(), roleChanged.getNewRole());
ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
- if(shardInformation != null) {
+ if (shardInformation != null) {
shardInformation.setRole(roleChanged.getNewRole());
checkReady();
- mBean.setSyncStatus(isInSync());
+ shardManagerMBean.setSyncStatus(isInSync());
}
}
- private ShardInformation findShardInformation(String memberId) {
- for(ShardInformation info : localShards.values()){
- if(info.getShardId().toString().equals(memberId)){
+ private ShardInformation findShardInformation(final String memberId) {
+ for (ShardInformation info : localShards.values()) {
+ if (info.getShardId().toString().equals(memberId)) {
return info;
}
}
private boolean isReadyWithLeaderId() {
boolean isReady = true;
for (ShardInformation info : localShards.values()) {
- if(!info.isShardReadyWithLeaderId()){
+ if (!info.isShardReadyWithLeaderId()) {
isReady = false;
break;
}
return isReady;
}
- private boolean isInSync(){
+ private boolean isInSync() {
for (ShardInformation info : localShards.values()) {
- if(!info.isInSync()){
+ if (!info.isInSync()) {
return false;
}
}
return true;
}
- private void onActorInitialized(Object message) {
+ /**
+ * Handles an actor-initialized notification by resolving the shard from the sending actor's name and
+ * marking that shard as initialized.
+ *
+ * @param message the notification that was received
+ */
+ private void onActorInitialized(final Object message) {
final ActorRef sender = getSender();
if (sender == null) {
String actorName = sender.path().name();
//find shard name from actor name; actor name is stringified shardId
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
- if (shardId.getShardName() == null) {
+ final ShardIdentifier shardId;
+ try {
+ shardId = ShardIdentifier.fromShardIdString(actorName);
+ } catch (IllegalArgumentException e) {
+ // The actor name is not a parseable shard id, so there is no shard to mark. The trailing exception
+ // is logged as the throwable and fills no placeholder, hence both placeholders need an argument.
+ LOG.debug("{}: ignoring actor {}", persistenceId(), actorName, e);
return;
}
markShardAsInitialized(shardId.getShardName());
}
- private void markShardAsInitialized(String shardName) {
+ private void markShardAsInitialized(final String shardName) {
LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
ShardInformation shardInformation = localShards.get(shardName);
}
@Override
- protected void handleRecover(Object message) throws Exception {
+ protected void handleRecover(final Object message) throws Exception {
if (message instanceof RecoveryCompleted) {
onRecoveryCompleted();
} else if (message instanceof SnapshotOffer) {
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void onRecoveryCompleted() {
LOG.info("Recovery complete : {}", persistenceId());
- // We no longer persist SchemaContext modules so delete all the prior messages from the akka
- // journal on upgrade from Helium.
- deleteMessages(lastSequenceNr());
-
- if(currentSnapshot == null && restoreFromSnapshot != null &&
- restoreFromSnapshot.getShardManagerSnapshot() != null) {
- try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
- restoreFromSnapshot.getShardManagerSnapshot()))) {
- ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
+ if (currentSnapshot == null && restoreFromSnapshot != null
+ && restoreFromSnapshot.getShardManagerSnapshot() != null) {
+ ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
- LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
+ LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
- applyShardManagerSnapshot(snapshot);
- } catch(Exception e) {
- LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
- }
+ applyShardManagerSnapshot(snapshot);
}
createLocalShards();
}
- private void findLocalShard(FindLocalShard message) {
- final ShardInformation shardInformation = localShards.get(message.getShardName());
-
- if(shardInformation == null){
- getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
- return;
- }
-
- sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
- @Override
- public Object get() {
- return new LocalShardFound(shardInformation.getActor());
- }
- });
- }
-
- private void sendResponse(ShardInformation shardInformation, boolean doWait,
- boolean wantShardReady, final Supplier<Object> messageSupplier) {
- if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
- if(doWait) {
+ private void sendResponse(final ShardInformation shardInformation, final boolean doWait,
+ final boolean wantShardReady, final Supplier<Object> messageSupplier) {
+ if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
+ if (doWait) {
final ActorRef sender = getSender();
final ActorRef self = self();
- Runnable replyRunnable = new Runnable() {
- @Override
- public void run() {
- sender.tell(messageSupplier.get(), self);
- }
- };
+ Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
new OnShardInitialized(replyRunnable);
shardInformation.addOnShardInitialized(onShardInitialized);
- FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
- if(shardInformation.isShardInitialized()) {
+ FiniteDuration timeout = shardInformation.getDatastoreContext()
+ .getShardInitializationTimeout().duration();
+ if (shardInformation.isShardInitialized()) {
// If the shard is already initialized then we'll wait enough time for the shard to
// elect a leader, ie 2 times the election timeout.
timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
} else if (!shardInformation.isShardInitialized()) {
LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
shardInformation.getShardName());
- getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
+ getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
} else {
LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
shardInformation.getShardName());
- getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
+ getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
}
return;
getSender().tell(messageSupplier.get(), getSelf());
}
- private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+ private static NoShardLeaderException createNoShardLeaderException(final ShardIdentifier shardId) {
return new NoShardLeaderException(null, shardId.toString());
}
- private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+ private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
return new NotInitializedException(String.format(
"Found primary shard %s but it's not initialized yet. Please try again later", shardId));
}
- private void memberRemoved(ClusterEvent.MemberRemoved message) {
- String memberName = message.member().roles().iterator().next();
+ /**
+ * Derives the {@link MemberName} of a cluster member from the first role returned by its role collection.
+ *
+ * @param member the cluster member
+ * @return the member name built from that role
+ */
+ @VisibleForTesting
+ static MemberName memberToName(final Member member) {
+ // NOTE(review): presumably fails if the member carries no roles - confirm every member has at least one.
+ return MemberName.forName(member.roles().iterator().next());
+ }
+
+ private void memberRemoved(final ClusterEvent.MemberRemoved message) {
+ MemberName memberName = memberToName(message.member());
- LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
peerAddressResolver.removePeerAddress(memberName);
- for(ShardInformation info : localShards.values()){
+ for (ShardInformation info : localShards.values()) {
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
}
}
- private void memberExited(ClusterEvent.MemberExited message) {
- String memberName = message.member().roles().iterator().next();
+ private void memberExited(final ClusterEvent.MemberExited message) {
+ MemberName memberName = memberToName(message.member());
- LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
peerAddressResolver.removePeerAddress(memberName);
- for(ShardInformation info : localShards.values()){
+ for (ShardInformation info : localShards.values()) {
info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
}
}
- private void memberUp(ClusterEvent.MemberUp message) {
- String memberName = message.member().roles().iterator().next();
+ private void memberUp(final ClusterEvent.MemberUp message) {
+ MemberName memberName = memberToName(message.member());
- LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
message.member().address());
- addPeerAddress(memberName, message.member().address());
+ memberUp(memberName, message.member().address());
+ }
+ private void memberUp(final MemberName memberName, final Address address) {
+ addPeerAddress(memberName, address);
checkReady();
}
- private void addPeerAddress(String memberName, Address address) {
+ /**
+ * Handles a {@link MemberWeaklyUp} cluster event by logging it and then processing the member exactly as
+ * {@link #memberUp(MemberName, Address)} does for its name and address.
+ *
+ * @param message the cluster event that was received
+ */
+ private void memberWeaklyUp(final MemberWeaklyUp message) {
+     final Member weaklyUpMember = message.member();
+     final MemberName name = memberToName(weaklyUpMember);
+     final Address address = weaklyUpMember.address();
+
+     LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), name, address);
+
+     memberUp(name, address);
+ }
+
+ private void addPeerAddress(final MemberName memberName, final Address address) {
peerAddressResolver.addPeerAddress(memberName, address);
- for(ShardInformation info : localShards.values()){
+ for (ShardInformation info : localShards.values()) {
String shardName = info.getShardName();
String peerId = getShardIdentifier(memberName, shardName).toString();
info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
}
}
- private void memberReachable(ClusterEvent.ReachableMember message) {
- String memberName = message.member().roles().iterator().next();
- LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
+ private void memberReachable(final ClusterEvent.ReachableMember message) {
+ MemberName memberName = memberToName(message.member());
+ LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
addPeerAddress(memberName, message.member().address());
markMemberAvailable(memberName);
}
- private void memberUnreachable(ClusterEvent.UnreachableMember message) {
- String memberName = message.member().roles().iterator().next();
- LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
+ private void memberUnreachable(final ClusterEvent.UnreachableMember message) {
+ MemberName memberName = memberToName(message.member());
+ LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
markMemberUnavailable(memberName);
}
- private void markMemberUnavailable(final String memberName) {
- for(ShardInformation info : localShards.values()){
+ private void markMemberUnavailable(final MemberName memberName) {
+ final String memberStr = memberName.getName();
+ for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- if(leaderId != null && leaderId.contains(memberName)) {
+ // XXX: why are we using String#contains() here?
+ if (leaderId != null && leaderId.contains(memberStr)) {
LOG.debug("Marking Leader {} as unavailable.", leaderId);
info.setLeaderAvailable(false);
}
}
- private void markMemberAvailable(final String memberName) {
- for(ShardInformation info : localShards.values()){
+ private void markMemberAvailable(final MemberName memberName) {
+ final String memberStr = memberName.getName();
+ for (ShardInformation info : localShards.values()) {
String leaderId = info.getLeaderId();
- if(leaderId != null && leaderId.contains(memberName)) {
+ // XXX: why are we using String#contains() here?
+ if (leaderId != null && leaderId.contains(memberStr)) {
LOG.debug("Marking Leader {} as available.", leaderId);
info.setLeaderAvailable(true);
}
}
}
- private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+ private void onDatastoreContextFactory(final DatastoreContextFactory factory) {
datastoreContextFactory = factory;
for (ShardInformation info : localShards.values()) {
info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
}
}
- private void onSwitchShardBehavior(SwitchShardBehavior message) {
- ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+ /**
+ * Replies to the sender with a {@link Status.Success} carrying the string form of every local shard's
+ * identifier.
+ */
+ private void onGetLocalShardIds() {
+     final List<String> shardIds = new ArrayList<>(localShards.size());
+     localShards.values().forEach(shardInfo -> shardIds.add(shardInfo.getShardId().toString()));
+
+     final ActorRef requester = getSender();
+     requester.tell(new Status.Success(shardIds), getSelf());
+ }
+
+ private void onSwitchShardBehavior(final SwitchShardBehavior message) {
+ final ShardIdentifier identifier = message.getShardId();
+
+ if (identifier != null) {
+ final ShardInformation info = localShards.get(identifier.getShardName());
+ if (info == null) {
+ getSender().tell(new Status.Failure(
+ new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
+ return;
+ }
+
+ switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
+ } else {
+ for (ShardInformation info : localShards.values()) {
+ switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
+ }
+ }
- ShardInformation shardInformation = localShards.get(identifier.getShardName());
+ getSender().tell(new Status.Success(null), getSelf());
+ }
- if(shardInformation != null && shardInformation.getActor() != null) {
- shardInformation.getActor().tell(
- new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
+ private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
+ final ActorRef actor = info.getActor();
+ if (actor != null) {
+ actor.tell(switchBehavior, getSelf());
} else {
LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
- message.getShardName(), message.getNewState());
+ info.getShardName(), switchBehavior.getNewState());
}
}
/**
- * Notifies all the local shards of a change in the schema context
+ * Notifies all the local shards of a change in the schema context.
*
- * @param message
+ * @param message the message to send
*/
private void updateSchemaContext(final Object message) {
schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
for (ShardInformation info : localShards.values()) {
+ info.setSchemaContext(schemaContext);
+
if (info.getActor() == null) {
LOG.debug("Creating Shard {}", info.getShardId());
- info.setActor(newShardActor(schemaContext, info));
+ info.setActor(newShardActor(info));
} else {
info.getActor().tell(message, getSelf());
}
}
@VisibleForTesting
- protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
- return getContext().actorOf(info.newProps(schemaContext)
- .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+ protected ActorRef newShardActor(final ShardInformation info) {
+ return getContext().actorOf(info.newProps().withDispatcher(shardDispatcherPath),
+ info.getShardId().toString());
}
- private void findPrimary(FindPrimary message) {
+ private void findPrimary(final FindPrimary message) {
LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
final String shardName = message.getShardName();
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
if (info != null && info.isActiveMember()) {
- sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
- @Override
- public Object get() {
- String primaryPath = info.getSerializedLeaderActor();
- Object found = canReturnLocalShardState && info.isLeader() ?
- new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
- new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
- }
-
- return found;
- }
+ sendResponse(info, message.isWaitUntilReady(), true, () -> {
+ String primaryPath = info.getSerializedLeaderActor();
+ Object found = canReturnLocalShardState && info.isLeader()
+ ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+ new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+
+ LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+ return found;
});
return;
}
final Collection<String> visitedAddresses;
- if(message instanceof RemoteFindPrimary) {
+ if (message instanceof RemoteFindPrimary) {
visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
} else {
visitedAddresses = new ArrayList<>(1);
visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
- for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
- if(visitedAddresses.contains(address)) {
+ for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
+ if (visitedAddresses.contains(address)) {
continue;
}
String.format("No primary shard found for %s.", shardName)), getSelf());
}
+ private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+ Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
+ .getShardInitializationTimeout().duration().$times(2));
+
+ Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ handler.onFailure(failure);
+ } else {
+ if (response instanceof RemotePrimaryShardFound) {
+ handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+ } else if (response instanceof LocalPrimaryShardFound) {
+ handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+ } else {
+ handler.onUnknownResponse(response);
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
/**
* Construct the name of the shard actor given the name of the member on
- * which the shard resides and the name of the shard
+ * which the shard resides and the name of the shard.
*
- * @param memberName
- * @param shardName
- * @return
+ * @param memberName the member name
+ * @param shardName the shard name
+ * @return the shard identifier resolved for the given member and shard name
*/
- private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+ private ShardIdentifier getShardIdentifier(final MemberName memberName, final String shardName) {
return peerAddressResolver.getShardIdentifier(memberName, shardName);
}
/**
- * Create shards that are local to the member on which the ShardManager
- * runs
- *
+ * Create shards that are local to the member on which the ShardManager runs.
*/
private void createLocalShards() {
- String memberName = this.cluster.getCurrentMemberName();
+ MemberName memberName = this.cluster.getCurrentMemberName();
Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
- if(restoreFromSnapshot != null)
- {
- for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+ if (restoreFromSnapshot != null) {
+ for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
shardSnapshots.put(snapshot.getName(), snapshot);
}
}
restoreFromSnapshot = null; // null out to GC
- for(String shardName : memberShardNames){
+ for (String shardName : memberShardNames) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
shardSnapshots.get(shardName)), peerAddressResolver));
- mBean.addLocalShard(shardId.toString());
}
}
/**
- * Given the name of the shard find the addresses of all it's peers
+ * Given the name of the shard find the addresses of all its peers.
*
- * @param shardName
+ * @param shardName the shard name
*/
- private Map<String, String> getPeerAddresses(String shardName) {
- Collection<String> members = configuration.getMembersFromShardName(shardName);
- Map<String, String> peerAddresses = new HashMap<>();
+ private Map<String, String> getPeerAddresses(final String shardName) {
+ final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
+ return getPeerAddresses(shardName, members);
+ }
- String currentMemberName = this.cluster.getCurrentMemberName();
+ private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
+ Map<String, String> peerAddresses = new HashMap<>();
+ MemberName currentMemberName = this.cluster.getCurrentMemberName();
- for(String memberName : members) {
- if(!currentMemberName.equals(memberName)) {
+ for (MemberName memberName : members) {
+ if (!currentMemberName.equals(memberName)) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
peerAddresses.put(shardId.toString(), address);
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(10, Duration.create("1 minute"),
- new Function<Throwable, SupervisorStrategy.Directive>() {
- @Override
- public SupervisorStrategy.Directive apply(Throwable t) {
- LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
- return SupervisorStrategy.resume();
- }
- }
- );
-
+ (Function<Throwable, Directive>) t -> {
+ LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+ return SupervisorStrategy.resume();
+ });
}
@Override
}
@VisibleForTesting
- ShardManagerInfoMBean getMBean(){
- return mBean;
+ ShardManagerInfoMBean getMBean() {
+ return shardManagerMBean;
}
private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
if (shardReplicaOperationsInProgress.contains(shardName)) {
String msg = String.format("A shard replica operation for %s is already in progress", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ LOG.debug("{}: {}", persistenceId(), msg);
+ sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return true;
}
return false;
}
- private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
+ private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
+ LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
+
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(message.getShardPrefix()));
+ final String shardName = shardId.getShardName();
+
+ // A local shard instance can only be created once a SchemaContext is available
+ if (schemaContext == null) {
+ String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug("{}: {}", persistenceId(), msg);
+ getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+ getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+ final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
+ message.getShardPrefix(), response, getSender());
+ if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+ getSelf().tell(runnable, getTargetActor());
+ }
+ }
+
+ @Override
+ public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+ });
+ }
+
+ private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
// verify the shard with the specified name is present in the cluster configuration
- if (!(this.configuration.isShardConfigured(shardName))) {
+ if (!this.configuration.isShardConfigured(shardName)) {
String msg = String.format("No module configuration exists for shard %s", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
- getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ LOG.debug("{}: {}", persistenceId(), msg);
+ getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
return;
}
if (schemaContext == null) {
String msg = String.format(
"No SchemaContext is available in order to create a local shard instance for %s", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
- getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ LOG.debug("{}: {}", persistenceId(), msg);
+ getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
return;
}
- findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+ getSelf()) {
@Override
- public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+ final RunnableMessage runnable = (RunnableMessage) () ->
+ addShard(getShardName(), response, getSender());
+ if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
+ getSelf().tell(runnable, getTargetActor());
+ }
}
@Override
- public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+ public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
}
-
});
}
- private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+ private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
String msg = String.format("Local shard %s already exists", shardName);
- LOG.debug ("{}: {}", persistenceId(), msg);
- sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ LOG.debug("{}: {}", persistenceId(), msg);
+ sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ }
+
+ private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
+ final RemotePrimaryShardFound response, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardInformation shardInfo;
+ final boolean removeShardOnFailure;
+ ShardInformation existingShardInfo = localShards.get(shardName);
+ if (existingShardInfo == null) {
+ removeShardOnFailure = true;
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ final Builder builder = newShardDatastoreContextBuilder(shardName);
+ builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ DatastoreContext datastoreContext = builder.build();
+
+ shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+ Shard.builder(), peerAddressResolver);
+ shardInfo.setActiveMember(false);
+ shardInfo.setSchemaContext(schemaContext);
+ localShards.put(shardName, shardInfo);
+ shardInfo.setActor(newShardActor(shardInfo));
+ } else {
+ removeShardOnFailure = false;
+ shardInfo = existingShardInfo;
+ }
+
+ execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
}
private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
- if(isShardReplicaOperationInProgress(shardName, sender)) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
return;
}
final ShardInformation shardInfo;
final boolean removeShardOnFailure;
ShardInformation existingShardInfo = localShards.get(shardName);
- if(existingShardInfo == null) {
+ if (existingShardInfo == null) {
removeShardOnFailure = true;
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
- DisableElectionsRaftPolicy.class.getName()).build();
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
Shard.builder(), peerAddressResolver);
shardInfo.setActiveMember(false);
+ shardInfo.setSchemaContext(schemaContext);
localShards.put(shardName, shardInfo);
- shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+ shardInfo.setActor(newShardActor(shardInfo));
} else {
removeShardOnFailure = false;
shardInfo = existingShardInfo;
}
- String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+ execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
+ }
+
+ private void execAddShard(final String shardName,
+ final ShardInformation shardInfo,
+ final RemotePrimaryShardFound response,
+ final boolean removeShardOnFailure,
+ final ActorRef sender) {
+
+ final String localShardAddress =
+ peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
- LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+ LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
response.getPrimaryPath(), shardInfo.getShardId());
- Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
- duration());
- Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
- new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
+ final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
+ .getShardLeaderElectionTimeout().duration());
+ final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+ new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
futureObj.onComplete(new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object addServerResponse) {
+ public void onComplete(final Throwable failure, final Object addServerResponse) {
if (failure != null) {
- LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+ LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
response.getPrimaryPath(), shardName, failure);
- String msg = String.format("AddServer request to leader %s for shard %s failed",
+ final String msg = String.format("AddServer request to leader %s for shard %s failed",
response.getPrimaryPath(), shardName);
self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
} else {
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
- private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
- boolean removeShardOnFailure) {
+ private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
+ final ActorRef sender, final boolean removeShardOnFailure) {
shardReplicaOperationsInProgress.remove(shardName);
- if(removeShardOnFailure) {
+ if (removeShardOnFailure) {
ShardInformation shardInfo = localShards.remove(shardName);
if (shardInfo.getActor() != null) {
shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
}
}
- sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+ sender.tell(new Status.Failure(message == null ? failure :
new RuntimeException(message, failure)), getSelf());
}
- private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
- String leaderPath, boolean removeShardOnFailure) {
+ private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg,
+ final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) {
String shardName = shardInfo.getShardName();
shardReplicaOperationsInProgress.remove(shardName);
- LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+ LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+ LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
// Make the local shard voting capable
shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
shardInfo.setActiveMember(true);
persistShardList();
- mBean.addLocalShard(shardInfo.getShardId().toString());
- sender.tell(new akka.actor.Status.Success(null), getSelf());
- } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+ sender.tell(new Status.Success(null), getSelf());
+ } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
- LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
+ LOG.warn("{}: Leader failed to add shard replica {} with status {}",
persistenceId(), shardName, replyMsg.getStatus());
- Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
+ Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
+ shardInfo.getShardId());
onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
}
}
- private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
- String leaderPath, ShardIdentifier shardId) {
- Exception failure;
+ private static Exception getServerChangeException(final Class<?> serverChange,
+ final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) {
switch (serverChangeStatus) {
case TIMEOUT:
- failure = new TimeoutException(String.format(
- "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
- "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
- leaderPath, shardId.getShardName()));
- break;
+ return new TimeoutException(String.format(
+ "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
+ + " Possible causes - there was a problem replicating the data or shard leadership changed "
+ + "while replicating the shard data", leaderPath, shardId.getShardName()));
case NO_LEADER:
- failure = createNoShardLeaderException(shardId);
- break;
+ return createNoShardLeaderException(shardId);
case NOT_SUPPORTED:
- failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
+ return new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
serverChange.getSimpleName(), shardId.getShardName()));
- break;
default :
- failure = new RuntimeException(String.format(
- "%s request to leader %s for shard %s failed with status %s",
+ return new RuntimeException(String.format("%s request to leader %s for shard %s failed with status %s",
serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
}
- return failure;
}
- private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+ private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
@Override
- public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ private void doRemoveShardReplicaAsync(final String primaryPath) {
+ getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
+ primaryPath, getSender()), getTargetActor());
+ }
+ });
+ }
+
+ private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
+ LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
+
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ ClusterUtils.getCleanShardName(message.getShardPrefix()));
+ final String shardName = shardId.getShardName();
+
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
+ shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
}
@Override
- public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
- getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
+ doRemoveShardReplicaAsync(response.getPrimaryPath());
+ }
+
+ private void doRemoveShardReplicaAsync(final String primaryPath) {
+ getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
+ primaryPath, getSender()), getTargetActor());
}
});
}
shardList.remove(shardInfo.getShardName());
}
}
- LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
- saveSnapshot(updateShardManagerSnapshot(shardList));
+ LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
+ saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
}
- private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
- currentSnapshot = new ShardManagerSnapshot(shardList);
+ private ShardManagerSnapshot updateShardManagerSnapshot(
+ final List<String> shardList,
+ final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
+ currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
return currentSnapshot;
}
- private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
+ private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) {
currentSnapshot = snapshot;
- LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+ LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
- String currentMember = cluster.getCurrentMemberName();
+ final MemberName currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
new HashSet<>(configuration.getMemberShardNames(currentMember));
for (String shard : currentSnapshot.getShardList()) {
if (!configuredShardList.contains(shard)) {
// add the current member as a replica for the shard
- LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+ LOG.debug("{}: adding shard {}", persistenceId(), shard);
configuration.addMemberReplicaForShard(shard, currentMember);
} else {
configuredShardList.remove(shard);
}
for (String shard : configuredShardList) {
// remove the member as a replica for the shard
- LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+ LOG.debug("{}: removing shard {}", persistenceId(), shard);
configuration.removeMemberReplicaForShard(shard, currentMember);
}
}
- private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
- LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+ private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) {
+ LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
persistenceId());
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
0, 0));
}
- private static class ForwardedAddServerReply {
- ShardInformation shardInfo;
- AddServerReply addServerReply;
- String leaderPath;
- boolean removeShardOnFailure;
+ private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
+ LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
- ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
- boolean removeShardOnFailure) {
- this.shardInfo = shardInfo;
- this.addServerReply = addServerReply;
- this.leaderPath = leaderPath;
- this.removeShardOnFailure = removeShardOnFailure;
+ String shardName = changeMembersVotingStatus.getShardName();
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
+ serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
+ e.getValue());
}
- }
- private static class ForwardedAddServerFailure {
- String shardName;
- String failureMessage;
- Throwable failure;
- boolean removeShardOnFailure;
+ ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
- ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
- boolean removeShardOnFailure) {
- this.shardName = shardName;
- this.failureMessage = failureMessage;
- this.failure = failure;
- this.removeShardOnFailure = removeShardOnFailure;
- }
+ findLocalShard(shardName, getSender(),
+ localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
+ localShardFound.getPath(), getSender()));
}
- @VisibleForTesting
- protected static class ShardInformation {
- private final ShardIdentifier shardId;
- private final String shardName;
- private ActorRef actor;
- private final Map<String, String> initialPeerAddresses;
- private Optional<DataTree> localShardDataTree;
- private boolean leaderAvailable = false;
-
- // flag that determines if the actor is ready for business
- private boolean actorInitialized = false;
+ private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
+ LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
- private boolean followerSyncStatus = false;
+ ActorRef sender = getSender();
+ final String shardName = flipMembersVotingStatus.getShardName();
+ findLocalShard(shardName, sender, localShardFound -> {
+ Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
+ Timeout.apply(30, TimeUnit.SECONDS));
- private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
- private String role ;
- private String leaderId;
- private short leaderVersion;
-
- private DatastoreContext datastoreContext;
- private Shard.AbstractBuilder<?, ?> builder;
- private final ShardPeerAddressResolver addressResolver;
- private boolean isActiveMember = true;
-
- private ShardInformation(String shardName, ShardIdentifier shardId,
- Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
- Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
- this.shardName = shardName;
- this.shardId = shardId;
- this.initialPeerAddresses = initialPeerAddresses;
- this.datastoreContext = datastoreContext;
- this.builder = builder;
- this.addressResolver = addressResolver;
- }
-
- Props newProps(SchemaContext schemaContext) {
- Preconditions.checkNotNull(builder);
- Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
- schemaContext(schemaContext).props();
- builder = null;
- return props;
- }
-
- String getShardName() {
- return shardName;
- }
-
- @Nullable
- ActorRef getActor(){
- return actor;
- }
-
- void setActor(ActorRef actor) {
- this.actor = actor;
- }
-
- ShardIdentifier getShardId() {
- return shardId;
- }
-
- void setLocalDataTree(Optional<DataTree> localShardDataTree) {
- this.localShardDataTree = localShardDataTree;
- }
-
- Optional<DataTree> getLocalShardDataTree() {
- return localShardDataTree;
- }
-
- DatastoreContext getDatastoreContext() {
- return datastoreContext;
- }
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) {
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("Failed to access local shard %s", shardName), failure)), self());
+ return;
+ }
- void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
- this.datastoreContext = datastoreContext;
- if (actor != null) {
- LOG.debug ("Sending new DatastoreContext to {}", shardId);
- actor.tell(this.datastoreContext, sender);
- }
- }
+ OnDemandRaftState raftState = (OnDemandRaftState) response;
+ Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
+ for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
+ serverVotingStatusMap.put(e.getKey(), !e.getValue());
+ }
- void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
- LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+ serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
+ .toString(), !raftState.isVoting());
- if(actor != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
- peerId, peerAddress, actor.path());
+ changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
+ shardName, localShardFound.getPath(), sender);
}
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ });
- actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
- }
-
- notifyOnShardInitializedCallbacks();
- }
-
- void peerDown(String memberName, String peerId, ActorRef sender) {
- if(actor != null) {
- actor.tell(new PeerDown(memberName, peerId), sender);
- }
- }
-
- void peerUp(String memberName, String peerId, ActorRef sender) {
- if(actor != null) {
- actor.tell(new PeerUp(memberName, peerId), sender);
- }
- }
-
- boolean isShardReady() {
- return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
- }
-
- boolean isShardReadyWithLeaderId() {
- return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
- (isLeader() || addressResolver.resolve(leaderId) != null);
- }
-
- boolean isShardInitialized() {
- return getActor() != null && actorInitialized;
- }
-
- boolean isLeader() {
- return Objects.equal(leaderId, shardId.toString());
- }
+ }
- String getSerializedLeaderActor() {
- if(isLeader()) {
- return Serialization.serializedActorPath(getActor());
- } else {
- return addressResolver.resolve(leaderId);
- }
- }
+ private void findLocalShard(final FindLocalShard message) { // Answers a FindLocalShard request by looking the shard up in localShards.
+ LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
- void setActorInitialized() {
- LOG.debug("Shard {} is initialized", shardId);
+ final ShardInformation shardInformation = localShards.get(message.getShardName()); // null when no local shard is registered under that name
- this.actorInitialized = true;
+ if (shardInformation == null) {
+ LOG.debug("{}: Local shard {} not found - shards present: {}",
+ persistenceId(), message.getShardName(), localShards.keySet());
- notifyOnShardInitializedCallbacks();
+ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf()); // tell the requester the shard is not present here
+ return;
}
- private void notifyOnShardInitializedCallbacks() {
- if(onShardInitializedSet.isEmpty()) {
- return;
- }
-
- boolean ready = isShardReadyWithLeaderId();
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), false, // found: how and when the reply is sent is decided by sendResponse (defined outside this view)
+ () -> new LocalShardFound(shardInformation.getActor())); // supplier builds the LocalShardFound reply from shardInformation.getActor()
+ }
- if(LOG.isDebugEnabled()) {
- LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
- ready ? "ready" : "initialized", onShardInitializedSet.size());
- }
+ private void findLocalShard(final String shardName, final ActorRef sender, // Asks this actor for the named local shard; on LocalShardFound hands it to onLocalShardFound, otherwise sends a Status.Failure to sender.
+ final Consumer<LocalShardFound> onLocalShardFound) {
+ Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext() // ask timeout is twice the base context's shard initialization timeout
+ .getShardInitializationTimeout().duration().$times(2));
- Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
- while(iter.hasNext()) {
- OnShardInitialized onShardInitialized = iter.next();
- if(!(onShardInitialized instanceof OnShardReady) || ready) {
- iter.remove();
- onShardInitialized.getTimeoutSchedule().cancel();
- onShardInitialized.getReplyRunnable().run();
+ Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); // NOTE(review): the 'true' argument is presumably waitUntilInitialized - confirm against FindLocalShard
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ if (failure != null) { // the ask itself failed: wrap the cause and report it to the original sender
+ LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
+ failure);
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("Failed to find local shard %s", shardName), failure)), self());
+ } else {
+ if (response instanceof LocalShardFound) {
+ getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), // the callback is not invoked here; it is wrapped in a RunnableMessage and sent to self
+ sender); // the original requester is passed as the message sender
+ } else if (response instanceof LocalShardNotFound) {
+ String msg = String.format("Local shard %s does not exist", shardName);
+ LOG.debug("{}: {}", persistenceId, msg);
+ sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
+ } else {
+ String msg = String.format("Failed to find local shard %s: received response: %s",
+ shardName, response);
+ LOG.debug("{}: {}", persistenceId, msg);
+ sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response : // a Throwable reply is forwarded as the failure itself, anything else is wrapped in a RuntimeException
+ new RuntimeException(msg)), self());
+ }
}
}
- }
-
- void addOnShardInitialized(OnShardInitialized onShardInitialized) {
- onShardInitializedSet.add(onShardInitialized);
- }
-
- void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
- onShardInitializedSet.remove(onShardInitialized);
- }
-
- void setRole(String newRole) {
- this.role = newRole;
-
- notifyOnShardInitializedCallbacks();
- }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); // the callback above is registered with the Client-type dispatcher
+ }
- void setFollowerSyncStatus(boolean syncStatus){
- this.followerSyncStatus = syncStatus;
+ private void changeShardMembersVotingStatus(final ChangeServersVotingStatus changeServersVotingStatus, // Sends the voting-status change to the local shard actor and relays the outcome to sender as Status.Success or Status.Failure.
+ final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) { // helper is defined outside this view; NOTE(review): presumably it replies to sender itself when an operation is pending - confirm
+ return;
}
- boolean isInSync(){
- if(RaftState.Follower.name().equals(this.role)){
- return followerSyncStatus;
- } else if(RaftState.Leader.name().equals(this.role)){
- return true;
- }
+ shardReplicaOperationsInProgress.add(shardName); // mark the shard as busy; removed again at the start of the callback below
- return false;
- }
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); // used here only for the leader election timeout
+ final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- boolean setLeaderId(String leaderId) {
- boolean changed = !Objects.equal(this.leaderId, leaderId);
- this.leaderId = leaderId;
- if(leaderId != null) {
- this.leaderAvailable = true;
- }
- notifyOnShardInitializedCallbacks();
+ LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
+ changeServersVotingStatus, shardActorRef.path());
- return changed;
- }
+ Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)); // ask timeout is twice the shard leader election timeout
+ Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
- String getLeaderId() {
- return leaderId;
- }
-
- void setLeaderAvailable(boolean leaderAvailable) {
- this.leaderAvailable = leaderAvailable;
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ shardReplicaOperationsInProgress.remove(shardName); // NOTE(review): modified inside the future callback (Client dispatcher) rather than via a message to self - confirm this collection tolerates that
+ if (failure != null) {
+ String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
+ shardActorRef.path());
+ LOG.debug("{}: {}", persistenceId(), msg, failure);
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+
+ ServerChangeReply replyMsg = (ServerChangeReply) response; // NOTE(review): cast is not guarded by an instanceof check; a reply of another type would throw ClassCastException here
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
+ sender.tell(new Status.Success(null), getSelf()); // success is reported with a null payload
+ } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+ sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+ "The requested voting state change for shard %s is invalid. At least one member "
+ + "must be voting", shardId.getShardName()))), getSelf());
+ } else {
+ LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+ persistenceId(), shardName, replyMsg.getStatus());
- if(leaderAvailable) {
- notifyOnShardInitializedCallbacks();
+ Exception error = getServerChangeException(ChangeServersVotingStatus.class, // every other status is turned into an exception by getServerChangeException (defined outside this view)
+ replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+ sender.tell(new Status.Failure(error), getSelf());
+ }
+ }
}
- }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); // the callback above is registered with the Client-type dispatcher
+ }
- short getLeaderVersion() {
- return leaderVersion;
- }
+ private static final class ForwardedAddServerReply { // Plain holder pairing an AddServerReply with the shard info and leader path it belongs to. NOTE(review): presumably sent to self as a message - usage is outside this view.
+ ShardInformation shardInfo; // fields are package-visible and non-final; they are assigned once in the constructor below
+ AddServerReply addServerReply;
+ String leaderPath;
+ boolean removeShardOnFailure; // NOTE(review): presumably tells the handler to drop the shard when the reply is a failure - confirm at the use site
- void setLeaderVersion(short leaderVersion) {
- this.leaderVersion = leaderVersion;
+ ForwardedAddServerReply(final ShardInformation shardInfo, final AddServerReply addServerReply,
+ final String leaderPath, final boolean removeShardOnFailure) {
+ this.shardInfo = shardInfo;
+ this.addServerReply = addServerReply;
+ this.leaderPath = leaderPath;
+ this.removeShardOnFailure = removeShardOnFailure;
}
+ }
- boolean isActiveMember() {
- return isActiveMember;
- }
+ private static final class ForwardedAddServerFailure { // Plain holder describing a failed add-server attempt: shard name, message text and the causing Throwable. NOTE(review): presumably sent to self as a message - usage is outside this view.
+ String shardName; // fields are package-visible and non-final; they are assigned once in the constructor below
+ String failureMessage;
+ Throwable failure;
+ boolean removeShardOnFailure; // NOTE(review): presumably tells the handler to drop the shard after this failure - confirm at the use site
- void setActiveMember(boolean isActiveMember) {
- this.isActiveMember = isActiveMember;
+ ForwardedAddServerFailure(final String shardName, final String failureMessage, final Throwable failure,
+ final boolean removeShardOnFailure) {
+ this.shardName = shardName;
+ this.failureMessage = failureMessage;
+ this.failure = failure;
+ this.removeShardOnFailure = removeShardOnFailure;
}
}
- private static class OnShardInitialized {
+ static class OnShardInitialized {
private final Runnable replyRunnable;
private Cancellable timeoutSchedule;
- OnShardInitialized(Runnable replyRunnable) {
+ OnShardInitialized(final Runnable replyRunnable) {
this.replyRunnable = replyRunnable;
}
return timeoutSchedule;
}
- void setTimeoutSchedule(Cancellable timeoutSchedule) {
+ void setTimeoutSchedule(final Cancellable timeoutSchedule) {
this.timeoutSchedule = timeoutSchedule;
}
}
- private static class OnShardReady extends OnShardInitialized {
- OnShardReady(Runnable replyRunnable) {
+ static class OnShardReady extends OnShardInitialized { // Adds no state or behavior of its own; the subtype is the only distinction. NOTE(review): presumably selects "wait until ready" over "wait until initialized" - confirm at the instanceof check.
+ OnShardReady(final Runnable replyRunnable) { // simply forwards the reply runnable to OnShardInitialized
super(replyRunnable);
}
}
- private static class ShardNotInitializedTimeout {
- private final ActorRef sender;
- private final ShardInformation shardInfo;
- private final OnShardInitialized onShardInitialized;
-
- ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
- this.sender = sender;
- this.shardInfo = shardInfo;
- this.onShardInitialized = onShardInitialized;
- }
-
- ActorRef getSender() {
- return sender;
- }
-
- ShardInformation getShardInfo() {
- return shardInfo;
- }
-
- OnShardInitialized getOnShardInitialized() {
- return onShardInitialized;
- }
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
- private ClusterWrapper cluster;
- private Configuration configuration;
- private DatastoreContextFactory datastoreContextFactory;
- private CountDownLatch waitTillReadyCountdownLatch;
- private PrimaryShardInfoFutureCache primaryShardInfoCache;
- private DatastoreSnapshot restoreFromSnapshot;
- private volatile boolean sealed;
-
- @SuppressWarnings("unchecked")
- private T self() {
- return (T) this;
- }
-
- protected void checkSealed() {
- Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
- }
-
- public T cluster(ClusterWrapper cluster) {
- checkSealed();
- this.cluster = cluster;
- return self();
- }
-
- public T configuration(Configuration configuration) {
- checkSealed();
- this.configuration = configuration;
- return self();
- }
-
- public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
- checkSealed();
- this.datastoreContextFactory = datastoreContextFactory;
- return self();
- }
-
- public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
- checkSealed();
- this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
- return self();
- }
-
- public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
- checkSealed();
- this.primaryShardInfoCache = primaryShardInfoCache;
- return self();
- }
-
- public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
- checkSealed();
- this.restoreFromSnapshot = restoreFromSnapshot;
- return self();
- }
-
- protected void verify() {
- sealed = true;
- Preconditions.checkNotNull(cluster, "cluster should not be null");
- Preconditions.checkNotNull(configuration, "configuration should not be null");
- Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
- Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
- Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
- }
-
- public Props props() {
- verify();
- return Props.create(ShardManager.class, this);
- }
- }
-
- public static class Builder extends AbstractBuilder<Builder> {
- }
-
- private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
- Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
- getShardInitializationTimeout().duration().$times(2));
-
-
- Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
- futureObj.onComplete(new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object response) {
- if (failure != null) {
- handler.onFailure(failure);
- } else {
- if(response instanceof RemotePrimaryShardFound) {
- handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
- } else if(response instanceof LocalPrimaryShardFound) {
- handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
- } else {
- handler.onUnknownResponse(response);
- }
- }
- }
- }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ private interface RunnableMessage extends Runnable { // Memberless marker subtype of Runnable, used when a lambda is sent to this actor as a message. NOTE(review): presumably the receive handler runs it - confirm.
}
/**
* The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
- * a remote or local find primary message is processed
+ * remote or local find primary message is processed.
*/
- private static interface FindPrimaryResponseHandler {
+ private interface FindPrimaryResponseHandler {
/**
- * Invoked when a Failure message is received as a response
+ * Invoked when a Failure message is received as a response.
*
- * @param failure
+ * @param failure the failure exception that was received
*/
void onFailure(Throwable failure);
/**
- * Invoked when a RemotePrimaryShardFound response is received
+ * Invoked when a RemotePrimaryShardFound response is received.
*
- * @param response
+ * @param response the RemotePrimaryShardFound message that was received
*/
void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
/**
- * Invoked when a LocalPrimaryShardFound response is received
- * @param response
+ * Invoked when a LocalPrimaryShardFound response is received.
+ *
+ * @param response the LocalPrimaryShardFound message that was received
*/
void onLocalPrimaryFound(LocalPrimaryShardFound response);
/**
* Invoked when an unknown response is received. This is another type of failure.
*
- * @param response
+ * @param response the unrecognized response object
*/
void onUnknownResponse(Object response);
}
/**
* The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
- * replica and sends a wrapped Failure response to some targetActor
+ * replica and sends a wrapped Failure response to some targetActor.
*/
- private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+ private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
private final ActorRef targetActor;
private final String shardName;
private final String persistenceId;
private final ActorRef shardManagerActor;
/**
+ * Constructs an instance.
+ *
* @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
* @param shardName The name of the shard for which the primary replica had to be found
* @param persistenceId The persistenceId for the ShardManager
* @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
*/
- protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
+ protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
+ final String persistenceId, final ActorRef shardManagerActor) {
this.targetActor = Preconditions.checkNotNull(targetActor);
this.shardName = Preconditions.checkNotNull(shardName);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
}
@Override
- public void onFailure(Throwable failure) {
- LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
- targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
+ public void onFailure(final Throwable failure) {
+ LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+ targetActor.tell(new Status.Failure(new RuntimeException(
String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
}
@Override
- public void onUnknownResponse(Object response) {
+ public void onUnknownResponse(final Object response) {
String msg = String.format("Failed to find leader for shard %s: received response: %s",
shardName, response);
- LOG.debug ("{}: {}", persistenceId, msg);
- targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
+ LOG.debug("{}: {}", persistenceId, msg);
+ targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
new RuntimeException(msg)), shardManagerActor);
}
}
-
- /**
- * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
- * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
- * as a successful response to find primary.
- */
- private static class PrimaryShardFoundForContext {
- private final String shardName;
- private final Object contextMessage;
- private final RemotePrimaryShardFound remotePrimaryShardFound;
- private final LocalPrimaryShardFound localPrimaryShardFound;
-
- public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
- @Nonnull Object primaryFoundMessage) {
- this.shardName = Preconditions.checkNotNull(shardName);
- this.contextMessage = Preconditions.checkNotNull(contextMessage);
- Preconditions.checkNotNull(primaryFoundMessage);
- this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
- (RemotePrimaryShardFound) primaryFoundMessage : null;
- this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
- (LocalPrimaryShardFound) primaryFoundMessage : null;
- }
-
- @Nonnull
- String getPrimaryPath(){
- if(remotePrimaryShardFound != null) {
- return remotePrimaryShardFound.getPrimaryPath();
- }
- return localPrimaryShardFound.getPrimaryPath();
- }
-
- @Nonnull
- Object getContextMessage() {
- return contextMessage;
- }
-
- @Nullable
- RemotePrimaryShardFound getRemotePrimaryShardFound() {
- return remotePrimaryShardFound;
- }
-
- @Nonnull
- String getShardName() {
- return shardName;
- }
- }
-
/**
* The WrappedShardResponse class wraps a response from a Shard.
*/
- private static class WrappedShardResponse {
+ private static final class WrappedShardResponse {
private final ShardIdentifier shardId;
private final Object response;
private final String leaderPath;
- private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
+ WrappedShardResponse(final ShardIdentifier shardId, final Object response, final String leaderPath) {
this.shardId = shardId;
this.response = response;
this.leaderPath = leaderPath;
return leaderPath;
}
}
+
+ private static final class ShardNotInitializedTimeout { // Holder with final fields bundling a shard, its pending OnShardInitialized callback and the original sender. NOTE(review): presumably delivered when the wait for initialization times out - scheduling code is outside this view.
+ private final ActorRef sender; // the actor that issued the original request
+ private final ShardInformation shardInfo;
+ private final OnShardInitialized onShardInitialized;
+
+ ShardNotInitializedTimeout(final ShardInformation shardInfo, final OnShardInitialized onShardInitialized,
+ final ActorRef sender) { // note: parameter order differs from field declaration order
+ this.sender = sender;
+ this.shardInfo = shardInfo;
+ this.onShardInitialized = onShardInitialized;
+ }
+
+ ActorRef getSender() {
+ return sender;
+ }
+
+ ShardInformation getShardInfo() {
+ return shardInfo;
+ }
+
+ OnShardInitialized getOnShardInitialized() {
+ return onShardInitialized;
+ }
+ }
}