X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=93b57eda2864d416d0f4cd548264a141fc5640b6;hb=1d5ca4009be6c61d7b61989799037ad8f1ab7a75;hp=d1b9aba9d666268c6ab159f11e543ebc674fefe5;hpb=6ef0b898f2117a4bb3a510c0df7af340f4fc8eca;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index d1b9aba9d6..93b57eda28 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -5,7 +5,6 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore.shardmanager; import static akka.pattern.Patterns.ask; @@ -35,17 +34,16 @@ import akka.persistence.SnapshotOffer; import akka.persistence.SnapshotSelectionCriteria; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.SettableFuture; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; @@ -53,22 +51,18 @@ import java.util.function.Supplier; import org.opendaylight.controller.cluster.access.concepts.MemberName; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.common.actor.Dispatchers; -import org.opendaylight.controller.cluster.datastore.AbstractDataStore; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.DatastoreContext; -import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; -import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration; import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; -import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; @@ -82,13 +76,11 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary; import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; -import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica; import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot; -import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils; import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete; import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -109,13 +101,8 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply; import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; -import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler; -import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated; -import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved; -import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,7 +143,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContextFactory datastoreContextFactory; - private final CountDownLatch waitTillReadyCountdownLatch; + private final SettableFuture readinessFuture; private final PrimaryShardInfoFutureCache primaryShardInfoCache; @@ -176,20 +163,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final Set> shardAvailabilityCallbacks = new HashSet<>(); private final String persistenceId; - private final AbstractDataStore dataStore; - - private PrefixedShardConfigUpdateHandler configUpdateHandler; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") ShardManager(final AbstractShardManagerCreator builder) { - this.cluster = builder.getCluster(); - this.configuration = builder.getConfiguration(); - this.datastoreContextFactory = builder.getDatastoreContextFactory(); - this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); - this.shardDispatcherPath = - new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch(); - this.primaryShardInfoCache = builder.getPrimaryShardInfoCache(); - this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); + cluster = builder.getCluster(); + configuration = builder.getConfiguration(); + datastoreContextFactory = builder.getDatastoreContextFactory(); + type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); + shardDispatcherPath = new Dispatchers(context().system().dispatchers()) + .getDispatcherPath(Dispatchers.DispatcherType.Shard); + readinessFuture = builder.getReadinessFuture(); + primaryShardInfoCache = builder.getPrimaryShardInfoCache(); + restoreFromSnapshot = builder.getRestoreFromSnapshot(); String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId(); persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type; @@ -200,11 +185,9 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { cluster.subscribeToMemberEvents(getSelf()); shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), - "shard-manager-" + this.type, + "shard-manager-" + type, datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); - - dataStore = builder.getDistributedDataStore(); } @Override @@ -257,14 +240,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onCreateShard((CreateShard)message); } else if (message instanceof AddShardReplica) { onAddShardReplica((AddShardReplica) message); - } else if (message instanceof AddPrefixShardReplica) { - onAddPrefixShardReplica((AddPrefixShardReplica) message); - } else if (message instanceof PrefixShardCreated) { - onPrefixShardCreated((PrefixShardCreated) message); - } else if (message instanceof PrefixShardRemoved) { - onPrefixShardRemoved((PrefixShardRemoved) message); - } else if (message instanceof InitConfigListener) { - onInitConfigListener(); } else if (message instanceof ForwardedAddServerReply) { ForwardedAddServerReply msg = (ForwardedAddServerReply)message; onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, @@ -274,8 +249,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); } else if (message instanceof RemoveShardReplica) { onRemoveShardReplica((RemoveShardReplica) message); - } else if (message instanceof RemovePrefixShardReplica) { - onRemovePrefixShardReplica((RemovePrefixShardReplica) message); } else if (message instanceof WrappedShardResponse) { onWrappedShardResponse((WrappedShardResponse) message); } else if (message instanceof GetSnapshot) { @@ -342,21 +315,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender()); } - private void onInitConfigListener() { - LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName()); - - final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType = - org.opendaylight.mdsal.common.api.LogicalDatastoreType - .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name()); - - if (configUpdateHandler != null) { - configUpdateHandler.close(); - } - - configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName()); - configUpdateHandler.initListener(dataStore, datastoreType); - } - void onShutDown() { List> stopFutures = new ArrayList<>(localShards.size()); for (ShardInformation info : localShards.values()) { @@ -426,51 +384,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") - private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName, - final String primaryPath, final ActorRef sender) { - if (isShardReplicaOperationInProgress(shardName, sender)) { - return; - } - - shardReplicaOperationsInProgress.add(shardName); - - final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName); - - final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build(); - - //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message - LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(), - primaryPath, shardId); - - Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); - Future futureObj = ask(getContext().actorSelection(primaryPath), - new RemoveServer(shardId.toString()), removeServerTimeout); - - futureObj.onComplete(new OnComplete<>() { - @Override - public void onComplete(final Throwable failure, final Object response) { - if (failure != null) { - shardReplicaOperationsInProgress.remove(shardName); - - LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath, - shardName, failure); - - // FAILURE - sender.tell(new Status.Failure(new RuntimeException( - String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName), - failure)), self()); - } else { - // SUCCESS - self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender); - } - } - }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); - } - - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { @@ -616,32 +529,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - private void onPrefixShardCreated(final PrefixShardCreated message) { - LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message); - - final PrefixShardConfiguration config = message.getConfiguration(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier())); - final String shardName = shardId.getShardName(); - - if (isPreviousShardActorStopInProgress(shardName, message)) { - return; - } - - if (localShards.containsKey(shardName)) { - LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName); - final PrefixShardConfiguration existing = - configuration.getAllPrefixShardConfigurations().get(config.getPrefix()); - - if (existing != null && existing.equals(config)) { - // we don't have to do nothing here - return; - } - } - - doCreatePrefixShard(config, shardId, shardName); - } - private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) { final CompositeOnComplete stopOnComplete = shardActorsStopping.get(shardName); if (stopOnComplete == null) { @@ -662,43 +549,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return true; } - private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId, - final String shardName) { - configuration.addPrefixShardConfiguration(config); - - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.logicalStoreType(config.getPrefix().getDatastoreType()) - .storeRoot(config.getPrefix().getRootIdentifier()); - DatastoreContext shardDatastoreContext = builder.build(); - - final Map peerAddresses = getPeerAddresses(shardName); - final boolean isActiveMember = true; - - LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}", - persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember); - - final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses, - shardDatastoreContext, Shard.builder(), peerAddressResolver); - info.setActiveMember(isActiveMember); - localShards.put(info.getShardName(), info); - - if (schemaContext != null) { - info.setSchemaContext(schemaContext); - info.setActor(newShardActor(info)); - } - } - - private void onPrefixShardRemoved(final PrefixShardRemoved message) { - LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message); - - final DOMDataTreeIdentifier prefix = message.getPrefix(); - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(prefix.getRootIdentifier())); - - configuration.removePrefixShardConfiguration(prefix); - removeShard(shardId); - } - private void doCreateShard(final CreateShard createShard) { final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig(); final String shardName = moduleShardConfig.getShardName(); @@ -730,7 +580,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // the shard with no peers and with elections disabled so it stays as follower. A // subsequent AddServer request will be needed to make it an active member. isActiveMember = false; - peerAddresses = Collections.emptyMap(); + peerAddresses = Map.of(); shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext) .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build(); } @@ -761,10 +611,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void checkReady() { if (isReadyWithLeaderId()) { - LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}", - persistenceId(), type, waitTillReadyCountdownLatch.getCount()); - - waitTillReadyCountdownLatch.countDown(); + LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type); + readinessFuture.set(Empty.value()); } } @@ -989,10 +837,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.member().address()); peerAddressResolver.removePeerAddress(memberName); - - for (ShardInformation info : localShards.values()) { - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); - } } private void memberExited(final ClusterEvent.MemberExited message) { @@ -1002,10 +846,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.member().address()); peerAddressResolver.removePeerAddress(memberName); - - for (ShardInformation info : localShards.values()) { - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); - } } private void memberUp(final ClusterEvent.MemberUp message) { @@ -1038,8 +878,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String shardName = info.getShardName(); String peerId = getShardIdentifier(memberName, shardName).toString(); info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf()); - - info.peerUp(memberName, peerId, getSelf()); } } @@ -1070,8 +908,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { notifyShardAvailabilityCallbacks(info); } - - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -1082,8 +918,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } - - info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -1158,7 +992,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String peerId = getShardIdentifier(memberName, shardName).toString() ; String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName); info.updatePeerAddress(peerId, peerAddress, getSelf()); - info.peerUp(memberName, peerId, getSelf()); LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}", persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor()); } @@ -1268,8 +1101,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * Create shards that are local to the member on which the ShardManager runs. */ private void createLocalShards() { - MemberName memberName = this.cluster.getCurrentMemberName(); - Collection memberShardNames = this.configuration.getMemberShardNames(memberName); + MemberName memberName = cluster.getCurrentMemberName(); + Collection memberShardNames = configuration.getMemberShardNames(memberName); Map shardSnapshots = new HashMap<>(); if (restoreFromSnapshot != null) { @@ -1314,7 +1147,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private Map getPeerAddresses(final String shardName, final Collection members) { Map peerAddresses = new HashMap<>(); - MemberName currentMemberName = this.cluster.getCurrentMemberName(); + MemberName currentMemberName = cluster.getCurrentMemberName(); for (MemberName memberName : members) { if (!currentMemberName.equals(memberName)) { @@ -1357,48 +1190,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return false; } - private void onAddPrefixShardReplica(final AddPrefixShardReplica message) { - LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message); - - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(message.getShardPrefix())); - final String shardName = shardId.getShardName(); - - // Create the localShard - if (schemaContext == null) { - LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}", - persistenceId(), shardName); - getSender().tell(new Status.Failure(new IllegalStateException( - "No SchemaContext is available in order to create a local shard instance for " + shardName)), - getSelf()); - return; - } - - findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), - getSelf()) { - @Override - public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { - final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(), - message.getShardPrefix(), response, getSender()); - if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) { - getSelf().tell(runnable, getTargetActor()); - } - } - - @Override - public void onLocalPrimaryFound(final LocalPrimaryShardFound message) { - sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor()); - } - }); - } - private void onAddShardReplica(final AddShardReplica shardReplicaMsg) { final String shardName = shardReplicaMsg.getShardName(); LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg); // verify the shard with the specified name is present in the cluster configuration - if (!this.configuration.isShardConfigured(shardName)) { + if (!configuration.isShardConfigured(shardName)) { LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName); getSender().tell(new Status.Failure(new IllegalArgumentException( "No module configuration exists for shard " + shardName)), getSelf()); @@ -1433,52 +1231,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) { LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName); sender.tell(new Status.Failure(new AlreadyExistsException( String.format("Local shard %s already exists", shardName))), getSelf()); } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") - private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix, - final RemotePrimaryShardFound response, final ActorRef sender) { - if (isShardReplicaOperationInProgress(shardName, sender)) { - return; - } - - shardReplicaOperationsInProgress.add(shardName); - - final ShardInformation shardInfo; - final boolean removeShardOnFailure; - ShardInformation existingShardInfo = localShards.get(shardName); - if (existingShardInfo == null) { - removeShardOnFailure = true; - ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName); - - final Builder builder = newShardDatastoreContextBuilder(shardName); - builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()); - - DatastoreContext datastoreContext = builder.build(); - - shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, - Shard.builder(), peerAddressResolver); - shardInfo.setActiveMember(false); - shardInfo.setSchemaContext(schemaContext); - localShards.put(shardName, shardInfo); - shardInfo.setActor(newShardActor(shardInfo)); - } else { - removeShardOnFailure = false; - shardInfo = existingShardInfo; - } - - execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender); - } - - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; @@ -1631,32 +1389,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) { - LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message); - - final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), - ClusterUtils.getCleanShardName(message.getShardPrefix())); - final String shardName = shardId.getShardName(); - - findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), - shardName, persistenceId(), getSelf()) { - @Override - public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) { - doRemoveShardReplicaAsync(response.getPrimaryPath()); - } - - @Override - public void onLocalPrimaryFound(final LocalPrimaryShardFound response) { - doRemoveShardReplicaAsync(response.getPrimaryPath()); - } - - private void doRemoveShardReplicaAsync(final String primaryPath) { - getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(), - primaryPath, getSender()), getTargetActor()); - } - }); - } - private void persistShardList() { List shardList = new ArrayList<>(localShards.keySet()); for (ShardInformation shardInfo : localShards.values()) { @@ -1665,13 +1397,11 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList); - saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations())); + saveSnapshot(updateShardManagerSnapshot(shardList)); } - private ShardManagerSnapshot updateShardManagerSnapshot( - final List shardList, - final Map allPrefixShardConfigurations) { - currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations); + private ShardManagerSnapshot updateShardManagerSnapshot(final List shardList) { + currentSnapshot = new ShardManagerSnapshot(shardList); return currentSnapshot; }