X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=inline;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=77a0e88393d4760cf251de4d1f4df7bcb904f97e;hb=3859df9beca8f13f1ff2b2744ed3470a1715bec3;hp=4c9fcc1dbab70e27800a7528d8d49bed80997f49;hpb=e7ce18361e9d6f2126525934f73438e9841e39bc;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
index 4c9fcc1dba..77a0e88393 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java
@@ -9,121 +9,136 @@
package org.opendaylight.controller.cluster.datastore.shardmanager;
import static akka.pattern.Patterns.ask;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
import akka.cluster.ClusterEvent;
+import akka.cluster.ClusterEvent.MemberWeaklyUp;
+import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.pattern.Patterns;
+import akka.persistence.DeleteSnapshotsFailure;
+import akka.persistence.DeleteSnapshotsSuccess;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
-import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
-import com.google.common.collect.Sets;
-import java.io.ByteArrayInputStream;
-import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.commons.lang3.SerializationUtils;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.common.actor.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
+import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
+import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
-import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
+import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
-import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
+import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
+import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
- * The ShardManager has the following jobs,
+ * Manages the shards for a data store. The ShardManager has the following jobs:
*
* - Create all the local shard replicas that belong on this cluster member
*
- Find the address of the local shard
*
- Find the primary replica for any given shard
*
- Monitor the cluster members and store their addresses
- *
*/
class ShardManager extends AbstractUntypedPersistentActorWithMetering {
-
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
// Stores a mapping between a shard name and it's corresponding information
// Shard names look like inventory, topology etc and are as specified in
// configuration
- private final Map localShards = new HashMap<>();
+ @VisibleForTesting
+ final Map localShards = new HashMap<>();
// The type of a ShardManager reflects the type of the datastore itself
// A data store could be of type config/operational
@@ -133,9 +148,10 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private final Configuration configuration;
- private final String shardDispatcherPath;
+ @VisibleForTesting
+ final String shardDispatcherPath;
- private final ShardManagerInfo mBean;
+ private final ShardManagerInfo shardManagerMBean;
private DatastoreContextFactory datastoreContextFactory;
@@ -143,7 +159,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
- private final ShardPeerAddressResolver peerAddressResolver;
+ @VisibleForTesting
+ final ShardPeerAddressResolver peerAddressResolver;
private SchemaContext schemaContext;
@@ -153,16 +170,23 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private final Set shardReplicaOperationsInProgress = new HashSet<>();
+ private final Map> shardActorsStopping = new HashMap<>();
+
+ private final Set> shardAvailabilityCallbacks = new HashSet<>();
+
private final String persistenceId;
+ private final AbstractDataStore dataStore;
- ShardManager(AbstractShardManagerCreator> builder) {
+ private PrefixedShardConfigUpdateHandler configUpdateHandler;
+
+ ShardManager(final AbstractShardManagerCreator> builder) {
this.cluster = builder.getCluster();
this.configuration = builder.getConfiguration();
- this.datastoreContextFactory = builder.getDdatastoreContextFactory();
+ this.datastoreContextFactory = builder.getDatastoreContextFactory();
this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
- this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
+ this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
@@ -174,120 +198,200 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
- mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
+ shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
+ "shard-manager-" + this.type,
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
- mBean.registerMBean();
+ shardManagerMBean.registerMBean();
+
+ dataStore = builder.getDistributedDataStore();
+ }
+
+ @Override
+ public void preStart() {
+ LOG.info("Starting ShardManager {}", persistenceId);
}
@Override
public void postStop() {
LOG.info("Stopping ShardManager {}", persistenceId());
- mBean.unregisterMBean();
+ shardManagerMBean.unregisterMBean();
}
@Override
- public void handleCommand(Object message) throws Exception {
+ public void handleCommand(final Object message) throws Exception {
if (message instanceof FindPrimary) {
findPrimary((FindPrimary)message);
- } else if(message instanceof FindLocalShard){
+ } else if (message instanceof FindLocalShard) {
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext(message);
- } else if(message instanceof ActorInitialized) {
+ } else if (message instanceof ActorInitialized) {
onActorInitialized(message);
- } else if (message instanceof ClusterEvent.MemberUp){
+ } else if (message instanceof ClusterEvent.MemberUp) {
memberUp((ClusterEvent.MemberUp) message);
- } else if (message instanceof ClusterEvent.MemberExited){
+ } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
+ memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
+ } else if (message instanceof ClusterEvent.MemberExited) {
memberExited((ClusterEvent.MemberExited) message);
- } else if(message instanceof ClusterEvent.MemberRemoved) {
+ } else if (message instanceof ClusterEvent.MemberRemoved) {
memberRemoved((ClusterEvent.MemberRemoved) message);
- } else if(message instanceof ClusterEvent.UnreachableMember) {
- memberUnreachable((ClusterEvent.UnreachableMember)message);
- } else if(message instanceof ClusterEvent.ReachableMember) {
+ } else if (message instanceof ClusterEvent.UnreachableMember) {
+ memberUnreachable((ClusterEvent.UnreachableMember) message);
+ } else if (message instanceof ClusterEvent.ReachableMember) {
memberReachable((ClusterEvent.ReachableMember) message);
- } else if(message instanceof DatastoreContextFactory) {
- onDatastoreContextFactory((DatastoreContextFactory)message);
- } else if(message instanceof RoleChangeNotification) {
+ } else if (message instanceof DatastoreContextFactory) {
+ onDatastoreContextFactory((DatastoreContextFactory) message);
+ } else if (message instanceof RoleChangeNotification) {
onRoleChangeNotification((RoleChangeNotification) message);
- } else if(message instanceof FollowerInitialSyncUpStatus){
+ } else if (message instanceof FollowerInitialSyncUpStatus) {
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
- } else if(message instanceof ShardNotInitializedTimeout) {
- onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
- } else if(message instanceof ShardLeaderStateChanged) {
+ } else if (message instanceof ShardNotInitializedTimeout) {
+ onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
+ } else if (message instanceof ShardLeaderStateChanged) {
onLeaderStateChanged((ShardLeaderStateChanged) message);
- } else if(message instanceof SwitchShardBehavior){
+ } else if (message instanceof SwitchShardBehavior) {
onSwitchShardBehavior((SwitchShardBehavior) message);
- } else if(message instanceof CreateShard) {
+ } else if (message instanceof CreateShard) {
onCreateShard((CreateShard)message);
- } else if(message instanceof AddShardReplica){
- onAddShardReplica((AddShardReplica)message);
- } else if(message instanceof ForwardedAddServerReply) {
+ } else if (message instanceof AddShardReplica) {
+ onAddShardReplica((AddShardReplica) message);
+ } else if (message instanceof AddPrefixShardReplica) {
+ onAddPrefixShardReplica((AddPrefixShardReplica) message);
+ } else if (message instanceof PrefixShardCreated) {
+ onPrefixShardCreated((PrefixShardCreated) message);
+ } else if (message instanceof PrefixShardRemoved) {
+ onPrefixShardRemoved((PrefixShardRemoved) message);
+ } else if (message instanceof InitConfigListener) {
+ onInitConfigListener();
+ } else if (message instanceof ForwardedAddServerReply) {
ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
msg.removeShardOnFailure);
- } else if(message instanceof ForwardedAddServerFailure) {
+ } else if (message instanceof ForwardedAddServerFailure) {
ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if(message instanceof PrimaryShardFoundForContext) {
- PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
- onPrimaryShardFoundContext(primaryShardFoundContext);
- } else if(message instanceof RemoveShardReplica) {
+ } else if (message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
- } else if(message instanceof WrappedShardResponse){
+ } else if (message instanceof RemovePrefixShardReplica) {
+ onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
+ } else if (message instanceof WrappedShardResponse) {
onWrappedShardResponse((WrappedShardResponse) message);
- } else if(message instanceof GetSnapshot) {
+ } else if (message instanceof GetSnapshot) {
onGetSnapshot();
- } else if(message instanceof ServerRemoved){
+ } else if (message instanceof ServerRemoved) {
onShardReplicaRemoved((ServerRemoved) message);
- } else if(message instanceof SaveSnapshotSuccess) {
- onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
- } else if(message instanceof SaveSnapshotFailure) {
- LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
- persistenceId(), ((SaveSnapshotFailure) message).cause());
- } else if(message instanceof Shutdown) {
+ } else if (message instanceof ChangeShardMembersVotingStatus) {
+ onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
+ } else if (message instanceof FlipShardMembersVotingStatus) {
+ onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
+ } else if (message instanceof SaveSnapshotSuccess) {
+ onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
+ } else if (message instanceof SaveSnapshotFailure) {
+ LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
+ ((SaveSnapshotFailure) message).cause());
+ } else if (message instanceof Shutdown) {
onShutDown();
} else if (message instanceof GetLocalShardIds) {
onGetLocalShardIds();
+ } else if (message instanceof GetShardRole) {
+ onGetShardRole((GetShardRole) message);
+ } else if (message instanceof RunnableMessage) {
+ ((RunnableMessage)message).run();
+ } else if (message instanceof RegisterForShardAvailabilityChanges) {
+ onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message);
+ } else if (message instanceof DeleteSnapshotsFailure) {
+ LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
+ ((DeleteSnapshotsFailure) message).cause());
+ } else if (message instanceof DeleteSnapshotsSuccess) {
+ LOG.debug("{}: Successfully deleted prior snapshots", persistenceId());
+ } else if (message instanceof RegisterRoleChangeListenerReply) {
+ LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
+ } else if (message instanceof ClusterEvent.MemberEvent) {
+ LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message);
} else {
unknownMessage(message);
}
}
+ private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) {
+ LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
+
+ final Consumer callback = message.getCallback();
+ shardAvailabilityCallbacks.add(callback);
+
+ getSender().tell(new Status.Success((Registration)
+ () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
+ }
+
+ private void onGetShardRole(final GetShardRole message) {
+ LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
+
+ final String name = message.getName();
+
+ final ShardInformation shardInformation = localShards.get(name);
+
+ if (shardInformation == null) {
+ LOG.info("{}: no shard information for {} found", persistenceId(), name);
+ getSender().tell(new Status.Failure(
+ new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
+ return;
+ }
+
+ getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
+ }
+
+ private void onInitConfigListener() {
+ LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
+
+ final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType =
+ org.opendaylight.mdsal.common.api.LogicalDatastoreType
+ .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
+
+ if (configUpdateHandler != null) {
+ configUpdateHandler.close();
+ }
+
+ configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
+ configUpdateHandler.initListener(dataStore, datastoreType);
+ }
+
private void onShutDown() {
List> stopFutures = new ArrayList<>(localShards.size());
for (ShardInformation info : localShards.values()) {
if (info.getActor() != null) {
LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
- FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+ FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
+ .getElectionTimeOutInterval().$times(2);
stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
}
}
LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
- ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+ ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
+ .getDispatcher(Dispatchers.DispatcherType.Client);
Future> combinedFutures = Futures.sequence(stopFutures, dispatcher);
combinedFutures.onComplete(new OnComplete>() {
@Override
- public void onComplete(Throwable failure, Iterable results) {
+ public void onComplete(final Throwable failure, final Iterable results) {
LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
self().tell(PoisonPill.getInstance(), self());
- if(failure != null) {
+ if (failure != null) {
LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
} else {
int nfailed = 0;
- for(Boolean r: results) {
- if(!r) {
+ for (Boolean result : results) {
+ if (!result) {
nfailed++;
}
}
- if(nfailed > 0) {
+ if (nfailed > 0) {
LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
}
}
@@ -295,25 +399,25 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
}, dispatcher);
}
- private void onWrappedShardResponse(WrappedShardResponse message) {
+ private void onWrappedShardResponse(final WrappedShardResponse message) {
if (message.getResponse() instanceof RemoveServerReply) {
onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
message.getLeaderPath());
}
}
- private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
- String leaderPath) {
- shardReplicaOperationsInProgress.remove(shardId);
+ private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
+ final RemoveServerReply replyMsg, final String leaderPath) {
+ shardReplicaOperationsInProgress.remove(shardId.getShardName());
- LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+ LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+ LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
shardId.getShardName());
originalSender.tell(new Status.Success(null), getSelf());
} else {
- LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+ LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
persistenceId(), shardId, replyMsg.getStatus());
Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
@@ -321,19 +425,50 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering {
}
}
- private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
- if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
- addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
- getSender());
- } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
- removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
- primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
+ private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
+ final String primaryPath, final ActorRef sender) {
+ if (isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
}
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+ final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+ //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ primaryPath, shardId);
+
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
+ Future