Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Status;
18 import akka.actor.SupervisorStrategy;
19 import akka.actor.SupervisorStrategy.Directive;
20 import akka.cluster.ClusterEvent;
21 import akka.cluster.ClusterEvent.MemberWeaklyUp;
22 import akka.cluster.Member;
23 import akka.dispatch.Futures;
24 import akka.dispatch.OnComplete;
25 import akka.japi.Function;
26 import akka.pattern.Patterns;
27 import akka.persistence.DeleteSnapshotsFailure;
28 import akka.persistence.DeleteSnapshotsSuccess;
29 import akka.persistence.RecoveryCompleted;
30 import akka.persistence.SaveSnapshotFailure;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.persistence.SnapshotOffer;
33 import akka.persistence.SnapshotSelectionCriteria;
34 import akka.util.Timeout;
35 import com.google.common.annotations.VisibleForTesting;
36 import com.google.common.util.concurrent.SettableFuture;
37 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.HashMap;
41 import java.util.HashSet;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.Map.Entry;
45 import java.util.Set;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.function.Consumer;
49 import java.util.function.Supplier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
52 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
53 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
56 import org.opendaylight.controller.cluster.datastore.Shard;
57 import org.opendaylight.controller.cluster.datastore.config.Configuration;
58 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
59 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
60 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
63 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
64 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
65 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
66 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
67 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
68 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
69 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
70 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
71 import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
72 import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
73 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
76 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
77 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
78 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
79 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
80 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
81 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
82 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
83 import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
84 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
87 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
88 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
89 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
90 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
91 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
92 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
94 import org.opendaylight.controller.cluster.raft.messages.AddServer;
95 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
96 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
97 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
98 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
99 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
100 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
101 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
102 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
103 import org.opendaylight.yangtools.concepts.Registration;
104 import org.opendaylight.yangtools.yang.common.Empty;
105 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
108 import scala.concurrent.ExecutionContext;
109 import scala.concurrent.Future;
110 import scala.concurrent.duration.FiniteDuration;
111
112 /**
113  * Manages the shards for a data store. The ShardManager has the following jobs:
114  * <ul>
115  * <li> Create all the local shard replicas that belong on this cluster member
116  * <li> Find the address of the local shard
117  * <li> Find the primary replica for any given shard
118  * <li> Monitor the cluster members and store their addresses
119  * </ul>
120  */
121 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
122     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
123
124     // Stores a mapping between a shard name and its corresponding information
125     // Shard names look like inventory, topology etc and are as specified in
126     // configuration
127     @VisibleForTesting
128     final Map<String, ShardInformation> localShards = new HashMap<>();
129
130     // The type of a ShardManager reflects the type of the datastore itself
131     // A data store could be of type config/operational
132     private final String type;
133
134     private final ClusterWrapper cluster;
135
136     private final Configuration configuration;
137
138     @VisibleForTesting
139     final String shardDispatcherPath;
140
141     private final ShardManagerInfo shardManagerMBean;
142
143     private DatastoreContextFactory datastoreContextFactory;
144
145     private final SettableFuture<Empty> readinessFuture;
146
147     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
148
149     @VisibleForTesting
150     final ShardPeerAddressResolver peerAddressResolver;
151
152     private EffectiveModelContext modelContext;
153
154     private DatastoreSnapshot restoreFromSnapshot;
155
156     private ShardManagerSnapshot currentSnapshot;
157
158     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
159
160     private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
161
162     private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
163
164     private final String persistenceId;
165
166     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
167     ShardManager(final AbstractShardManagerCreator<?> builder) {
168         cluster = builder.getCluster();
169         configuration = builder.getConfiguration();
170         datastoreContextFactory = builder.getDatastoreContextFactory();
171         type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
172         shardDispatcherPath = new Dispatchers(context().system().dispatchers())
173             .getDispatcherPath(Dispatchers.DispatcherType.Shard);
174         readinessFuture = builder.getReadinessFuture();
175         primaryShardInfoCache = builder.getPrimaryShardInfoCache();
176         restoreFromSnapshot = builder.getRestoreFromSnapshot();
177
178         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
179         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
180
181         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
182
183         // Subscribe this actor to cluster member events
184         cluster.subscribeToMemberEvents(getSelf());
185
186         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
187                 "shard-manager-" + type,
188                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
189         shardManagerMBean.registerMBean();
190     }
191
192     @Override
193     public void preStart() {
194         LOG.info("Starting ShardManager {}", persistenceId);
195     }
196
197     @Override
198     public void postStop() {
199         LOG.info("Stopping ShardManager {}", persistenceId());
200
201         shardManagerMBean.unregisterMBean();
202     }
203
204     @Override
205     public void handleCommand(final Object message) throws Exception {
206         if (message instanceof FindPrimary msg) {
207             findPrimary(msg);
208         } else if (message instanceof FindLocalShard msg) {
209             findLocalShard(msg);
210         } else if (message instanceof UpdateSchemaContext msg) {
211             updateSchemaContext(msg);
212         } else if (message instanceof ActorInitialized msg) {
213             onActorInitialized(msg);
214         } else if (message instanceof ClusterEvent.MemberUp msg) {
215             memberUp(msg);
216         } else if (message instanceof ClusterEvent.MemberWeaklyUp msg) {
217             memberWeaklyUp(msg);
218         } else if (message instanceof ClusterEvent.MemberExited msg) {
219             memberExited(msg);
220         } else if (message instanceof ClusterEvent.MemberRemoved msg) {
221             memberRemoved(msg);
222         } else if (message instanceof ClusterEvent.UnreachableMember msg) {
223             memberUnreachable(msg);
224         } else if (message instanceof ClusterEvent.ReachableMember msg) {
225             memberReachable(msg);
226         } else if (message instanceof DatastoreContextFactory msg) {
227             onDatastoreContextFactory(msg);
228         } else if (message instanceof RoleChangeNotification msg) {
229             onRoleChangeNotification(msg);
230         } else if (message instanceof FollowerInitialSyncUpStatus msg) {
231             onFollowerInitialSyncStatus(msg);
232         } else if (message instanceof ShardNotInitializedTimeout msg) {
233             onShardNotInitializedTimeout(msg);
234         } else if (message instanceof ShardLeaderStateChanged msg) {
235             onLeaderStateChanged(msg);
236         } else if (message instanceof SwitchShardBehavior msg) {
237             onSwitchShardBehavior(msg);
238         } else if (message instanceof CreateShard msg) {
239             onCreateShard(msg);
240         } else if (message instanceof AddShardReplica msg) {
241             onAddShardReplica(msg);
242         } else if (message instanceof ForwardedAddServerReply msg) {
243             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
244         } else if (message instanceof ForwardedAddServerFailure msg) {
245             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
246         } else if (message instanceof RemoveShardReplica msg) {
247             onRemoveShardReplica(msg);
248         } else if (message instanceof WrappedShardResponse msg) {
249             onWrappedShardResponse(msg);
250         } else if (message instanceof GetSnapshot msg) {
251             onGetSnapshot(msg);
252         } else if (message instanceof ServerRemoved msg) {
253             onShardReplicaRemoved(msg);
254         } else if (message instanceof ChangeShardMembersVotingStatus msg) {
255             onChangeShardServersVotingStatus(msg);
256         } else if (message instanceof FlipShardMembersVotingStatus msg) {
257             onFlipShardMembersVotingStatus(msg);
258         } else if (message instanceof SaveSnapshotSuccess msg) {
259             onSaveSnapshotSuccess(msg);
260         } else if (message instanceof SaveSnapshotFailure msg) {
261             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), msg.cause());
262         } else if (message instanceof Shutdown) {
263             onShutDown();
264         } else if (message instanceof GetLocalShardIds) {
265             onGetLocalShardIds();
266         } else if (message instanceof GetShardRole msg) {
267             onGetShardRole(msg);
268         } else if (message instanceof RunnableMessage msg) {
269             msg.run();
270         } else if (message instanceof RegisterForShardAvailabilityChanges msg) {
271             onRegisterForShardAvailabilityChanges(msg);
272         } else if (message instanceof DeleteSnapshotsFailure msg) {
273             LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), msg.cause());
274         } else if (message instanceof DeleteSnapshotsSuccess) {
275             LOG.debug("{}: Successfully deleted prior snapshots", persistenceId());
276         } else if (message instanceof RegisterRoleChangeListenerReply) {
277             LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
278         } else if (message instanceof ClusterEvent.MemberEvent msg) {
279             LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), msg);
280         } else {
281             unknownMessage(message);
282         }
283     }
284
285     private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) {
286         LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
287
288         final Consumer<String> callback = message.getCallback();
289         shardAvailabilityCallbacks.add(callback);
290
291         getSender().tell(new Status.Success((Registration)
292             () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
293     }
294
295     private void onGetShardRole(final GetShardRole message) {
296         LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
297
298         final String name = message.getName();
299
300         final ShardInformation shardInformation = localShards.get(name);
301
302         if (shardInformation == null) {
303             LOG.info("{}: no shard information for {} found", persistenceId(), name);
304             getSender().tell(new Status.Failure(
305                     new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
306             return;
307         }
308
309         getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
310     }
311
312     void onShutDown() {
313         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
314         for (ShardInformation info : localShards.values()) {
315             if (info.getActor() != null) {
316                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
317
318                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
319                         .getElectionTimeOutInterval().$times(2);
320                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
321             }
322         }
323
324         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
325
326         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
327                 .getDispatcher(Dispatchers.DispatcherType.Client);
328         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
329
330         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
331             @Override
332             public void onComplete(final Throwable failure, final Iterable<Boolean> results) {
333                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
334
335                 self().tell(PoisonPill.getInstance(), self());
336
337                 if (failure != null) {
338                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
339                 } else {
340                     int nfailed = 0;
341                     for (Boolean result : results) {
342                         if (!result) {
343                             nfailed++;
344                         }
345                     }
346
347                     if (nfailed > 0) {
348                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
349                     }
350                 }
351             }
352         }, dispatcher);
353     }
354
355     private void onWrappedShardResponse(final WrappedShardResponse message) {
356         if (message.getResponse() instanceof RemoveServerReply) {
357             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
358                     message.getLeaderPath());
359         }
360     }
361
362     private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
363             final RemoveServerReply replyMsg, final String leaderPath) {
364         shardReplicaOperationsInProgress.remove(shardId.getShardName());
365
366         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
367
368         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
369             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
370                     shardId.getShardName());
371             originalSender.tell(new Status.Success(null), getSelf());
372         } else {
373             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
374                     persistenceId(), shardId, replyMsg.getStatus());
375
376             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
377             originalSender.tell(new Status.Failure(failure), getSelf());
378         }
379     }
380
381     private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
382             final String primaryPath, final ActorRef sender) {
383         if (isShardReplicaOperationInProgress(shardName, sender)) {
384             return;
385         }
386
387         shardReplicaOperationsInProgress.add(shardName);
388
389         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
390
391         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
392
393         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
394         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
395                 primaryPath, shardId);
396
397         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
398         Future<Object> futureObj = Patterns.ask(getContext().actorSelection(primaryPath),
399                 new RemoveServer(shardId.toString()), removeServerTimeout);
400
401         futureObj.onComplete(new OnComplete<>() {
402             @Override
403             public void onComplete(final Throwable failure, final Object response) {
404                 if (failure != null) {
405                     shardReplicaOperationsInProgress.remove(shardName);
406                     LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
407                         shardName, failure);
408
409                     // FAILURE
410                     sender.tell(new Status.Failure(new RuntimeException(
411                         String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
412                         failure)), self());
413                 } else {
414                     // SUCCESS
415                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
416                 }
417             }
418         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
419     }
420
421     private void onShardReplicaRemoved(final ServerRemoved message) {
422         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
423     }
424
425     @SuppressWarnings("checkstyle:IllegalCatch")
426     private void removeShard(final ShardIdentifier shardId) {
427         final String shardName = shardId.getShardName();
428         final ShardInformation shardInformation = localShards.remove(shardName);
429         if (shardInformation == null) {
430             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
431             return;
432         }
433
434         final ActorRef shardActor = shardInformation.getActor();
435         if (shardActor != null) {
436             long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
437                     .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
438
439             LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
440                     timeoutInMS);
441
442             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
443                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
444
445             final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<>() {
446                 @Override
447                 public void onComplete(final Throwable failure, final Boolean result) {
448                     if (failure == null) {
449                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
450                     } else {
451                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
452                     }
453
454                     self().tell((RunnableMessage) () -> {
455                         // At any rate, invalidate primaryShardInfo cache
456                         primaryShardInfoCache.remove(shardName);
457
458                         shardActorsStopping.remove(shardName);
459                         notifyOnCompleteTasks(failure, result);
460                     }, ActorRef.noSender());
461                 }
462             };
463
464             shardActorsStopping.put(shardName, onComplete);
465             stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
466                     .getDispatcher(Dispatchers.DispatcherType.Client));
467         }
468
469         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
470         persistShardList();
471     }
472
473     private void onGetSnapshot(final GetSnapshot getSnapshot) {
474         LOG.debug("{}: onGetSnapshot", persistenceId());
475
476         List<String> notInitialized = null;
477         for (ShardInformation shardInfo : localShards.values()) {
478             if (!shardInfo.isShardInitialized()) {
479                 if (notInitialized == null) {
480                     notInitialized = new ArrayList<>();
481                 }
482
483                 notInitialized.add(shardInfo.getShardName());
484             }
485         }
486
487         if (notInitialized != null) {
488             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
489                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
490             return;
491         }
492
493         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
494                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
495                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
496
497         for (ShardInformation shardInfo: localShards.values()) {
498             shardInfo.getActor().tell(getSnapshot, replyActor);
499         }
500     }
501
502     @SuppressWarnings("checkstyle:IllegalCatch")
503     private void onCreateShard(final CreateShard createShard) {
504         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
505
506         Object reply;
507         try {
508             String shardName = createShard.getModuleShardConfig().getShardName();
509             if (localShards.containsKey(shardName)) {
510                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
511                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
512             } else {
513                 doCreateShard(createShard);
514                 reply = new Status.Success(null);
515             }
516         } catch (Exception e) {
517             LOG.error("{}: onCreateShard failed", persistenceId(), e);
518             reply = new Status.Failure(e);
519         }
520
521         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
522             getSender().tell(reply, getSelf());
523         }
524     }
525
526     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
527         final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
528         if (stopOnComplete == null) {
529             return false;
530         }
531
532         LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
533                 shardName, messageToDefer);
534         final ActorRef sender = getSender();
535         stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
536             @Override
537             public void onComplete(final Throwable failure, final Boolean result) {
538                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
539                 self().tell(messageToDefer, sender);
540             }
541         });
542
543         return true;
544     }
545
546     private void doCreateShard(final CreateShard createShard) {
547         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
548         final String shardName = moduleShardConfig.getShardName();
549
550         configuration.addModuleShardConfiguration(moduleShardConfig);
551
552         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
553         if (shardDatastoreContext == null) {
554             shardDatastoreContext = newShardDatastoreContext(shardName);
555         } else {
556             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
557                     peerAddressResolver).build();
558         }
559
560         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
561
562         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
563                 && currentSnapshot.getShardList().contains(shardName);
564
565         Map<String, String> peerAddresses;
566         boolean isActiveMember;
567         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
568                 .contains(cluster.getCurrentMemberName())) {
569             peerAddresses = getPeerAddresses(shardName);
570             isActiveMember = true;
571         } else {
572             // The local member is not in the static shard member configuration and the shard did not
573             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
574             // the shard with no peers and with elections disabled so it stays as follower. A
575             // subsequent AddServer request will be needed to make it an active member.
576             isActiveMember = false;
577             peerAddresses = Map.of();
578             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
579                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
580         }
581
582         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
583                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
584                 isActiveMember);
585
586         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
587                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
588         info.setActiveMember(isActiveMember);
589         localShards.put(info.getShardName(), info);
590
591         if (modelContext != null) {
592             info.setSchemaContext(modelContext);
593             info.setActor(newShardActor(info));
594         }
595     }
596
597     private DatastoreContext.Builder newShardDatastoreContextBuilder(final String shardName) {
598         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
599                 .shardPeerAddressResolver(peerAddressResolver);
600     }
601
602     private DatastoreContext newShardDatastoreContext(final String shardName) {
603         return newShardDatastoreContextBuilder(shardName).build();
604     }
605
606     private void checkReady() {
607         if (isReadyWithLeaderId()) {
608             LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
609             readinessFuture.set(Empty.value());
610         }
611     }
612
613     private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
614         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
615
616         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
617         if (shardInformation != null) {
618             shardInformation.setLocalDataTree(leaderStateChanged.localShardDataTree());
619             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
620             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
621                 primaryShardInfoCache.remove(shardInformation.getShardName());
622
623                 notifyShardAvailabilityCallbacks(shardInformation);
624             }
625
626             checkReady();
627         } else {
628             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
629         }
630     }
631
632     private void notifyShardAvailabilityCallbacks(final ShardInformation shardInformation) {
633         shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
634     }
635
636     private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
637         ShardInformation shardInfo = message.getShardInfo();
638
639         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
640                 shardInfo.getShardName());
641
642         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
643
644         if (!shardInfo.isShardInitialized()) {
645             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
646             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
647         } else {
648             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
649             message.getSender().tell(new NoShardLeaderException(shardInfo.getShardId()), getSelf());
650         }
651     }
652
653     private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) {
654         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
655                 status.getName(), status.isInitialSyncDone());
656
657         ShardInformation shardInformation = findShardInformation(status.getName());
658
659         if (shardInformation != null) {
660             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
661
662             shardManagerMBean.setSyncStatus(isInSync());
663         }
664
665     }
666
667     private void onRoleChangeNotification(final RoleChangeNotification roleChanged) {
668         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
669                 roleChanged.getOldRole(), roleChanged.getNewRole());
670
671         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
672         if (shardInformation != null) {
673             shardInformation.setRole(roleChanged.getNewRole());
674             checkReady();
675             shardManagerMBean.setSyncStatus(isInSync());
676         }
677     }
678
679
680     private ShardInformation findShardInformation(final String memberId) {
681         for (ShardInformation info : localShards.values()) {
682             if (info.getShardId().toString().equals(memberId)) {
683                 return info;
684             }
685         }
686
687         return null;
688     }
689
690     private boolean isReadyWithLeaderId() {
691         boolean isReady = true;
692         for (ShardInformation info : localShards.values()) {
693             if (!info.isShardReadyWithLeaderId()) {
694                 isReady = false;
695                 break;
696             }
697         }
698         return isReady;
699     }
700
701     private boolean isInSync() {
702         for (ShardInformation info : localShards.values()) {
703             if (!info.isInSync()) {
704                 return false;
705             }
706         }
707         return true;
708     }
709
710     private void onActorInitialized(final ActorInitialized message) {
711         final var sender = message.actorRef();
712
713         String actorName = sender.path().name();
714         //find shard name from actor name; actor name is stringified shardId
715
716         final ShardIdentifier shardId;
717         try {
718             shardId = ShardIdentifier.fromShardIdString(actorName);
719         } catch (IllegalArgumentException e) {
720             LOG.debug("{}: ignoring actor {}", persistenceId, actorName, e);
721             return;
722         }
723
724         markShardAsInitialized(shardId.getShardName());
725     }
726
727     private void markShardAsInitialized(final String shardName) {
728         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
729
730         ShardInformation shardInformation = localShards.get(shardName);
731         if (shardInformation != null) {
732             shardInformation.setActorInitialized();
733
734             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
735         }
736     }
737
738     @Override
739     protected void handleRecover(final Object message) throws Exception {
740         if (message instanceof RecoveryCompleted) {
741             onRecoveryCompleted();
742         } else if (message instanceof SnapshotOffer msg) {
743             applyShardManagerSnapshot((ShardManagerSnapshot) msg.snapshot());
744         }
745     }
746
747     @SuppressWarnings("checkstyle:IllegalCatch")
748     private void onRecoveryCompleted() {
749         LOG.info("Recovery complete : {}", persistenceId());
750
751         if (currentSnapshot == null && restoreFromSnapshot != null
752                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
753             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
754
755             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
756
757             applyShardManagerSnapshot(snapshot);
758         }
759
760         createLocalShards();
761     }
762
763     private void sendResponse(final ShardInformation shardInformation, final boolean doWait,
764             final boolean wantShardReady, final Supplier<Object> messageSupplier) {
765         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
766             if (doWait) {
767                 final ActorRef sender = getSender();
768                 final ActorRef self = self();
769
770                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
771
772                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
773                     new OnShardInitialized(replyRunnable);
774
775                 shardInformation.addOnShardInitialized(onShardInitialized);
776
777                 FiniteDuration timeout = shardInformation.getDatastoreContext()
778                         .getShardInitializationTimeout().duration();
779                 if (shardInformation.isShardInitialized()) {
780                     // If the shard is already initialized then we'll wait enough time for the shard to
781                     // elect a leader, ie 2 times the election timeout.
782                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
783                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
784                 }
785
786                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
787                         shardInformation);
788
789                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
790                         timeout, getSelf(),
791                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
792                         getContext().dispatcher(), getSelf());
793
794                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
795
796             } else if (!shardInformation.isShardInitialized()) {
797                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
798                         shardInformation.getShardName());
799                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
800             } else {
801                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
802                         shardInformation.getShardName());
803                 getSender().tell(new NoShardLeaderException(shardInformation.getShardId()), getSelf());
804             }
805
806             return;
807         }
808
809         getSender().tell(messageSupplier.get(), getSelf());
810     }
811
812     private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
813         return new NotInitializedException(String.format(
814                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
815     }
816
817     @VisibleForTesting
818     static MemberName memberToName(final Member member) {
819         return MemberName.forName(member.roles().iterator().next());
820     }
821
822     private void memberRemoved(final ClusterEvent.MemberRemoved message) {
823         MemberName memberName = memberToName(message.member());
824
825         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
826                 message.member().address());
827
828         peerAddressResolver.removePeerAddress(memberName);
829     }
830
831     private void memberExited(final ClusterEvent.MemberExited message) {
832         MemberName memberName = memberToName(message.member());
833
834         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
835                 message.member().address());
836
837         peerAddressResolver.removePeerAddress(memberName);
838     }
839
840     private void memberUp(final ClusterEvent.MemberUp message) {
841         MemberName memberName = memberToName(message.member());
842
843         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
844                 message.member().address());
845
846         memberUp(memberName, message.member().address());
847     }
848
849     private void memberUp(final MemberName memberName, final Address address) {
850         addPeerAddress(memberName, address);
851         checkReady();
852     }
853
854     private void memberWeaklyUp(final MemberWeaklyUp message) {
855         MemberName memberName = memberToName(message.member());
856
857         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
858                 message.member().address());
859
860         memberUp(memberName, message.member().address());
861     }
862
863     private void addPeerAddress(final MemberName memberName, final Address address) {
864         peerAddressResolver.addPeerAddress(memberName, address);
865
866         for (ShardInformation info : localShards.values()) {
867             String shardName = info.getShardName();
868             String peerId = getShardIdentifier(memberName, shardName).toString();
869             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
870         }
871     }
872
873     private void memberReachable(final ClusterEvent.ReachableMember message) {
874         MemberName memberName = memberToName(message.member());
875         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
876
877         addPeerAddress(memberName, message.member().address());
878
879         markMemberAvailable(memberName);
880     }
881
882     private void memberUnreachable(final ClusterEvent.UnreachableMember message) {
883         MemberName memberName = memberToName(message.member());
884         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
885
886         markMemberUnavailable(memberName);
887     }
888
889     private void markMemberUnavailable(final MemberName memberName) {
890         for (ShardInformation info : localShards.values()) {
891             String leaderId = info.getLeaderId();
892             if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
893                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
894                 info.setLeaderAvailable(false);
895
896                 primaryShardInfoCache.remove(info.getShardName());
897
898                 notifyShardAvailabilityCallbacks(info);
899             }
900         }
901     }
902
903     private void markMemberAvailable(final MemberName memberName) {
904         for (ShardInformation info : localShards.values()) {
905             String leaderId = info.getLeaderId();
906             if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
907                 LOG.debug("Marking Leader {} as available.", leaderId);
908                 info.setLeaderAvailable(true);
909             }
910         }
911     }
912
913     private void onDatastoreContextFactory(final DatastoreContextFactory factory) {
914         datastoreContextFactory = factory;
915         for (ShardInformation info : localShards.values()) {
916             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
917         }
918     }
919
920     private void onGetLocalShardIds() {
921         final List<String> response = new ArrayList<>(localShards.size());
922
923         for (ShardInformation info : localShards.values()) {
924             response.add(info.getShardId().toString());
925         }
926
927         getSender().tell(new Status.Success(response), getSelf());
928     }
929
930     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
931         final ShardIdentifier identifier = message.getShardId();
932
933         if (identifier != null) {
934             final ShardInformation info = localShards.get(identifier.getShardName());
935             if (info == null) {
936                 getSender().tell(new Status.Failure(
937                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
938                 return;
939             }
940
941             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
942         } else {
943             for (ShardInformation info : localShards.values()) {
944                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
945             }
946         }
947
948         getSender().tell(new Status.Success(null), getSelf());
949     }
950
951     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
952         final ActorRef actor = info.getActor();
953         if (actor != null) {
954             actor.tell(switchBehavior, getSelf());
955         } else {
956             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
957                 info.getShardName(), switchBehavior.getNewState());
958         }
959     }
960
961     /**
962      * Notifies all the local shards of a change in the schema context.
963      *
964      * @param message the message to send
965      */
966     private void updateSchemaContext(final UpdateSchemaContext message) {
967         modelContext = message.modelContext();
968
969         LOG.debug("Got updated SchemaContext: # of modules {}", modelContext.getModules().size());
970
971         for (ShardInformation info : localShards.values()) {
972             info.setSchemaContext(modelContext);
973
974             if (info.getActor() == null) {
975                 LOG.debug("Creating Shard {}", info.getShardId());
976                 info.setActor(newShardActor(info));
977                 // Update peer address for every existing peer memeber to avoid missing sending
978                 // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp.
979                 String shardName = info.getShardName();
980                 for (MemberName memberName : peerAddressResolver.getPeerMembers()) {
981                     String peerId = getShardIdentifier(memberName, shardName).toString() ;
982                     String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
983                     info.updatePeerAddress(peerId, peerAddress, getSelf());
984                     LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
985                             persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
986                 }
987             } else {
988                 info.getActor().tell(message, getSelf());
989             }
990         }
991     }
992
993     @VisibleForTesting
994     protected ClusterWrapper getCluster() {
995         return cluster;
996     }
997
998     @VisibleForTesting
999     protected ActorRef newShardActor(final ShardInformation info) {
1000         return getContext().actorOf(info.newProps().withDispatcher(shardDispatcherPath),
1001                 info.getShardId().toString());
1002     }
1003
1004     private void findPrimary(final FindPrimary message) {
1005         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
1006
1007         final String shardName = message.getShardName();
1008         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
1009
1010         // First see if the there is a local replica for the shard
1011         final ShardInformation info = localShards.get(shardName);
1012         if (info != null && info.isActiveMember()) {
1013             sendResponse(info, message.isWaitUntilReady(), true, () -> {
1014                 String primaryPath = info.getSerializedLeaderActor();
1015                 Object found = canReturnLocalShardState && info.isLeader()
1016                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().orElseThrow()) :
1017                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
1018
1019                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
1020                 return found;
1021             });
1022
1023             return;
1024         }
1025
1026         final Collection<String> visitedAddresses;
1027         if (message instanceof RemoteFindPrimary) {
1028             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
1029         } else {
1030             visitedAddresses = new ArrayList<>(1);
1031         }
1032
1033         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
1034
1035         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
1036             if (visitedAddresses.contains(address)) {
1037                 continue;
1038             }
1039
1040             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
1041                     persistenceId(), shardName, address, visitedAddresses);
1042
1043             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
1044                     message.isWaitUntilReady(), visitedAddresses), getContext());
1045             return;
1046         }
1047
1048         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
1049
1050         getSender().tell(new PrimaryNotFoundException(
1051                 String.format("No primary shard found for %s.", shardName)), getSelf());
1052     }
1053
1054     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1055         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1056                 .getShardInitializationTimeout().duration().$times(2));
1057
1058         Future<Object> futureObj = Patterns.ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1059         futureObj.onComplete(new OnComplete<>() {
1060             @Override
1061             public void onComplete(final Throwable failure, final Object response) {
1062                 if (failure != null) {
1063                     handler.onFailure(failure);
1064                 } else if (response instanceof RemotePrimaryShardFound msg) {
1065                     handler.onRemotePrimaryShardFound(msg);
1066                 } else if (response instanceof LocalPrimaryShardFound msg) {
1067                     handler.onLocalPrimaryFound(msg);
1068                 } else {
1069                     handler.onUnknownResponse(response);
1070                 }
1071             }
1072         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1073     }
1074
1075     /**
1076      * Construct the name of the shard actor given the name of the member on
1077      * which the shard resides and the name of the shard.
1078      *
1079      * @param memberName the member name
1080      * @param shardName the shard name
1081      * @return a b
1082      */
1083     private ShardIdentifier getShardIdentifier(final MemberName memberName, final String shardName) {
1084         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1085     }
1086
1087     /**
1088      * Create shards that are local to the member on which the ShardManager runs.
1089      */
1090     private void createLocalShards() {
1091         MemberName memberName = cluster.getCurrentMemberName();
1092         Collection<String> memberShardNames = configuration.getMemberShardNames(memberName);
1093
1094         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1095         if (restoreFromSnapshot != null) {
1096             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1097                 shardSnapshots.put(snapshot.getName(), snapshot);
1098             }
1099         }
1100
1101         // null out to GC
1102         restoreFromSnapshot = null;
1103
1104         for (String shardName : memberShardNames) {
1105             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1106
1107             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1108
1109             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1110             localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
1111                     newShardDatastoreContext(shardName), shardSnapshots));
1112         }
1113     }
1114
1115     @VisibleForTesting
1116     ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId,
1117                                         final Map<String, String> peerAddresses,
1118                                         final DatastoreContext datastoreContext,
1119                                         final Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
1120         return new ShardInformation(shardName, shardId, peerAddresses,
1121                 datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
1122                 peerAddressResolver);
1123     }
1124
1125     /**
1126      * Given the name of the shard find the addresses of all it's peers.
1127      *
1128      * @param shardName the shard name
1129      */
1130     Map<String, String> getPeerAddresses(final String shardName) {
1131         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1132         return getPeerAddresses(shardName, members);
1133     }
1134
1135     private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
1136         Map<String, String> peerAddresses = new HashMap<>();
1137         MemberName currentMemberName = cluster.getCurrentMemberName();
1138
1139         for (MemberName memberName : members) {
1140             if (!currentMemberName.equals(memberName)) {
1141                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1142                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1143                 peerAddresses.put(shardId.toString(), address);
1144             }
1145         }
1146         return peerAddresses;
1147     }
1148
1149     @Override
1150     public SupervisorStrategy supervisorStrategy() {
1151
1152         return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES),
1153                 (Function<Throwable, Directive>) t -> {
1154                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1155                     return SupervisorStrategy.resume();
1156                 });
1157     }
1158
1159     @Override
1160     public String persistenceId() {
1161         return persistenceId;
1162     }
1163
1164     @VisibleForTesting
1165     ShardManagerInfoMBean getMBean() {
1166         return shardManagerMBean;
1167     }
1168
1169     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1170         if (shardReplicaOperationsInProgress.contains(shardName)) {
1171             LOG.debug("{}: A shard replica operation for {} is already in progress", persistenceId(), shardName);
1172             sender.tell(new Status.Failure(new IllegalStateException(
1173                 String.format("A shard replica operation for %s is already in progress", shardName))), getSelf());
1174             return true;
1175         }
1176
1177         return false;
1178     }
1179
1180     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
1181         final String shardName = shardReplicaMsg.getShardName();
1182
1183         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1184
1185         // verify the shard with the specified name is present in the cluster configuration
1186         if (!configuration.isShardConfigured(shardName)) {
1187             LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName);
1188             getSender().tell(new Status.Failure(new IllegalArgumentException(
1189                 "No module configuration exists for shard " + shardName)), getSelf());
1190             return;
1191         }
1192
1193         // Create the localShard
1194         if (modelContext == null) {
1195             LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
1196                 persistenceId(), shardName);
1197             getSender().tell(new Status.Failure(new IllegalStateException(
1198                 "No SchemaContext is available in order to create a local shard instance for " + shardName)),
1199                 getSelf());
1200             return;
1201         }
1202
1203         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1204                 getSelf()) {
1205             @Override
1206             public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1207                 final RunnableMessage runnable = (RunnableMessage) () ->
1208                     addShard(getShardName(), response, getSender());
1209                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
1210                     getSelf().tell(runnable, getTargetActor());
1211                 }
1212             }
1213
1214             @Override
1215             public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
1216                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1217             }
1218         });
1219     }
1220
1221     private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
1222         LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
1223         sender.tell(new Status.Failure(new AlreadyExistsException(
1224             String.format("Local shard %s already exists", shardName))), getSelf());
1225     }
1226
1227     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1228         if (isShardReplicaOperationInProgress(shardName, sender)) {
1229             return;
1230         }
1231
1232         shardReplicaOperationsInProgress.add(shardName);
1233
1234         final ShardInformation shardInfo;
1235         final boolean removeShardOnFailure;
1236         ShardInformation existingShardInfo = localShards.get(shardName);
1237         if (existingShardInfo == null) {
1238             removeShardOnFailure = true;
1239             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1240
1241             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1242                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1243
1244             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1245                     Shard.builder(), peerAddressResolver);
1246             shardInfo.setActiveMember(false);
1247             shardInfo.setSchemaContext(modelContext);
1248             localShards.put(shardName, shardInfo);
1249             shardInfo.setActor(newShardActor(shardInfo));
1250         } else {
1251             removeShardOnFailure = false;
1252             shardInfo = existingShardInfo;
1253         }
1254
1255         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1256     }
1257
1258     private void execAddShard(final String shardName,
1259                               final ShardInformation shardInfo,
1260                               final RemotePrimaryShardFound response,
1261                               final boolean removeShardOnFailure,
1262                               final ActorRef sender) {
1263
1264         final String localShardAddress =
1265                 peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1266
1267         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1268         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1269                 response.getPrimaryPath(), shardInfo.getShardId());
1270
1271         final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1272                 .getShardLeaderElectionTimeout().duration());
1273         final Future<Object> futureObj = Patterns.ask(getContext().actorSelection(response.getPrimaryPath()),
1274                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1275
1276         futureObj.onComplete(new OnComplete<>() {
1277             @Override
1278             public void onComplete(final Throwable failure, final Object addServerResponse) {
1279                 if (failure != null) {
1280                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1281                             response.getPrimaryPath(), shardName, failure);
1282
1283                     final String msg = String.format("AddServer request to leader %s for shard %s failed",
1284                             response.getPrimaryPath(), shardName);
1285                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1286                 } else {
1287                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1288                             response.getPrimaryPath(), removeShardOnFailure), sender);
1289                 }
1290             }
1291         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1292     }
1293
1294     private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
1295             final ActorRef sender, final boolean removeShardOnFailure) {
1296         shardReplicaOperationsInProgress.remove(shardName);
1297
1298         if (removeShardOnFailure) {
1299             ShardInformation shardInfo = localShards.remove(shardName);
1300             if (shardInfo.getActor() != null) {
1301                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1302             }
1303         }
1304
1305         sender.tell(new Status.Failure(message == null ? failure :
1306             new RuntimeException(message, failure)), getSelf());
1307     }
1308
1309     private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg,
1310             final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) {
1311         String shardName = shardInfo.getShardName();
1312         shardReplicaOperationsInProgress.remove(shardName);
1313
1314         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1315
1316         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1317             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1318
1319             // Make the local shard voting capable
1320             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1321             shardInfo.setActiveMember(true);
1322             persistShardList();
1323
1324             sender.tell(new Status.Success(null), getSelf());
1325         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1326             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1327         } else {
1328             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1329                     persistenceId(), shardName, replyMsg.getStatus());
1330
1331             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1332                     shardInfo.getShardId());
1333
1334             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1335         }
1336     }
1337
1338     private static Exception getServerChangeException(final Class<?> serverChange,
1339             final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) {
1340         return switch (serverChangeStatus) {
1341             case TIMEOUT -> new TimeoutException("""
1342                 The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible \
1343                 causes - there was a problem replicating the data or shard leadership changed while replicating the \
1344                 shard data""".formatted(leaderPath, shardId.getShardName()));
1345             case NO_LEADER -> new NoShardLeaderException(shardId);
1346             case NOT_SUPPORTED -> new UnsupportedOperationException(
1347                 "%s request is not supported for shard %s".formatted(
1348                     serverChange.getSimpleName(), shardId.getShardName()));
1349             default -> new RuntimeException("%s request to leader %s for shard %s failed with status %s".formatted(
1350                 serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1351         };
1352     }
1353
1354     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
1355         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1356
1357         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1358                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1359             @Override
1360             public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1361                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1362             }
1363
1364             @Override
1365             public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
1366                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1367             }
1368
1369             private void doRemoveShardReplicaAsync(final String primaryPath) {
1370                 getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
1371                         primaryPath, getSender()), getTargetActor());
1372             }
1373         });
1374     }
1375
1376     private void persistShardList() {
1377         List<String> shardList = new ArrayList<>(localShards.keySet());
1378         for (ShardInformation shardInfo : localShards.values()) {
1379             if (!shardInfo.isActiveMember()) {
1380                 shardList.remove(shardInfo.getShardName());
1381             }
1382         }
1383         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1384         saveSnapshot(updateShardManagerSnapshot(shardList));
1385     }
1386
1387     private ShardManagerSnapshot updateShardManagerSnapshot(final List<String> shardList) {
1388         currentSnapshot = new ShardManagerSnapshot(shardList);
1389         return currentSnapshot;
1390     }
1391
1392     private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) {
1393         currentSnapshot = snapshot;
1394
1395         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1396
1397         final MemberName currentMember = cluster.getCurrentMemberName();
1398         Set<String> configuredShardList =
1399             new HashSet<>(configuration.getMemberShardNames(currentMember));
1400         for (String shard : currentSnapshot.getShardList()) {
1401             if (!configuredShardList.contains(shard)) {
1402                 // add the current member as a replica for the shard
1403                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1404                 configuration.addMemberReplicaForShard(shard, currentMember);
1405             } else {
1406                 configuredShardList.remove(shard);
1407             }
1408         }
1409         for (String shard : configuredShardList) {
1410             // remove the member as a replica for the shard
1411             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1412             configuration.removeMemberReplicaForShard(shard, currentMember);
1413         }
1414     }
1415
1416     private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) {
1417         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1418             persistenceId());
1419         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1420             0, 0));
1421     }
1422
1423     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1424         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1425
1426         String shardName = changeMembersVotingStatus.getShardName();
1427         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1428         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1429             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1430                     e.getValue());
1431         }
1432
1433         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1434
1435         findLocalShard(shardName, getSender(),
1436             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1437             localShardFound.getPath(), getSender()));
1438     }
1439
1440     private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
1441         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1442
1443         ActorRef sender = getSender();
1444         final String shardName = flipMembersVotingStatus.getShardName();
1445         findLocalShard(shardName, sender, localShardFound -> {
1446             Future<Object> future = Patterns.ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1447                     Timeout.apply(30, TimeUnit.SECONDS));
1448
1449             future.onComplete(new OnComplete<>() {
1450                 @Override
1451                 public void onComplete(final Throwable failure, final Object response) {
1452                     if (failure != null) {
1453                         sender.tell(new Status.Failure(new RuntimeException(
1454                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1455                         return;
1456                     }
1457
1458                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1459                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1460                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1461                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1462                     }
1463
1464                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1465                             .toString(), !raftState.isVoting());
1466
1467                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1468                             shardName, localShardFound.getPath(), sender);
1469                 }
1470             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1471         });
1472
1473     }
1474
1475     private void findLocalShard(final FindLocalShard message) {
1476         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
1477
1478         final ShardInformation shardInformation = localShards.get(message.getShardName());
1479
1480         if (shardInformation == null) {
1481             LOG.debug("{}: Local shard {} not found - shards present: {}",
1482                     persistenceId(), message.getShardName(), localShards.keySet());
1483
1484             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1485             return;
1486         }
1487
1488         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1489             () -> new LocalShardFound(shardInformation.getActor()));
1490     }
1491
1492     private void findLocalShard(final String shardName, final ActorRef sender,
1493             final Consumer<LocalShardFound> onLocalShardFound) {
1494         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1495                 .getShardInitializationTimeout().duration().$times(2));
1496
1497         Future<Object> futureObj = Patterns.ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1498         futureObj.onComplete(new OnComplete<>() {
1499             @Override
1500             public void onComplete(final Throwable failure, final Object response) {
1501                 if (failure != null) {
1502                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1503                         failure);
1504                     sender.tell(new Status.Failure(new RuntimeException(
1505                         String.format("Failed to find local shard %s", shardName), failure)), self());
1506                 } if (response instanceof LocalShardFound msg) {
1507                     getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept(msg), sender);
1508                 } else if (response instanceof LocalShardNotFound) {
1509                     LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName);
1510                     sender.tell(new Status.Failure(new IllegalArgumentException(
1511                         String.format("Local shard %s does not exist", shardName))), self());
1512                 } else {
1513                     LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName,
1514                         response);
1515                     sender.tell(new Status.Failure(response instanceof Throwable throwable ? throwable
1516                         : new RuntimeException(String.format("Failed to find local shard %s: received response: %s",
1517                             shardName, response))), self());
1518                 }
1519             }
1520         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1521     }
1522
1523     private void changeShardMembersVotingStatus(final ChangeServersVotingStatus changeServersVotingStatus,
1524             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1525         if (isShardReplicaOperationInProgress(shardName, sender)) {
1526             return;
1527         }
1528
1529         shardReplicaOperationsInProgress.add(shardName);
1530
1531         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1532         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1533
1534         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1535                 changeServersVotingStatus, shardActorRef.path());
1536
1537         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1538         Future<Object> futureObj = Patterns.ask(shardActorRef, changeServersVotingStatus, timeout);
1539
1540         futureObj.onComplete(new OnComplete<>() {
1541             @Override
1542             public void onComplete(final Throwable failure, final Object response) {
1543                 shardReplicaOperationsInProgress.remove(shardName);
1544                 if (failure != null) {
1545                     LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", persistenceId(),
1546                         shardActorRef.path(), failure);
1547                     sender.tell(new Status.Failure(new RuntimeException(
1548                         String.format("ChangeServersVotingStatus request to local shard %s failed",
1549                             shardActorRef.path()), failure)), self());
1550                 } else {
1551                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1552
1553                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1554                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1555                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1556                         sender.tell(new Status.Success(null), getSelf());
1557                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1558                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1559                                 "The requested voting state change for shard %s is invalid. At least one member "
1560                                 + "must be voting", shardId.getShardName()))), getSelf());
1561                     } else {
1562                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1563                                 persistenceId(), shardName, replyMsg.getStatus());
1564
1565                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1566                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1567                         sender.tell(new Status.Failure(error), getSelf());
1568                     }
1569                 }
1570             }
1571         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1572     }
1573
1574     private static final class ForwardedAddServerReply {
1575         ShardInformation shardInfo;
1576         AddServerReply addServerReply;
1577         String leaderPath;
1578         boolean removeShardOnFailure;
1579
1580         ForwardedAddServerReply(final ShardInformation shardInfo, final AddServerReply addServerReply,
1581             final String leaderPath, final boolean removeShardOnFailure) {
1582             this.shardInfo = shardInfo;
1583             this.addServerReply = addServerReply;
1584             this.leaderPath = leaderPath;
1585             this.removeShardOnFailure = removeShardOnFailure;
1586         }
1587     }
1588
1589     private static final class ForwardedAddServerFailure {
1590         String shardName;
1591         String failureMessage;
1592         Throwable failure;
1593         boolean removeShardOnFailure;
1594
1595         ForwardedAddServerFailure(final String shardName, final String failureMessage, final Throwable failure,
1596                 final boolean removeShardOnFailure) {
1597             this.shardName = shardName;
1598             this.failureMessage = failureMessage;
1599             this.failure = failure;
1600             this.removeShardOnFailure = removeShardOnFailure;
1601         }
1602     }
1603
1604     static class OnShardInitialized {
1605         private final Runnable replyRunnable;
1606         private Cancellable timeoutSchedule;
1607
1608         OnShardInitialized(final Runnable replyRunnable) {
1609             this.replyRunnable = replyRunnable;
1610         }
1611
1612         Runnable getReplyRunnable() {
1613             return replyRunnable;
1614         }
1615
1616         Cancellable getTimeoutSchedule() {
1617             return timeoutSchedule;
1618         }
1619
1620         void setTimeoutSchedule(final Cancellable timeoutSchedule) {
1621             this.timeoutSchedule = timeoutSchedule;
1622         }
1623     }
1624
1625     static class OnShardReady extends OnShardInitialized {
1626         OnShardReady(final Runnable replyRunnable) {
1627             super(replyRunnable);
1628         }
1629     }
1630
1631     private interface RunnableMessage extends Runnable {
1632     }
1633
1634     /**
1635      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1636      * a remote or local find primary message is processed.
1637      */
1638     private interface FindPrimaryResponseHandler {
1639         /**
1640          * Invoked when a Failure message is received as a response.
1641          *
1642          * @param failure the failure exception
1643          */
1644         void onFailure(Throwable failure);
1645
1646         /**
1647          * Invoked when a RemotePrimaryShardFound response is received.
1648          *
1649          * @param response the response
1650          */
1651         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1652
1653         /**
1654          * Invoked when a LocalPrimaryShardFound response is received.
1655          *
1656          * @param response the response
1657          */
1658         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1659
1660         /**
1661          * Invoked when an unknown response is received. This is another type of failure.
1662          *
1663          * @param response the response
1664          */
1665         void onUnknownResponse(Object response);
1666     }
1667
1668     /**
1669      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1670      * replica and sends a wrapped Failure response to some targetActor.
1671      */
1672     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1673         private final ActorRef targetActor;
1674         private final String shardName;
1675         private final String persistenceId;
1676         private final ActorRef shardManagerActor;
1677
1678         /**
1679          * Constructs an instance.
1680          *
1681          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1682          * @param shardName The name of the shard for which the primary replica had to be found
1683          * @param persistenceId The persistenceId for the ShardManager
1684          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1685          */
1686         protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
1687                 final String persistenceId, final ActorRef shardManagerActor) {
1688             this.targetActor = requireNonNull(targetActor);
1689             this.shardName = requireNonNull(shardName);
1690             this.persistenceId = requireNonNull(persistenceId);
1691             this.shardManagerActor = requireNonNull(shardManagerActor);
1692         }
1693
1694         public ActorRef getTargetActor() {
1695             return targetActor;
1696         }
1697
1698         public String getShardName() {
1699             return shardName;
1700         }
1701
1702         @Override
1703         public void onFailure(final Throwable failure) {
1704             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1705             targetActor.tell(new Status.Failure(new RuntimeException(
1706                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1707         }
1708
1709         @Override
1710         public void onUnknownResponse(final Object response) {
1711             LOG.debug("{}: Failed to find leader for shard {}: received response: {}", persistenceId, shardName,
1712                 response);
1713             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
1714                     : new RuntimeException(String.format("Failed to find leader for shard %s: received response: %s",
1715                         shardName, response))), shardManagerActor);
1716         }
1717     }
1718
1719     /**
1720      * The WrappedShardResponse class wraps a response from a Shard.
1721      */
1722     private static final class WrappedShardResponse {
1723         private final ShardIdentifier shardId;
1724         private final Object response;
1725         private final String leaderPath;
1726
1727         WrappedShardResponse(final ShardIdentifier shardId, final Object response, final String leaderPath) {
1728             this.shardId = shardId;
1729             this.response = response;
1730             this.leaderPath = leaderPath;
1731         }
1732
1733         ShardIdentifier getShardId() {
1734             return shardId;
1735         }
1736
1737         Object getResponse() {
1738             return response;
1739         }
1740
1741         String getLeaderPath() {
1742             return leaderPath;
1743         }
1744     }
1745
1746     private static final class ShardNotInitializedTimeout {
1747         private final ActorRef sender;
1748         private final ShardInformation shardInfo;
1749         private final OnShardInitialized onShardInitialized;
1750
1751         ShardNotInitializedTimeout(final ShardInformation shardInfo, final OnShardInitialized onShardInitialized,
1752             final ActorRef sender) {
1753             this.sender = sender;
1754             this.shardInfo = shardInfo;
1755             this.onShardInitialized = onShardInitialized;
1756         }
1757
1758         ActorRef getSender() {
1759             return sender;
1760         }
1761
1762         ShardInformation getShardInfo() {
1763             return shardInfo;
1764         }
1765
1766         OnShardInitialized getOnShardInitialized() {
1767             return onShardInitialized;
1768         }
1769     }
1770 }
1771
1772
1773