X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=d3d8ce39c9a108f0b497b942f9e6e5368f3c6a21;hp=05968725ccb221cd7b62b0ea38b9b66ea9e52baa;hb=aa307bc6c06d9bcf8e877553af9babc95c42c39b;hpb=7204c455a1636a7fc89bcd28fe9e9000eaa81b3b diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index 05968725cc..d3d8ce39c9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore.shardmanager; -import static akka.actor.ActorRef.noSender; import static akka.pattern.Patterns.ask; import akka.actor.ActorRef; @@ -22,14 +21,12 @@ import akka.actor.SupervisorStrategy.Directive; import akka.cluster.ClusterEvent; import akka.cluster.ClusterEvent.MemberWeaklyUp; import akka.cluster.Member; -import akka.cluster.ddata.DistributedData; -import akka.cluster.ddata.ORMap; -import akka.cluster.ddata.Replicator.Changed; -import akka.cluster.ddata.Replicator.Subscribe; import akka.dispatch.Futures; import akka.dispatch.OnComplete; import akka.japi.Function; import akka.pattern.Patterns; +import akka.persistence.DeleteSnapshotsFailure; +import akka.persistence.DeleteSnapshotsSuccess; import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; @@ -38,8 +35,6 @@ import akka.persistence.SnapshotSelectionCriteria; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import com.google.common.collect.Sets.SetView; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -54,9 +49,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Supplier; -import java.util.stream.Collectors; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; +import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; @@ -74,7 +69,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; -import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; @@ -84,6 +78,7 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; @@ -93,6 +88,7 @@ import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; @@ -109,8 +105,15 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; +import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler; +import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; +import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; +import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -164,9 +167,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Set shardReplicaOperationsInProgress = new HashSet<>(); + private final Map> shardActorStoppingFutures = new HashMap<>(); + private final String persistenceId; + private final AbstractDataStore dataStore; - private final ActorRef replicator; + private ListenerRegistration configListenerReg = null; + private PrefixedShardConfigUpdateHandler configUpdateHandler; ShardManager(AbstractShardManagerCreator builder) { this.cluster = builder.getCluster(); @@ -192,16 +199,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); - replicator = DistributedData.get(context().system()).replicator(); - + dataStore = builder.getDistributedDataStore(); } + @Override public void preStart() { - LOG.info("Starting Shardmanager {}", persistenceId); - - final Subscribe> subscribe = - new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self()); - replicator.tell(subscribe, noSender()); + LOG.info("Starting ShardManager {}", persistenceId); } @Override @@ -209,6 +212,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.info("Stopping ShardManager {}", persistenceId()); shardManagerMBean.unregisterMBean(); + + if (configListenerReg != null) { + configListenerReg.close(); + configListenerReg = null; + } } @Override @@ -249,10 +257,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onCreateShard((CreateShard)message); } else if (message instanceof AddShardReplica) { onAddShardReplica((AddShardReplica) message); - } else if (message instanceof CreatePrefixedShard) { - onCreatePrefixedShard((CreatePrefixedShard) message); } else if (message instanceof AddPrefixShardReplica) { onAddPrefixShardReplica((AddPrefixShardReplica) message); + } else if (message instanceof PrefixShardCreated) { + onPrefixShardCreated((PrefixShardCreated) message); + } else if (message instanceof PrefixShardRemoved) { + onPrefixShardRemoved((PrefixShardRemoved) message); + } else if (message instanceof InitConfigListener) { + onInitConfigListener(); } else if (message instanceof ForwardedAddServerReply) { ForwardedAddServerReply msg = (ForwardedAddServerReply)message; onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, @@ -262,6 +274,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); } else if (message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); + } else if (message instanceof RemovePrefixShardReplica) { + onRemovePrefixShardReplica((RemovePrefixShardReplica) message); } else if (message instanceof WrappedShardResponse) { onWrappedShardResponse((WrappedShardResponse) message); } else if (message instanceof GetSnapshot) { @@ -283,13 +297,35 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onGetLocalShardIds(); } else if (message instanceof RunnableMessage) { ((RunnableMessage)message).run(); - } else if (message instanceof Changed) { - onConfigChanged((Changed) message); + } else if (message instanceof DeleteSnapshotsFailure) { + LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), + ((DeleteSnapshotsFailure) message).cause()); + } else if (message instanceof DeleteSnapshotsSuccess) { + LOG.debug("{}: Successfully deleted prior snapshots", persistenceId(), message); + } else if (message instanceof RegisterRoleChangeListenerReply) { + LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId()); + } else if (message instanceof ClusterEvent.MemberEvent) { + LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message); } else { unknownMessage(message); } } + private void onInitConfigListener() { + LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName()); + + final org.opendaylight.mdsal.common.api.LogicalDatastoreType type = + org.opendaylight.mdsal.common.api.LogicalDatastoreType + .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name()); + + if (configUpdateHandler != null) { + configUpdateHandler.close(); + } + + configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName()); + configUpdateHandler.initListener(dataStore, type); + } + private void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { @@ -340,88 +376,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onConfigChanged(final Changed> change) { - LOG.debug("{}, ShardManager {} received config changed {}", - cluster.getCurrentMemberName(), persistenceId, change.dataValue().getEntries()); - - final Map changedConfig = change.dataValue().getEntries(); - - final Map newConfig = - changedConfig.values().stream().collect( - Collectors.toMap(PrefixShardConfiguration::getPrefix, java.util.function.Function.identity())); - - resolveConfig(newConfig); - } - - private void resolveConfig(final Map newConfig) { - LOG.debug("{} ShardManager : {}, resolving new shard configuration : {}", - cluster.getCurrentMemberName(), persistenceId, newConfig); - - newConfig.forEach((prefix, config) -> - LOG.debug("{} ShardManager : {}, received shard config " - + "for prefix {}, config {}", cluster.getCurrentMemberName(), persistenceId, prefix, config)); - - final SetView removedConfigs = - Sets.difference(configuration.getAllPrefixShardConfigurations().keySet(), newConfig.keySet()); - - // resolve removals - - resolveRemovals(removedConfigs); - - final SetView addedConfigs = - Sets.difference(newConfig.keySet(), configuration.getAllPrefixShardConfigurations().keySet()); - // resolve additions - - resolveAdditions(addedConfigs, newConfig); - // iter through to update existing shards, either start/stop replicas or update the shard - // to check for more peers - resolveUpdates(Collections.emptySet()); - } - - private void resolveRemovals(final Set removedConfigs) { - LOG.debug("{} ShardManager : {}, resolving removed configs : {}", - cluster.getCurrentMemberName(), persistenceId, removedConfigs); - - removedConfigs.forEach(id -> doRemovePrefixedShard(id)); - } - - private void resolveAdditions(final Set addedConfigs, - final Map configs) { - LOG.debug("{} ShardManager : {}, resolving added configs : {}", addedConfigs); - - addedConfigs.stream().filter(identifier - -> identifier - .getDatastoreType().equals( - ClusterUtils.toMDSalApi(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType()))) - .forEach(id -> doCreatePrefixedShard(configs.get(id))); - } - - private void resolveUpdates(Set maybeUpdatedConfigs) { - LOG.debug("{} ShardManager : {}, resolving potentially updated configs : {}", maybeUpdatedConfigs); - } - - private void doRemovePrefixedShard(final DOMDataTreeIdentifier prefix) { - LOG.debug("{} ShardManager : {}, removing prefix shard: {}", - cluster.getCurrentMemberName(), persistenceId, prefix); - final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix); - final ShardInformation shard = localShards.remove(shardId.getShardName()); - - configuration.removePrefixShardConfiguration(prefix); - - if (shard == null) { - LOG.warn("Received removal for unconfigured shard: {} , ignoring.. ", prefix); - return; - } - - if (shard.getActor() != null) { - LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor()); - shard.getActor().tell(Shutdown.INSTANCE, self()); - } - LOG.debug("{} : {} : Local Shard replica for shard {} has been removed", cluster.getCurrentMemberName(), - persistenceId(), shardId.getShardName()); - persistShardList(); - } - private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg, String leaderPath) { shardReplicaOperationsInProgress.remove(shardId.getShardName()); @@ -441,6 +395,46 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName, + final String primaryPath, final ActorRef sender) { + if (isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); + + final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); + + //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message + LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), + primaryPath, shardId); + + Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); + Future futureObj = ask(getContext().actorSelection(primaryPath), + new RemoveServer(shardId.toString()), removeServerTimeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + shardReplicaOperationsInProgress.remove(shardName); + String msg = String.format("RemoveServer request to leader %s for shard %s failed", + primaryPath, shardName); + + LOG.debug("{}: {}", persistenceId(), msg, failure); + + // FAILURE + sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self()); + } else { + // SUCCESS + self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { @@ -482,16 +476,41 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void onShardReplicaRemoved(ServerRemoved message) { - final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build(); - final ShardInformation shardInformation = localShards.remove(shardId.getShardName()); + removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build()); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void removeShard(final ShardIdentifier shardId) { + final String shardName = shardId.getShardName(); + final ShardInformation shardInformation = localShards.remove(shardName); if (shardInformation == null) { LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString()); return; - } else if (shardInformation.getActor() != null) { - LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor()); - shardInformation.getActor().tell(Shutdown.INSTANCE, self()); } - LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName()); + + final ActorRef shardActor = shardInformation.getActor(); + if (shardActor != null) { + LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor); + FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig() + .getElectionTimeOutInterval().$times(3); + final Future stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE); + shardActorStoppingFutures.put(shardName, stopFuture); + stopFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Boolean result) { + if (failure == null) { + LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor); + } else { + LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure); + } + + self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName), + ActorRef.noSender()); + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName); persistShardList(); } @@ -524,31 +543,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - @SuppressWarnings("checkstyle:IllegalCatch") - private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) { - LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard); - - Object reply; - try { - final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), - createPrefixedShard.getConfig().getPrefix()); - if (localShards.containsKey(shardId.getShardName())) { - LOG.debug("{}: Shard {} already exists", persistenceId(), shardId); - reply = new Status.Success(String.format("Shard with name %s already exists", shardId)); - } else { - doCreatePrefixedShard(createPrefixedShard); - reply = new Status.Success(null); - } - } catch (final Exception e) { - LOG.error("{}: onCreateShard failed", persistenceId(), e); - reply = new Status.Failure(e); - } - - if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) { - getSender().tell(reply, getSelf()); - } - } - @SuppressWarnings("checkstyle:IllegalCatch") private void onCreateShard(CreateShard createShard) { LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard); @@ -573,18 +567,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) { - doCreatePrefixedShard(createPrefixedShard.getConfig()); - // do not replicate on this level - } - - private void doCreatePrefixedShard(final PrefixShardConfiguration config) { - LOG.debug("doCreatePrefixShard : {}", config.getPrefix()); + private void onPrefixShardCreated(final PrefixShardCreated message) { + LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message); - final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), - config.getPrefix()); + final PrefixShardConfiguration config = message.getConfiguration(); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())); final String shardName = shardId.getShardName(); + if (isPreviousShardActorStopInProgress(shardName, message)) { + return; + } + if (localShards.containsKey(shardName)) { LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName); final PrefixShardConfiguration existing = @@ -596,6 +590,30 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + doCreatePrefixShard(config, shardId, shardName); + } + + private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) { + final Future stopFuture = shardActorStoppingFutures.get(shardName); + if (stopFuture == null) { + return false; + } + + LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(), + shardName, messageToDefer); + final ActorRef sender = getSender(); + stopFuture.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Boolean result) { + LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer); + self().tell(messageToDefer, sender); + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + + return true; + } + + private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) { configuration.addPrefixShardConfiguration(config); final Builder builder = newShardDatastoreContextBuilder(shardName); @@ -603,12 +621,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { .storeRoot(config.getPrefix().getRootIdentifier()); DatastoreContext shardDatastoreContext = builder.build(); - final Map peerAddresses = Collections.emptyMap(); + final Map peerAddresses = getPeerAddresses(shardName); final boolean isActiveMember = true; - LOG.debug("{} doCreatePrefixedShard: persistenceId(): {}, memberNames: " - + "{}, peerAddresses: {}, isActiveMember: {}", - shardId, persistenceId(), config.getShardMemberNames(), - peerAddresses, isActiveMember); + + LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", + persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember); final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, shardDatastoreContext, Shard.builder(), peerAddressResolver); @@ -618,8 +635,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { if (schemaContext != null) { info.setActor(newShardActor(schemaContext, info)); } + } - persistShardList(); + private void onPrefixShardRemoved(final PrefixShardRemoved message) { + LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message); + + final DOMDataTreeIdentifier prefix = message.getPrefix(); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); + + configuration.removePrefixShardConfiguration(prefix); + removeShard(shardId); } private void doCreateShard(final CreateShard createShard) { @@ -826,10 +852,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void onRecoveryCompleted() { LOG.info("Recovery complete : {}", persistenceId()); - // We no longer persist SchemaContext modules so delete all the prior messages from the akka - // journal on upgrade from Helium. - deleteMessages(lastSequenceNr()); - if (currentSnapshot == null && restoreFromSnapshot != null && restoreFromSnapshot.getShardManagerSnapshot() != null) { ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot(); @@ -1086,9 +1108,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } @VisibleForTesting - protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) { - return getContext().actorOf(info.newProps(schemaContext) - .withDispatcher(shardDispatcherPath), info.getShardId().toString()); + protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) { + return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath), + info.getShardId().toString()); } private void findPrimary(FindPrimary message) { @@ -1259,36 +1281,38 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } + private void onAddPrefixShardReplica(final AddPrefixShardReplica message) { + LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message); - // With this message the shard does NOT have to be preconfigured - // do a dynamic lookup if the shard exists somewhere and replicate - private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) { - final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix()); - - LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg); + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(message.getShardPrefix())); + final String shardName = shardId.getShardName(); + // Create the localShard if (schemaContext == null) { - final String msg = String.format( + String msg = String.format( "No SchemaContext is available in order to create a local shard instance for %s", shardName); LOG.debug("{}: {}", persistenceId(), msg); getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf()); return; } - findPrimary(shardName, - new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) { - @Override - public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { - getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), - getTargetActor()); - } - - @Override - public void onLocalPrimaryFound(final LocalPrimaryShardFound response) { - sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); - } + findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), + getSelf()) { + @Override + public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(), + message.getShardPrefix(), response, getSender()); + if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) { + getSelf().tell(runnable, getTargetActor()); } - ); + } + + @Override + public void onLocalPrimaryFound(LocalPrimaryShardFound message) { + sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); + } + }); } private void onAddShardReplica(final AddShardReplica shardReplicaMsg) { @@ -1317,15 +1341,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSelf()) { @Override public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { - getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), - getTargetActor()); + final RunnableMessage runnable = (RunnableMessage) () -> + addShard(getShardName(), response, getSender()); + if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) { + getSelf().tell(runnable, getTargetActor()); + } } @Override public void onLocalPrimaryFound(LocalPrimaryShardFound message) { sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); } - }); } @@ -1335,6 +1361,39 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf()); } + private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix, + final RemotePrimaryShardFound response, final ActorRef sender) { + if (isShardReplicaOperationInProgress(shardName, sender)) { + return; + } + + shardReplicaOperationsInProgress.add(shardName); + + final ShardInformation shardInfo; + final boolean removeShardOnFailure; + ShardInformation existingShardInfo = localShards.get(shardName); + if (existingShardInfo == null) { + removeShardOnFailure = true; + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); + + final Builder builder = newShardDatastoreContextBuilder(shardName); + builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); + + DatastoreContext datastoreContext = builder.build(); + + shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, + Shard.builder(), peerAddressResolver); + shardInfo.setActiveMember(false); + localShards.put(shardName, shardInfo); + shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + } else { + removeShardOnFailure = false; + shardInfo = existingShardInfo; + } + + execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender); + } + private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; @@ -1362,16 +1421,26 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInfo = existingShardInfo; } - String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); + execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender); + } + + private void execAddShard(final String shardName, + final ShardInformation shardInfo, + final RemotePrimaryShardFound response, + final boolean removeShardOnFailure, + final ActorRef sender) { + + final String localShardAddress = + peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName()); //inform ShardLeader to add this shard as a replica by sending an AddServer message LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(), response.getPrimaryPath(), shardInfo.getShardId()); - Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext() + final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext() .getShardLeaderElectionTimeout().duration()); - Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), - new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); + final Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), + new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); futureObj.onComplete(new OnComplete() { @Override @@ -1380,7 +1449,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(), response.getPrimaryPath(), shardName, failure); - String msg = String.format("AddServer request to leader %s for shard %s failed", + final String msg = String.format("AddServer request to leader %s for shard %s failed", response.getPrimaryPath(), shardName); self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender); } else { @@ -1482,6 +1551,32 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } + private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) { + LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message); + + final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + ClusterUtils.getCleanShardName(message.getShardPrefix())); + final String shardName = shardId.getShardName(); + + findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), + shardName, persistenceId(), getSelf()) { + @Override + public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) { + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + @Override + public void onLocalPrimaryFound(LocalPrimaryShardFound response) { + doRemoveShardReplicaAsync(response.getPrimaryPath()); + } + + private void doRemoveShardReplicaAsync(final String primaryPath) { + getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(), + primaryPath, getSender()), getTargetActor()); + } + }); + } + private void persistShardList() { List shardList = new ArrayList<>(localShards.keySet()); for (ShardInformation shardInfo : localShards.values()) { @@ -1584,9 +1679,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } private void findLocalShard(FindLocalShard message) { + LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName()); + final ShardInformation shardInformation = localShards.get(message.getShardName()); if (shardInformation == null) { + LOG.debug("{}: Local shard {} not found - shards present: {}", + persistenceId(), message.getShardName(), localShards.keySet()); + getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf()); return; }