X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fshardmanager%2FShardManager.java;h=adc686723bd67fc602af2c19005fb5a44358284f;hp=e86640060806326887a5e7f6f8198f2b8fefb907;hb=HEAD;hpb=dcc776a5e749d495a66e8753e123a1ddbd15d9c6 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java index e866400608..adc686723b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/shardmanager/ShardManager.java @@ -5,10 +5,8 @@ * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.controller.cluster.datastore.shardmanager; -import static akka.pattern.Patterns.ask; import static java.util.Objects.requireNonNull; import akka.actor.ActorRef; @@ -39,7 +37,6 @@ import com.google.common.util.concurrent.SettableFuture; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -104,6 +101,7 @@ import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,14 +142,14 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private DatastoreContextFactory datastoreContextFactory; - private final SettableFuture readinessFuture; + private final SettableFuture readinessFuture; private final PrimaryShardInfoFutureCache primaryShardInfoCache; @VisibleForTesting final ShardPeerAddressResolver peerAddressResolver; - private EffectiveModelContext schemaContext; + private EffectiveModelContext modelContext; private DatastoreSnapshot restoreFromSnapshot; @@ -165,16 +163,17 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final String persistenceId; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design") ShardManager(final AbstractShardManagerCreator builder) { - this.cluster = builder.getCluster(); - this.configuration = builder.getConfiguration(); - this.datastoreContextFactory = builder.getDatastoreContextFactory(); - this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); - this.shardDispatcherPath = - new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); - this.readinessFuture = builder.getReadinessFuture(); - this.primaryShardInfoCache = builder.getPrimaryShardInfoCache(); - this.restoreFromSnapshot = builder.getRestoreFromSnapshot(); + cluster = builder.getCluster(); + configuration = builder.getConfiguration(); + datastoreContextFactory = builder.getDatastoreContextFactory(); + type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(); + shardDispatcherPath = new Dispatchers(context().system().dispatchers()) + .getDispatcherPath(Dispatchers.DispatcherType.Shard); + readinessFuture = builder.getReadinessFuture(); + primaryShardInfoCache = builder.getPrimaryShardInfoCache(); + restoreFromSnapshot = builder.getRestoreFromSnapshot(); String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId(); persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type; @@ -185,7 +184,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { cluster.subscribeToMemberEvents(getSelf()); shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), - "shard-manager-" + this.type, + "shard-manager-" + type, datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType()); shardManagerMBean.registerMBean(); } @@ -204,85 +203,80 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { @Override public void handleCommand(final Object message) throws Exception { - if (message instanceof FindPrimary) { - findPrimary((FindPrimary)message); - } else if (message instanceof FindLocalShard) { - findLocalShard((FindLocalShard) message); - } else if (message instanceof UpdateSchemaContext) { - updateSchemaContext(message); - } else if (message instanceof ActorInitialized) { - onActorInitialized(message); - } else if (message instanceof ClusterEvent.MemberUp) { - memberUp((ClusterEvent.MemberUp) message); - } else if (message instanceof ClusterEvent.MemberWeaklyUp) { - memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message); - } else if (message instanceof ClusterEvent.MemberExited) { - memberExited((ClusterEvent.MemberExited) message); - } else if (message instanceof ClusterEvent.MemberRemoved) { - memberRemoved((ClusterEvent.MemberRemoved) message); - } else if (message instanceof ClusterEvent.UnreachableMember) { - memberUnreachable((ClusterEvent.UnreachableMember) message); - } else if (message instanceof ClusterEvent.ReachableMember) { - memberReachable((ClusterEvent.ReachableMember) message); - } else if (message instanceof DatastoreContextFactory) { - onDatastoreContextFactory((DatastoreContextFactory) message); - } else if (message instanceof RoleChangeNotification) { - onRoleChangeNotification((RoleChangeNotification) message); - } else if (message instanceof FollowerInitialSyncUpStatus) { - onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message); - } else if (message instanceof ShardNotInitializedTimeout) { - onShardNotInitializedTimeout((ShardNotInitializedTimeout) message); - } else if (message instanceof ShardLeaderStateChanged) { - onLeaderStateChanged((ShardLeaderStateChanged) message); - } else if (message instanceof SwitchShardBehavior) { - onSwitchShardBehavior((SwitchShardBehavior) message); - } else if (message instanceof CreateShard) { - onCreateShard((CreateShard)message); - } else if (message instanceof AddShardReplica) { - onAddShardReplica((AddShardReplica) message); - } else if (message instanceof ForwardedAddServerReply) { - ForwardedAddServerReply msg = (ForwardedAddServerReply)message; - onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, - msg.removeShardOnFailure); - } else if (message instanceof ForwardedAddServerFailure) { - ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message; + if (message instanceof FindPrimary msg) { + findPrimary(msg); + } else if (message instanceof FindLocalShard msg) { + findLocalShard(msg); + } else if (message instanceof UpdateSchemaContext msg) { + updateSchemaContext(msg); + } else if (message instanceof ActorInitialized msg) { + onActorInitialized(msg); + } else if (message instanceof ClusterEvent.MemberUp msg) { + memberUp(msg); + } else if (message instanceof ClusterEvent.MemberWeaklyUp msg) { + memberWeaklyUp(msg); + } else if (message instanceof ClusterEvent.MemberExited msg) { + memberExited(msg); + } else if (message instanceof ClusterEvent.MemberRemoved msg) { + memberRemoved(msg); + } else if (message instanceof ClusterEvent.UnreachableMember msg) { + memberUnreachable(msg); + } else if (message instanceof ClusterEvent.ReachableMember msg) { + memberReachable(msg); + } else if (message instanceof DatastoreContextFactory msg) { + onDatastoreContextFactory(msg); + } else if (message instanceof RoleChangeNotification msg) { + onRoleChangeNotification(msg); + } else if (message instanceof FollowerInitialSyncUpStatus msg) { + onFollowerInitialSyncStatus(msg); + } else if (message instanceof ShardNotInitializedTimeout msg) { + onShardNotInitializedTimeout(msg); + } else if (message instanceof ShardLeaderStateChanged msg) { + onLeaderStateChanged(msg); + } else if (message instanceof SwitchShardBehavior msg) { + onSwitchShardBehavior(msg); + } else if (message instanceof CreateShard msg) { + onCreateShard(msg); + } else if (message instanceof AddShardReplica msg) { + onAddShardReplica(msg); + } else if (message instanceof ForwardedAddServerReply msg) { + onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure); + } else if (message instanceof ForwardedAddServerFailure msg) { onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure); - } else if (message instanceof RemoveShardReplica) { - onRemoveShardReplica((RemoveShardReplica) message); - } else if (message instanceof WrappedShardResponse) { - onWrappedShardResponse((WrappedShardResponse) message); - } else if (message instanceof GetSnapshot) { - onGetSnapshot((GetSnapshot) message); - } else if (message instanceof ServerRemoved) { - onShardReplicaRemoved((ServerRemoved) message); - } else if (message instanceof ChangeShardMembersVotingStatus) { - onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message); - } else if (message instanceof FlipShardMembersVotingStatus) { - onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message); - } else if (message instanceof SaveSnapshotSuccess) { - onSaveSnapshotSuccess((SaveSnapshotSuccess) message); - } else if (message instanceof SaveSnapshotFailure) { - LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), - ((SaveSnapshotFailure) message).cause()); + } else if (message instanceof RemoveShardReplica msg) { + onRemoveShardReplica(msg); + } else if (message instanceof WrappedShardResponse msg) { + onWrappedShardResponse(msg); + } else if (message instanceof GetSnapshot msg) { + onGetSnapshot(msg); + } else if (message instanceof ServerRemoved msg) { + onShardReplicaRemoved(msg); + } else if (message instanceof ChangeShardMembersVotingStatus msg) { + onChangeShardServersVotingStatus(msg); + } else if (message instanceof FlipShardMembersVotingStatus msg) { + onFlipShardMembersVotingStatus(msg); + } else if (message instanceof SaveSnapshotSuccess msg) { + onSaveSnapshotSuccess(msg); + } else if (message instanceof SaveSnapshotFailure msg) { + LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), msg.cause()); } else if (message instanceof Shutdown) { onShutDown(); } else if (message instanceof GetLocalShardIds) { onGetLocalShardIds(); - } else if (message instanceof GetShardRole) { - onGetShardRole((GetShardRole) message); - } else if (message instanceof RunnableMessage) { - ((RunnableMessage)message).run(); - } else if (message instanceof RegisterForShardAvailabilityChanges) { - onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message); - } else if (message instanceof DeleteSnapshotsFailure) { - LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), - ((DeleteSnapshotsFailure) message).cause()); + } else if (message instanceof GetShardRole msg) { + onGetShardRole(msg); + } else if (message instanceof RunnableMessage msg) { + msg.run(); + } else if (message instanceof RegisterForShardAvailabilityChanges msg) { + onRegisterForShardAvailabilityChanges(msg); + } else if (message instanceof DeleteSnapshotsFailure msg) { + LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), msg.cause()); } else if (message instanceof DeleteSnapshotsSuccess) { LOG.debug("{}: Successfully deleted prior snapshots", persistenceId()); } else if (message instanceof RegisterRoleChangeListenerReply) { LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId()); - } else if (message instanceof ClusterEvent.MemberEvent) { - LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message); + } else if (message instanceof ClusterEvent.MemberEvent msg) { + LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), msg); } else { unknownMessage(message); } @@ -384,8 +378,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName, final String primaryPath, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { @@ -403,7 +395,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { primaryPath, shardId); Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration()); - Future futureObj = ask(getContext().actorSelection(primaryPath), + Future futureObj = Patterns.ask(getContext().actorSelection(primaryPath), new RemoveServer(shardId.toString()), removeServerTimeout); futureObj.onComplete(new OnComplete<>() { @@ -531,8 +523,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) { final CompositeOnComplete stopOnComplete = shardActorsStopping.get(shardName); if (stopOnComplete == null) { @@ -584,7 +574,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { // the shard with no peers and with elections disabled so it stays as follower. A // subsequent AddServer request will be needed to make it an active member. isActiveMember = false; - peerAddresses = Collections.emptyMap(); + peerAddresses = Map.of(); shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext) .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build(); } @@ -598,8 +588,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { info.setActiveMember(isActiveMember); localShards.put(info.getShardName(), info); - if (schemaContext != null) { - info.setSchemaContext(schemaContext); + if (modelContext != null) { + info.setSchemaContext(modelContext); info.setActor(newShardActor(info)); } } @@ -616,7 +606,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private void checkReady() { if (isReadyWithLeaderId()) { LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type); - readinessFuture.set(null); + readinessFuture.set(Empty.value()); } } @@ -625,7 +615,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId()); if (shardInformation != null) { - shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree()); + shardInformation.setLocalDataTree(leaderStateChanged.localShardDataTree()); shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion()); if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) { primaryShardInfoCache.remove(shardInformation.getShardName()); @@ -717,13 +707,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { return true; } - private void onActorInitialized(final Object message) { - final ActorRef sender = getSender(); - - if (sender == null) { - // why is a non-actor sending this message? Just ignore. - return; - } + private void onActorInitialized(final ActorInitialized message) { + final var sender = message.actorRef(); String actorName = sender.path().name(); //find shard name from actor name; actor name is stringified shardId @@ -754,8 +739,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { protected void handleRecover(final Object message) throws Exception { if (message instanceof RecoveryCompleted) { onRecoveryCompleted(); - } else if (message instanceof SnapshotOffer) { - applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot()); + } else if (message instanceof SnapshotOffer msg) { + applyShardManagerSnapshot((ShardManagerSnapshot) msg.snapshot()); } } @@ -841,10 +826,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.member().address()); peerAddressResolver.removePeerAddress(memberName); - - for (ShardInformation info : localShards.values()) { - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); - } } private void memberExited(final ClusterEvent.MemberExited message) { @@ -854,10 +835,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { message.member().address()); peerAddressResolver.removePeerAddress(memberName); - - for (ShardInformation info : localShards.values()) { - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); - } } private void memberUp(final ClusterEvent.MemberUp message) { @@ -890,8 +867,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String shardName = info.getShardName(); String peerId = getShardIdentifier(memberName, shardName).toString(); info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf()); - - info.peerUp(memberName, peerId, getSelf()); } } @@ -922,8 +897,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { notifyShardAvailabilityCallbacks(info); } - - info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -934,8 +907,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("Marking Leader {} as available.", leaderId); info.setLeaderAvailable(true); } - - info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf()); } } @@ -992,13 +963,13 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * * @param message the message to send */ - private void updateSchemaContext(final Object message) { - schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext(); + private void updateSchemaContext(final UpdateSchemaContext message) { + modelContext = message.modelContext(); - LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size()); + LOG.debug("Got updated SchemaContext: # of modules {}", modelContext.getModules().size()); for (ShardInformation info : localShards.values()) { - info.setSchemaContext(schemaContext); + info.setSchemaContext(modelContext); if (info.getActor() == null) { LOG.debug("Creating Shard {}", info.getShardId()); @@ -1010,7 +981,6 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { String peerId = getShardIdentifier(memberName, shardName).toString() ; String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName); info.updatePeerAddress(peerId, peerAddress, getSelf()); - info.peerUp(memberName, peerId, getSelf()); LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}", persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor()); } @@ -1043,7 +1013,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { sendResponse(info, message.isWaitUntilReady(), true, () -> { String primaryPath = info.getSerializedLeaderActor(); Object found = canReturnLocalShardState && info.isLeader() - ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) : + ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().orElseThrow()) : new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion()); LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found); @@ -1085,20 +1055,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext() .getShardInitializationTimeout().duration().$times(2)); - Future futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); + Future futureObj = Patterns.ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout); futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { handler.onFailure(failure); + } else if (response instanceof RemotePrimaryShardFound msg) { + handler.onRemotePrimaryShardFound(msg); + } else if (response instanceof LocalPrimaryShardFound msg) { + handler.onLocalPrimaryFound(msg); } else { - if (response instanceof RemotePrimaryShardFound) { - handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response); - } else if (response instanceof LocalPrimaryShardFound) { - handler.onLocalPrimaryFound((LocalPrimaryShardFound) response); - } else { - handler.onUnknownResponse(response); - } + handler.onUnknownResponse(response); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); @@ -1120,8 +1088,8 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { * Create shards that are local to the member on which the ShardManager runs. */ private void createLocalShards() { - MemberName memberName = this.cluster.getCurrentMemberName(); - Collection memberShardNames = this.configuration.getMemberShardNames(memberName); + MemberName memberName = cluster.getCurrentMemberName(); + Collection memberShardNames = configuration.getMemberShardNames(memberName); Map shardSnapshots = new HashMap<>(); if (restoreFromSnapshot != null) { @@ -1166,7 +1134,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private Map getPeerAddresses(final String shardName, final Collection members) { Map peerAddresses = new HashMap<>(); - MemberName currentMemberName = this.cluster.getCurrentMemberName(); + MemberName currentMemberName = cluster.getCurrentMemberName(); for (MemberName memberName : members) { if (!currentMemberName.equals(memberName)) { @@ -1215,7 +1183,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg); // verify the shard with the specified name is present in the cluster configuration - if (!this.configuration.isShardConfigured(shardName)) { + if (!configuration.isShardConfigured(shardName)) { LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName); getSender().tell(new Status.Failure(new IllegalArgumentException( "No module configuration exists for shard " + shardName)), getSelf()); @@ -1223,7 +1191,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { } // Create the localShard - if (schemaContext == null) { + if (modelContext == null) { LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}", persistenceId(), shardName); getSender().tell(new Status.Failure(new IllegalStateException( @@ -1250,16 +1218,12 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { }); } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) { LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName); sender.tell(new Status.Failure(new AlreadyExistsException( String.format("Local shard %s already exists", shardName))), getSelf()); } - @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", - justification = "https://github.com/spotbugs/spotbugs/issues/811") private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) { if (isShardReplicaOperationInProgress(shardName, sender)) { return; @@ -1280,7 +1244,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext, Shard.builder(), peerAddressResolver); shardInfo.setActiveMember(false); - shardInfo.setSchemaContext(schemaContext); + shardInfo.setSchemaContext(modelContext); localShards.put(shardName, shardInfo); shardInfo.setActor(newShardActor(shardInfo)); } else { @@ -1306,7 +1270,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext() .getShardLeaderElectionTimeout().duration()); - final Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), + final Future futureObj = Patterns.ask(getContext().actorSelection(response.getPrimaryPath()), new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout); futureObj.onComplete(new OnComplete<>() { @@ -1373,21 +1337,18 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { private static Exception getServerChangeException(final Class serverChange, final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) { - switch (serverChangeStatus) { - case TIMEOUT: - return new TimeoutException(String.format( - "The shard leader %s timed out trying to replicate the initial data to the new shard %s." - + "Possible causes - there was a problem replicating the data or shard leadership changed " - + "while replicating the shard data", leaderPath, shardId.getShardName())); - case NO_LEADER: - return new NoShardLeaderException(shardId); - case NOT_SUPPORTED: - return new UnsupportedOperationException(String.format("%s request is not supported for shard %s", - serverChange.getSimpleName(), shardId.getShardName())); - default : - return new RuntimeException(String.format("%s request to leader %s for shard %s failed with status %s", - serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus)); - } + return switch (serverChangeStatus) { + case TIMEOUT -> new TimeoutException(""" + The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible \ + causes - there was a problem replicating the data or shard leadership changed while replicating the \ + shard data""".formatted(leaderPath, shardId.getShardName())); + case NO_LEADER -> new NoShardLeaderException(shardId); + case NOT_SUPPORTED -> new UnsupportedOperationException( + "%s request is not supported for shard %s".formatted( + serverChange.getSimpleName(), shardId.getShardName())); + default -> new RuntimeException("%s request to leader %s for shard %s failed with status %s".formatted( + serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus)); + }; } private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) { @@ -1482,7 +1443,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { ActorRef sender = getSender(); final String shardName = flipMembersVotingStatus.getShardName(); findLocalShard(shardName, sender, localShardFound -> { - Future future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, + Future future = Patterns.ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE, Timeout.apply(30, TimeUnit.SECONDS)); future.onComplete(new OnComplete<>() { @@ -1533,31 +1494,27 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext() .getShardInitializationTimeout().duration().$times(2)); - Future futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); + Future futureObj = Patterns.ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout); futureObj.onComplete(new OnComplete<>() { @Override public void onComplete(final Throwable failure, final Object response) { if (failure != null) { LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, - failure); + failure); sender.tell(new Status.Failure(new RuntimeException( - String.format("Failed to find local shard %s", shardName), failure)), self()); + String.format("Failed to find local shard %s", shardName), failure)), self()); + } if (response instanceof LocalShardFound msg) { + getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept(msg), sender); + } else if (response instanceof LocalShardNotFound) { + LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName); + sender.tell(new Status.Failure(new IllegalArgumentException( + String.format("Local shard %s does not exist", shardName))), self()); } else { - if (response instanceof LocalShardFound) { - getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), - sender); - } else if (response instanceof LocalShardNotFound) { - LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName); - sender.tell(new Status.Failure(new IllegalArgumentException( - String.format("Local shard %s does not exist", shardName))), self()); - } else { - LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName, - response); - sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response - : new RuntimeException( - String.format("Failed to find local shard %s: received response: %s", shardName, - response))), self()); - } + LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName, + response); + sender.tell(new Status.Failure(response instanceof Throwable throwable ? throwable + : new RuntimeException(String.format("Failed to find local shard %s: received response: %s", + shardName, response))), self()); } } }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); @@ -1578,7 +1535,7 @@ class ShardManager extends AbstractUntypedPersistentActorWithMetering { changeServersVotingStatus, shardActorRef.path()); Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)); - Future futureObj = ask(shardActorRef, changeServersVotingStatus, timeout); + Future futureObj = Patterns.ask(shardActorRef, changeServersVotingStatus, timeout); futureObj.onComplete(new OnComplete<>() { @Override