dd647606228baf9f0c3e66493f74d2ddd856d5f3
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static java.util.Objects.requireNonNull;
11
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Status;
18 import akka.actor.SupervisorStrategy;
19 import akka.actor.SupervisorStrategy.Directive;
20 import akka.cluster.ClusterEvent;
21 import akka.cluster.ClusterEvent.MemberWeaklyUp;
22 import akka.cluster.Member;
23 import akka.dispatch.Futures;
24 import akka.dispatch.OnComplete;
25 import akka.japi.Function;
26 import akka.pattern.Patterns;
27 import akka.persistence.DeleteSnapshotsFailure;
28 import akka.persistence.DeleteSnapshotsSuccess;
29 import akka.persistence.RecoveryCompleted;
30 import akka.persistence.SaveSnapshotFailure;
31 import akka.persistence.SaveSnapshotSuccess;
32 import akka.persistence.SnapshotOffer;
33 import akka.persistence.SnapshotSelectionCriteria;
34 import akka.util.Timeout;
35 import com.google.common.annotations.VisibleForTesting;
36 import com.google.common.util.concurrent.SettableFuture;
37 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.HashMap;
41 import java.util.HashSet;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.Map.Entry;
45 import java.util.Set;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.function.Consumer;
49 import java.util.function.Supplier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
52 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
53 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
56 import org.opendaylight.controller.cluster.datastore.Shard;
57 import org.opendaylight.controller.cluster.datastore.config.Configuration;
58 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
59 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
60 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
63 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
64 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
65 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
66 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
67 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
68 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
69 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
70 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
71 import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
72 import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
73 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
76 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
77 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
78 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
79 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
80 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
81 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
82 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
83 import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
84 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
87 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
88 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
89 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
90 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
91 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
92 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
94 import org.opendaylight.controller.cluster.raft.messages.AddServer;
95 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
96 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
97 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
98 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
99 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
100 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
101 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
102 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
103 import org.opendaylight.yangtools.concepts.Registration;
104 import org.opendaylight.yangtools.yang.common.Empty;
105 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
108 import scala.concurrent.ExecutionContext;
109 import scala.concurrent.Future;
110 import scala.concurrent.duration.FiniteDuration;
111
112 /**
113  * Manages the shards for a data store. The ShardManager has the following jobs:
114  * <ul>
115  * <li> Create all the local shard replicas that belong on this cluster member
116  * <li> Find the address of the local shard
117  * <li> Find the primary replica for any given shard
118  * <li> Monitor the cluster members and store their addresses
119  * </ul>
120  */
121 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
122     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
123
    // Stores a mapping between a shard name and its corresponding information.
    // Shard names look like inventory, topology, etc. and are as specified in
    // the configuration.
127     @VisibleForTesting
128     final Map<String, ShardInformation> localShards = new HashMap<>();
129
130     // The type of a ShardManager reflects the type of the datastore itself
131     // A data store could be of type config/operational
132     private final String type;
133
134     private final ClusterWrapper cluster;
135
136     private final Configuration configuration;
137
138     @VisibleForTesting
139     final String shardDispatcherPath;
140
141     private final ShardManagerInfo shardManagerMBean;
142
143     private DatastoreContextFactory datastoreContextFactory;
144
145     private final SettableFuture<Empty> readinessFuture;
146
147     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
148
149     @VisibleForTesting
150     final ShardPeerAddressResolver peerAddressResolver;
151
152     private EffectiveModelContext schemaContext;
153
154     private DatastoreSnapshot restoreFromSnapshot;
155
156     private ShardManagerSnapshot currentSnapshot;
157
158     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
159
160     private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
161
162     private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
163
164     private final String persistenceId;
165
166     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Akka class design")
167     ShardManager(final AbstractShardManagerCreator<?> builder) {
168         cluster = builder.getCluster();
169         configuration = builder.getConfiguration();
170         datastoreContextFactory = builder.getDatastoreContextFactory();
171         type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
172         shardDispatcherPath = new Dispatchers(context().system().dispatchers())
173             .getDispatcherPath(Dispatchers.DispatcherType.Shard);
174         readinessFuture = builder.getReadinessFuture();
175         primaryShardInfoCache = builder.getPrimaryShardInfoCache();
176         restoreFromSnapshot = builder.getRestoreFromSnapshot();
177
178         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
179         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
180
181         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
182
183         // Subscribe this actor to cluster member events
184         cluster.subscribeToMemberEvents(getSelf());
185
186         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
187                 "shard-manager-" + type,
188                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
189         shardManagerMBean.registerMBean();
190     }
191
192     @Override
193     public void preStart() {
194         LOG.info("Starting ShardManager {}", persistenceId);
195     }
196
197     @Override
198     public void postStop() {
199         LOG.info("Stopping ShardManager {}", persistenceId());
200
201         shardManagerMBean.unregisterMBean();
202     }
203
204     @Override
205     public void handleCommand(final Object message) throws Exception {
206         if (message instanceof FindPrimary msg) {
207             findPrimary(msg);
208         } else if (message instanceof FindLocalShard msg) {
209             findLocalShard(msg);
210         } else if (message instanceof UpdateSchemaContext msg) {
211             updateSchemaContext(msg);
212         } else if (message instanceof ActorInitialized msg) {
213             onActorInitialized(msg);
214         } else if (message instanceof ClusterEvent.MemberUp msg) {
215             memberUp(msg);
216         } else if (message instanceof ClusterEvent.MemberWeaklyUp msg) {
217             memberWeaklyUp(msg);
218         } else if (message instanceof ClusterEvent.MemberExited msg) {
219             memberExited(msg);
220         } else if (message instanceof ClusterEvent.MemberRemoved msg) {
221             memberRemoved(msg);
222         } else if (message instanceof ClusterEvent.UnreachableMember msg) {
223             memberUnreachable(msg);
224         } else if (message instanceof ClusterEvent.ReachableMember msg) {
225             memberReachable(msg);
226         } else if (message instanceof DatastoreContextFactory msg) {
227             onDatastoreContextFactory(msg);
228         } else if (message instanceof RoleChangeNotification msg) {
229             onRoleChangeNotification(msg);
230         } else if (message instanceof FollowerInitialSyncUpStatus msg) {
231             onFollowerInitialSyncStatus(msg);
232         } else if (message instanceof ShardNotInitializedTimeout msg) {
233             onShardNotInitializedTimeout(msg);
234         } else if (message instanceof ShardLeaderStateChanged msg) {
235             onLeaderStateChanged(msg);
236         } else if (message instanceof SwitchShardBehavior msg) {
237             onSwitchShardBehavior(msg);
238         } else if (message instanceof CreateShard msg) {
239             onCreateShard(msg);
240         } else if (message instanceof AddShardReplica msg) {
241             onAddShardReplica(msg);
242         } else if (message instanceof ForwardedAddServerReply msg) {
243             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
244         } else if (message instanceof ForwardedAddServerFailure msg) {
245             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
246         } else if (message instanceof RemoveShardReplica msg) {
247             onRemoveShardReplica(msg);
248         } else if (message instanceof WrappedShardResponse msg) {
249             onWrappedShardResponse(msg);
250         } else if (message instanceof GetSnapshot msg) {
251             onGetSnapshot(msg);
252         } else if (message instanceof ServerRemoved msg) {
253             onShardReplicaRemoved(msg);
254         } else if (message instanceof ChangeShardMembersVotingStatus msg) {
255             onChangeShardServersVotingStatus(msg);
256         } else if (message instanceof FlipShardMembersVotingStatus msg) {
257             onFlipShardMembersVotingStatus(msg);
258         } else if (message instanceof SaveSnapshotSuccess msg) {
259             onSaveSnapshotSuccess(msg);
260         } else if (message instanceof SaveSnapshotFailure msg) {
261             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), msg.cause());
262         } else if (message instanceof Shutdown) {
263             onShutDown();
264         } else if (message instanceof GetLocalShardIds) {
265             onGetLocalShardIds();
266         } else if (message instanceof GetShardRole msg) {
267             onGetShardRole(msg);
268         } else if (message instanceof RunnableMessage msg) {
269             msg.run();
270         } else if (message instanceof RegisterForShardAvailabilityChanges msg) {
271             onRegisterForShardAvailabilityChanges(msg);
272         } else if (message instanceof DeleteSnapshotsFailure msg) {
273             LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), msg.cause());
274         } else if (message instanceof DeleteSnapshotsSuccess) {
275             LOG.debug("{}: Successfully deleted prior snapshots", persistenceId());
276         } else if (message instanceof RegisterRoleChangeListenerReply) {
277             LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
278         } else if (message instanceof ClusterEvent.MemberEvent msg) {
279             LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), msg);
280         } else {
281             unknownMessage(message);
282         }
283     }
284
285     private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) {
286         LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
287
288         final Consumer<String> callback = message.getCallback();
289         shardAvailabilityCallbacks.add(callback);
290
291         getSender().tell(new Status.Success((Registration)
292             () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
293     }
294
295     private void onGetShardRole(final GetShardRole message) {
296         LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
297
298         final String name = message.getName();
299
300         final ShardInformation shardInformation = localShards.get(name);
301
302         if (shardInformation == null) {
303             LOG.info("{}: no shard information for {} found", persistenceId(), name);
304             getSender().tell(new Status.Failure(
305                     new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
306             return;
307         }
308
309         getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
310     }
311
312     void onShutDown() {
313         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
314         for (ShardInformation info : localShards.values()) {
315             if (info.getActor() != null) {
316                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
317
318                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
319                         .getElectionTimeOutInterval().$times(2);
320                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
321             }
322         }
323
324         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
325
326         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
327                 .getDispatcher(Dispatchers.DispatcherType.Client);
328         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
329
330         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
331             @Override
332             public void onComplete(final Throwable failure, final Iterable<Boolean> results) {
333                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
334
335                 self().tell(PoisonPill.getInstance(), self());
336
337                 if (failure != null) {
338                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
339                 } else {
340                     int nfailed = 0;
341                     for (Boolean result : results) {
342                         if (!result) {
343                             nfailed++;
344                         }
345                     }
346
347                     if (nfailed > 0) {
348                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
349                     }
350                 }
351             }
352         }, dispatcher);
353     }
354
355     private void onWrappedShardResponse(final WrappedShardResponse message) {
356         if (message.getResponse() instanceof RemoveServerReply) {
357             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
358                     message.getLeaderPath());
359         }
360     }
361
362     private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
363             final RemoveServerReply replyMsg, final String leaderPath) {
364         shardReplicaOperationsInProgress.remove(shardId.getShardName());
365
366         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
367
368         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
369             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
370                     shardId.getShardName());
371             originalSender.tell(new Status.Success(null), getSelf());
372         } else {
373             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
374                     persistenceId(), shardId, replyMsg.getStatus());
375
376             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
377             originalSender.tell(new Status.Failure(failure), getSelf());
378         }
379     }
380
381     private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
382             final String primaryPath, final ActorRef sender) {
383         if (isShardReplicaOperationInProgress(shardName, sender)) {
384             return;
385         }
386
387         shardReplicaOperationsInProgress.add(shardName);
388
389         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
390
391         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
392
393         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
394         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
395                 primaryPath, shardId);
396
397         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
398         Future<Object> futureObj = Patterns.ask(getContext().actorSelection(primaryPath),
399                 new RemoveServer(shardId.toString()), removeServerTimeout);
400
401         futureObj.onComplete(new OnComplete<>() {
402             @Override
403             public void onComplete(final Throwable failure, final Object response) {
404                 if (failure != null) {
405                     shardReplicaOperationsInProgress.remove(shardName);
406                     LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
407                         shardName, failure);
408
409                     // FAILURE
410                     sender.tell(new Status.Failure(new RuntimeException(
411                         String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
412                         failure)), self());
413                 } else {
414                     // SUCCESS
415                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
416                 }
417             }
418         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
419     }
420
421     private void onShardReplicaRemoved(final ServerRemoved message) {
422         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
423     }
424
425     @SuppressWarnings("checkstyle:IllegalCatch")
426     private void removeShard(final ShardIdentifier shardId) {
427         final String shardName = shardId.getShardName();
428         final ShardInformation shardInformation = localShards.remove(shardName);
429         if (shardInformation == null) {
430             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
431             return;
432         }
433
434         final ActorRef shardActor = shardInformation.getActor();
435         if (shardActor != null) {
436             long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
437                     .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
438
439             LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
440                     timeoutInMS);
441
442             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
443                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
444
445             final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<>() {
446                 @Override
447                 public void onComplete(final Throwable failure, final Boolean result) {
448                     if (failure == null) {
449                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
450                     } else {
451                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
452                     }
453
454                     self().tell((RunnableMessage) () -> {
455                         // At any rate, invalidate primaryShardInfo cache
456                         primaryShardInfoCache.remove(shardName);
457
458                         shardActorsStopping.remove(shardName);
459                         notifyOnCompleteTasks(failure, result);
460                     }, ActorRef.noSender());
461                 }
462             };
463
464             shardActorsStopping.put(shardName, onComplete);
465             stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
466                     .getDispatcher(Dispatchers.DispatcherType.Client));
467         }
468
469         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
470         persistShardList();
471     }
472
473     private void onGetSnapshot(final GetSnapshot getSnapshot) {
474         LOG.debug("{}: onGetSnapshot", persistenceId());
475
476         List<String> notInitialized = null;
477         for (ShardInformation shardInfo : localShards.values()) {
478             if (!shardInfo.isShardInitialized()) {
479                 if (notInitialized == null) {
480                     notInitialized = new ArrayList<>();
481                 }
482
483                 notInitialized.add(shardInfo.getShardName());
484             }
485         }
486
487         if (notInitialized != null) {
488             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
489                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
490             return;
491         }
492
493         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
494                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
495                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
496
497         for (ShardInformation shardInfo: localShards.values()) {
498             shardInfo.getActor().tell(getSnapshot, replyActor);
499         }
500     }
501
502     @SuppressWarnings("checkstyle:IllegalCatch")
503     private void onCreateShard(final CreateShard createShard) {
504         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
505
506         Object reply;
507         try {
508             String shardName = createShard.getModuleShardConfig().getShardName();
509             if (localShards.containsKey(shardName)) {
510                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
511                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
512             } else {
513                 doCreateShard(createShard);
514                 reply = new Status.Success(null);
515             }
516         } catch (Exception e) {
517             LOG.error("{}: onCreateShard failed", persistenceId(), e);
518             reply = new Status.Failure(e);
519         }
520
521         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
522             getSender().tell(reply, getSelf());
523         }
524     }
525
526     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
527         final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
528         if (stopOnComplete == null) {
529             return false;
530         }
531
532         LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
533                 shardName, messageToDefer);
534         final ActorRef sender = getSender();
535         stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
536             @Override
537             public void onComplete(final Throwable failure, final Boolean result) {
538                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
539                 self().tell(messageToDefer, sender);
540             }
541         });
542
543         return true;
544     }
545
546     private void doCreateShard(final CreateShard createShard) {
547         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
548         final String shardName = moduleShardConfig.getShardName();
549
550         configuration.addModuleShardConfiguration(moduleShardConfig);
551
552         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
553         if (shardDatastoreContext == null) {
554             shardDatastoreContext = newShardDatastoreContext(shardName);
555         } else {
556             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
557                     peerAddressResolver).build();
558         }
559
560         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
561
562         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
563                 && currentSnapshot.getShardList().contains(shardName);
564
565         Map<String, String> peerAddresses;
566         boolean isActiveMember;
567         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
568                 .contains(cluster.getCurrentMemberName())) {
569             peerAddresses = getPeerAddresses(shardName);
570             isActiveMember = true;
571         } else {
572             // The local member is not in the static shard member configuration and the shard did not
573             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
574             // the shard with no peers and with elections disabled so it stays as follower. A
575             // subsequent AddServer request will be needed to make it an active member.
576             isActiveMember = false;
577             peerAddresses = Map.of();
578             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
579                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
580         }
581
582         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
583                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
584                 isActiveMember);
585
586         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
587                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
588         info.setActiveMember(isActiveMember);
589         localShards.put(info.getShardName(), info);
590
591         if (schemaContext != null) {
592             info.setSchemaContext(schemaContext);
593             info.setActor(newShardActor(info));
594         }
595     }
596
597     private DatastoreContext.Builder newShardDatastoreContextBuilder(final String shardName) {
598         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
599                 .shardPeerAddressResolver(peerAddressResolver);
600     }
601
602     private DatastoreContext newShardDatastoreContext(final String shardName) {
603         return newShardDatastoreContextBuilder(shardName).build();
604     }
605
606     private void checkReady() {
607         if (isReadyWithLeaderId()) {
608             LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
609             readinessFuture.set(Empty.value());
610         }
611     }
612
613     private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
614         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
615
616         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
617         if (shardInformation != null) {
618             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
619             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
620             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
621                 primaryShardInfoCache.remove(shardInformation.getShardName());
622
623                 notifyShardAvailabilityCallbacks(shardInformation);
624             }
625
626             checkReady();
627         } else {
628             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
629         }
630     }
631
632     private void notifyShardAvailabilityCallbacks(final ShardInformation shardInformation) {
633         shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
634     }
635
636     private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
637         ShardInformation shardInfo = message.getShardInfo();
638
639         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
640                 shardInfo.getShardName());
641
642         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
643
644         if (!shardInfo.isShardInitialized()) {
645             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
646             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
647         } else {
648             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
649             message.getSender().tell(new NoShardLeaderException(shardInfo.getShardId()), getSelf());
650         }
651     }
652
653     private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) {
654         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
655                 status.getName(), status.isInitialSyncDone());
656
657         ShardInformation shardInformation = findShardInformation(status.getName());
658
659         if (shardInformation != null) {
660             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
661
662             shardManagerMBean.setSyncStatus(isInSync());
663         }
664
665     }
666
667     private void onRoleChangeNotification(final RoleChangeNotification roleChanged) {
668         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
669                 roleChanged.getOldRole(), roleChanged.getNewRole());
670
671         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
672         if (shardInformation != null) {
673             shardInformation.setRole(roleChanged.getNewRole());
674             checkReady();
675             shardManagerMBean.setSyncStatus(isInSync());
676         }
677     }
678
679
680     private ShardInformation findShardInformation(final String memberId) {
681         for (ShardInformation info : localShards.values()) {
682             if (info.getShardId().toString().equals(memberId)) {
683                 return info;
684             }
685         }
686
687         return null;
688     }
689
690     private boolean isReadyWithLeaderId() {
691         boolean isReady = true;
692         for (ShardInformation info : localShards.values()) {
693             if (!info.isShardReadyWithLeaderId()) {
694                 isReady = false;
695                 break;
696             }
697         }
698         return isReady;
699     }
700
701     private boolean isInSync() {
702         for (ShardInformation info : localShards.values()) {
703             if (!info.isInSync()) {
704                 return false;
705             }
706         }
707         return true;
708     }
709
710     private void onActorInitialized(final ActorInitialized message) {
711         final ActorRef sender = getSender();
712
713         if (sender == null) {
714             // why is a non-actor sending this message? Just ignore.
715             return;
716         }
717
718         String actorName = sender.path().name();
719         //find shard name from actor name; actor name is stringified shardId
720
721         final ShardIdentifier shardId;
722         try {
723             shardId = ShardIdentifier.fromShardIdString(actorName);
724         } catch (IllegalArgumentException e) {
725             LOG.debug("{}: ignoring actor {}", persistenceId, actorName, e);
726             return;
727         }
728
729         markShardAsInitialized(shardId.getShardName());
730     }
731
732     private void markShardAsInitialized(final String shardName) {
733         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
734
735         ShardInformation shardInformation = localShards.get(shardName);
736         if (shardInformation != null) {
737             shardInformation.setActorInitialized();
738
739             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
740         }
741     }
742
743     @Override
744     protected void handleRecover(final Object message) throws Exception {
745         if (message instanceof RecoveryCompleted) {
746             onRecoveryCompleted();
747         } else if (message instanceof SnapshotOffer) {
748             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
749         }
750     }
751
752     @SuppressWarnings("checkstyle:IllegalCatch")
753     private void onRecoveryCompleted() {
754         LOG.info("Recovery complete : {}", persistenceId());
755
756         if (currentSnapshot == null && restoreFromSnapshot != null
757                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
758             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
759
760             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
761
762             applyShardManagerSnapshot(snapshot);
763         }
764
765         createLocalShards();
766     }
767
768     private void sendResponse(final ShardInformation shardInformation, final boolean doWait,
769             final boolean wantShardReady, final Supplier<Object> messageSupplier) {
770         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
771             if (doWait) {
772                 final ActorRef sender = getSender();
773                 final ActorRef self = self();
774
775                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
776
777                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
778                     new OnShardInitialized(replyRunnable);
779
780                 shardInformation.addOnShardInitialized(onShardInitialized);
781
782                 FiniteDuration timeout = shardInformation.getDatastoreContext()
783                         .getShardInitializationTimeout().duration();
784                 if (shardInformation.isShardInitialized()) {
785                     // If the shard is already initialized then we'll wait enough time for the shard to
786                     // elect a leader, ie 2 times the election timeout.
787                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
788                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
789                 }
790
791                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
792                         shardInformation);
793
794                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
795                         timeout, getSelf(),
796                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
797                         getContext().dispatcher(), getSelf());
798
799                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
800
801             } else if (!shardInformation.isShardInitialized()) {
802                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
803                         shardInformation.getShardName());
804                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
805             } else {
806                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
807                         shardInformation.getShardName());
808                 getSender().tell(new NoShardLeaderException(shardInformation.getShardId()), getSelf());
809             }
810
811             return;
812         }
813
814         getSender().tell(messageSupplier.get(), getSelf());
815     }
816
817     private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
818         return new NotInitializedException(String.format(
819                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
820     }
821
822     @VisibleForTesting
823     static MemberName memberToName(final Member member) {
824         return MemberName.forName(member.roles().iterator().next());
825     }
826
827     private void memberRemoved(final ClusterEvent.MemberRemoved message) {
828         MemberName memberName = memberToName(message.member());
829
830         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
831                 message.member().address());
832
833         peerAddressResolver.removePeerAddress(memberName);
834     }
835
836     private void memberExited(final ClusterEvent.MemberExited message) {
837         MemberName memberName = memberToName(message.member());
838
839         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
840                 message.member().address());
841
842         peerAddressResolver.removePeerAddress(memberName);
843     }
844
845     private void memberUp(final ClusterEvent.MemberUp message) {
846         MemberName memberName = memberToName(message.member());
847
848         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
849                 message.member().address());
850
851         memberUp(memberName, message.member().address());
852     }
853
854     private void memberUp(final MemberName memberName, final Address address) {
855         addPeerAddress(memberName, address);
856         checkReady();
857     }
858
859     private void memberWeaklyUp(final MemberWeaklyUp message) {
860         MemberName memberName = memberToName(message.member());
861
862         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
863                 message.member().address());
864
865         memberUp(memberName, message.member().address());
866     }
867
868     private void addPeerAddress(final MemberName memberName, final Address address) {
869         peerAddressResolver.addPeerAddress(memberName, address);
870
871         for (ShardInformation info : localShards.values()) {
872             String shardName = info.getShardName();
873             String peerId = getShardIdentifier(memberName, shardName).toString();
874             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
875         }
876     }
877
878     private void memberReachable(final ClusterEvent.ReachableMember message) {
879         MemberName memberName = memberToName(message.member());
880         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
881
882         addPeerAddress(memberName, message.member().address());
883
884         markMemberAvailable(memberName);
885     }
886
887     private void memberUnreachable(final ClusterEvent.UnreachableMember message) {
888         MemberName memberName = memberToName(message.member());
889         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
890
891         markMemberUnavailable(memberName);
892     }
893
894     private void markMemberUnavailable(final MemberName memberName) {
895         for (ShardInformation info : localShards.values()) {
896             String leaderId = info.getLeaderId();
897             if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
898                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
899                 info.setLeaderAvailable(false);
900
901                 primaryShardInfoCache.remove(info.getShardName());
902
903                 notifyShardAvailabilityCallbacks(info);
904             }
905         }
906     }
907
908     private void markMemberAvailable(final MemberName memberName) {
909         for (ShardInformation info : localShards.values()) {
910             String leaderId = info.getLeaderId();
911             if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
912                 LOG.debug("Marking Leader {} as available.", leaderId);
913                 info.setLeaderAvailable(true);
914             }
915         }
916     }
917
918     private void onDatastoreContextFactory(final DatastoreContextFactory factory) {
919         datastoreContextFactory = factory;
920         for (ShardInformation info : localShards.values()) {
921             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
922         }
923     }
924
925     private void onGetLocalShardIds() {
926         final List<String> response = new ArrayList<>(localShards.size());
927
928         for (ShardInformation info : localShards.values()) {
929             response.add(info.getShardId().toString());
930         }
931
932         getSender().tell(new Status.Success(response), getSelf());
933     }
934
935     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
936         final ShardIdentifier identifier = message.getShardId();
937
938         if (identifier != null) {
939             final ShardInformation info = localShards.get(identifier.getShardName());
940             if (info == null) {
941                 getSender().tell(new Status.Failure(
942                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
943                 return;
944             }
945
946             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
947         } else {
948             for (ShardInformation info : localShards.values()) {
949                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
950             }
951         }
952
953         getSender().tell(new Status.Success(null), getSelf());
954     }
955
956     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
957         final ActorRef actor = info.getActor();
958         if (actor != null) {
959             actor.tell(switchBehavior, getSelf());
960         } else {
961             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
962                 info.getShardName(), switchBehavior.getNewState());
963         }
964     }
965
966     /**
967      * Notifies all the local shards of a change in the schema context.
968      *
969      * @param message the message to send
970      */
971     private void updateSchemaContext(final UpdateSchemaContext message) {
972         schemaContext = message.getEffectiveModelContext();
973
974         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
975
976         for (ShardInformation info : localShards.values()) {
977             info.setSchemaContext(schemaContext);
978
979             if (info.getActor() == null) {
980                 LOG.debug("Creating Shard {}", info.getShardId());
981                 info.setActor(newShardActor(info));
982                 // Update peer address for every existing peer memeber to avoid missing sending
983                 // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp.
984                 String shardName = info.getShardName();
985                 for (MemberName memberName : peerAddressResolver.getPeerMembers()) {
986                     String peerId = getShardIdentifier(memberName, shardName).toString() ;
987                     String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
988                     info.updatePeerAddress(peerId, peerAddress, getSelf());
989                     LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
990                             persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
991                 }
992             } else {
993                 info.getActor().tell(message, getSelf());
994             }
995         }
996     }
997
998     @VisibleForTesting
999     protected ClusterWrapper getCluster() {
1000         return cluster;
1001     }
1002
1003     @VisibleForTesting
1004     protected ActorRef newShardActor(final ShardInformation info) {
1005         return getContext().actorOf(info.newProps().withDispatcher(shardDispatcherPath),
1006                 info.getShardId().toString());
1007     }
1008
1009     private void findPrimary(final FindPrimary message) {
1010         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
1011
1012         final String shardName = message.getShardName();
1013         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
1014
1015         // First see if the there is a local replica for the shard
1016         final ShardInformation info = localShards.get(shardName);
1017         if (info != null && info.isActiveMember()) {
1018             sendResponse(info, message.isWaitUntilReady(), true, () -> {
1019                 String primaryPath = info.getSerializedLeaderActor();
1020                 Object found = canReturnLocalShardState && info.isLeader()
1021                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().orElseThrow()) :
1022                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
1023
1024                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
1025                 return found;
1026             });
1027
1028             return;
1029         }
1030
1031         final Collection<String> visitedAddresses;
1032         if (message instanceof RemoteFindPrimary) {
1033             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
1034         } else {
1035             visitedAddresses = new ArrayList<>(1);
1036         }
1037
1038         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
1039
1040         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
1041             if (visitedAddresses.contains(address)) {
1042                 continue;
1043             }
1044
1045             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
1046                     persistenceId(), shardName, address, visitedAddresses);
1047
1048             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
1049                     message.isWaitUntilReady(), visitedAddresses), getContext());
1050             return;
1051         }
1052
1053         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
1054
1055         getSender().tell(new PrimaryNotFoundException(
1056                 String.format("No primary shard found for %s.", shardName)), getSelf());
1057     }
1058
1059     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1060         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1061                 .getShardInitializationTimeout().duration().$times(2));
1062
1063         Future<Object> futureObj = Patterns.ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1064         futureObj.onComplete(new OnComplete<>() {
1065             @Override
1066             public void onComplete(final Throwable failure, final Object response) {
1067                 if (failure != null) {
1068                     handler.onFailure(failure);
1069                 } else if (response instanceof RemotePrimaryShardFound msg) {
1070                     handler.onRemotePrimaryShardFound(msg);
1071                 } else if (response instanceof LocalPrimaryShardFound msg) {
1072                     handler.onLocalPrimaryFound(msg);
1073                 } else {
1074                     handler.onUnknownResponse(response);
1075                 }
1076             }
1077         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1078     }
1079
1080     /**
1081      * Construct the name of the shard actor given the name of the member on
1082      * which the shard resides and the name of the shard.
1083      *
1084      * @param memberName the member name
1085      * @param shardName the shard name
1086      * @return a b
1087      */
1088     private ShardIdentifier getShardIdentifier(final MemberName memberName, final String shardName) {
1089         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1090     }
1091
1092     /**
1093      * Create shards that are local to the member on which the ShardManager runs.
1094      */
1095     private void createLocalShards() {
1096         MemberName memberName = cluster.getCurrentMemberName();
1097         Collection<String> memberShardNames = configuration.getMemberShardNames(memberName);
1098
1099         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1100         if (restoreFromSnapshot != null) {
1101             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1102                 shardSnapshots.put(snapshot.getName(), snapshot);
1103             }
1104         }
1105
1106         // null out to GC
1107         restoreFromSnapshot = null;
1108
1109         for (String shardName : memberShardNames) {
1110             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1111
1112             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1113
1114             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1115             localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
1116                     newShardDatastoreContext(shardName), shardSnapshots));
1117         }
1118     }
1119
1120     @VisibleForTesting
1121     ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId,
1122                                         final Map<String, String> peerAddresses,
1123                                         final DatastoreContext datastoreContext,
1124                                         final Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
1125         return new ShardInformation(shardName, shardId, peerAddresses,
1126                 datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
1127                 peerAddressResolver);
1128     }
1129
1130     /**
1131      * Given the name of the shard find the addresses of all it's peers.
1132      *
1133      * @param shardName the shard name
1134      */
1135     Map<String, String> getPeerAddresses(final String shardName) {
1136         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1137         return getPeerAddresses(shardName, members);
1138     }
1139
1140     private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
1141         Map<String, String> peerAddresses = new HashMap<>();
1142         MemberName currentMemberName = cluster.getCurrentMemberName();
1143
1144         for (MemberName memberName : members) {
1145             if (!currentMemberName.equals(memberName)) {
1146                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1147                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1148                 peerAddresses.put(shardId.toString(), address);
1149             }
1150         }
1151         return peerAddresses;
1152     }
1153
1154     @Override
1155     public SupervisorStrategy supervisorStrategy() {
1156
1157         return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES),
1158                 (Function<Throwable, Directive>) t -> {
1159                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1160                     return SupervisorStrategy.resume();
1161                 });
1162     }
1163
1164     @Override
1165     public String persistenceId() {
1166         return persistenceId;
1167     }
1168
1169     @VisibleForTesting
1170     ShardManagerInfoMBean getMBean() {
1171         return shardManagerMBean;
1172     }
1173
1174     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1175         if (shardReplicaOperationsInProgress.contains(shardName)) {
1176             LOG.debug("{}: A shard replica operation for {} is already in progress", persistenceId(), shardName);
1177             sender.tell(new Status.Failure(new IllegalStateException(
1178                 String.format("A shard replica operation for %s is already in progress", shardName))), getSelf());
1179             return true;
1180         }
1181
1182         return false;
1183     }
1184
1185     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
1186         final String shardName = shardReplicaMsg.getShardName();
1187
1188         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1189
1190         // verify the shard with the specified name is present in the cluster configuration
1191         if (!configuration.isShardConfigured(shardName)) {
1192             LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName);
1193             getSender().tell(new Status.Failure(new IllegalArgumentException(
1194                 "No module configuration exists for shard " + shardName)), getSelf());
1195             return;
1196         }
1197
1198         // Create the localShard
1199         if (schemaContext == null) {
1200             LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
1201                 persistenceId(), shardName);
1202             getSender().tell(new Status.Failure(new IllegalStateException(
1203                 "No SchemaContext is available in order to create a local shard instance for " + shardName)),
1204                 getSelf());
1205             return;
1206         }
1207
1208         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1209                 getSelf()) {
1210             @Override
1211             public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1212                 final RunnableMessage runnable = (RunnableMessage) () ->
1213                     addShard(getShardName(), response, getSender());
1214                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
1215                     getSelf().tell(runnable, getTargetActor());
1216                 }
1217             }
1218
1219             @Override
1220             public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
1221                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1222             }
1223         });
1224     }
1225
1226     private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
1227         LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
1228         sender.tell(new Status.Failure(new AlreadyExistsException(
1229             String.format("Local shard %s already exists", shardName))), getSelf());
1230     }
1231
1232     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1233         if (isShardReplicaOperationInProgress(shardName, sender)) {
1234             return;
1235         }
1236
1237         shardReplicaOperationsInProgress.add(shardName);
1238
1239         final ShardInformation shardInfo;
1240         final boolean removeShardOnFailure;
1241         ShardInformation existingShardInfo = localShards.get(shardName);
1242         if (existingShardInfo == null) {
1243             removeShardOnFailure = true;
1244             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1245
1246             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1247                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1248
1249             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1250                     Shard.builder(), peerAddressResolver);
1251             shardInfo.setActiveMember(false);
1252             shardInfo.setSchemaContext(schemaContext);
1253             localShards.put(shardName, shardInfo);
1254             shardInfo.setActor(newShardActor(shardInfo));
1255         } else {
1256             removeShardOnFailure = false;
1257             shardInfo = existingShardInfo;
1258         }
1259
1260         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1261     }
1262
1263     private void execAddShard(final String shardName,
1264                               final ShardInformation shardInfo,
1265                               final RemotePrimaryShardFound response,
1266                               final boolean removeShardOnFailure,
1267                               final ActorRef sender) {
1268
1269         final String localShardAddress =
1270                 peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1271
1272         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1273         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1274                 response.getPrimaryPath(), shardInfo.getShardId());
1275
1276         final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1277                 .getShardLeaderElectionTimeout().duration());
1278         final Future<Object> futureObj = Patterns.ask(getContext().actorSelection(response.getPrimaryPath()),
1279                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1280
1281         futureObj.onComplete(new OnComplete<>() {
1282             @Override
1283             public void onComplete(final Throwable failure, final Object addServerResponse) {
1284                 if (failure != null) {
1285                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1286                             response.getPrimaryPath(), shardName, failure);
1287
1288                     final String msg = String.format("AddServer request to leader %s for shard %s failed",
1289                             response.getPrimaryPath(), shardName);
1290                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1291                 } else {
1292                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1293                             response.getPrimaryPath(), removeShardOnFailure), sender);
1294                 }
1295             }
1296         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1297     }
1298
1299     private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
1300             final ActorRef sender, final boolean removeShardOnFailure) {
1301         shardReplicaOperationsInProgress.remove(shardName);
1302
1303         if (removeShardOnFailure) {
1304             ShardInformation shardInfo = localShards.remove(shardName);
1305             if (shardInfo.getActor() != null) {
1306                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1307             }
1308         }
1309
1310         sender.tell(new Status.Failure(message == null ? failure :
1311             new RuntimeException(message, failure)), getSelf());
1312     }
1313
1314     private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg,
1315             final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) {
1316         String shardName = shardInfo.getShardName();
1317         shardReplicaOperationsInProgress.remove(shardName);
1318
1319         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1320
1321         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1322             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1323
1324             // Make the local shard voting capable
1325             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1326             shardInfo.setActiveMember(true);
1327             persistShardList();
1328
1329             sender.tell(new Status.Success(null), getSelf());
1330         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1331             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1332         } else {
1333             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1334                     persistenceId(), shardName, replyMsg.getStatus());
1335
1336             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1337                     shardInfo.getShardId());
1338
1339             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1340         }
1341     }
1342
1343     private static Exception getServerChangeException(final Class<?> serverChange,
1344             final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) {
1345         return switch (serverChangeStatus) {
1346             case TIMEOUT -> new TimeoutException("""
1347                 The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible \
1348                 causes - there was a problem replicating the data or shard leadership changed while replicating the \
1349                 shard data""".formatted(leaderPath, shardId.getShardName()));
1350             case NO_LEADER -> new NoShardLeaderException(shardId);
1351             case NOT_SUPPORTED -> new UnsupportedOperationException(
1352                 "%s request is not supported for shard %s".formatted(
1353                     serverChange.getSimpleName(), shardId.getShardName()));
1354             default -> new RuntimeException("%s request to leader %s for shard %s failed with status %s".formatted(
1355                 serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1356         };
1357     }
1358
1359     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
1360         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1361
1362         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1363                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1364             @Override
1365             public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1366                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1367             }
1368
1369             @Override
1370             public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
1371                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1372             }
1373
1374             private void doRemoveShardReplicaAsync(final String primaryPath) {
1375                 getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
1376                         primaryPath, getSender()), getTargetActor());
1377             }
1378         });
1379     }
1380
1381     private void persistShardList() {
1382         List<String> shardList = new ArrayList<>(localShards.keySet());
1383         for (ShardInformation shardInfo : localShards.values()) {
1384             if (!shardInfo.isActiveMember()) {
1385                 shardList.remove(shardInfo.getShardName());
1386             }
1387         }
1388         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1389         saveSnapshot(updateShardManagerSnapshot(shardList));
1390     }
1391
1392     private ShardManagerSnapshot updateShardManagerSnapshot(final List<String> shardList) {
1393         currentSnapshot = new ShardManagerSnapshot(shardList);
1394         return currentSnapshot;
1395     }
1396
1397     private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) {
1398         currentSnapshot = snapshot;
1399
1400         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1401
1402         final MemberName currentMember = cluster.getCurrentMemberName();
1403         Set<String> configuredShardList =
1404             new HashSet<>(configuration.getMemberShardNames(currentMember));
1405         for (String shard : currentSnapshot.getShardList()) {
1406             if (!configuredShardList.contains(shard)) {
1407                 // add the current member as a replica for the shard
1408                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1409                 configuration.addMemberReplicaForShard(shard, currentMember);
1410             } else {
1411                 configuredShardList.remove(shard);
1412             }
1413         }
1414         for (String shard : configuredShardList) {
1415             // remove the member as a replica for the shard
1416             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1417             configuration.removeMemberReplicaForShard(shard, currentMember);
1418         }
1419     }
1420
1421     private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) {
1422         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1423             persistenceId());
1424         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1425             0, 0));
1426     }
1427
1428     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1429         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1430
1431         String shardName = changeMembersVotingStatus.getShardName();
1432         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1433         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1434             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1435                     e.getValue());
1436         }
1437
1438         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1439
1440         findLocalShard(shardName, getSender(),
1441             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1442             localShardFound.getPath(), getSender()));
1443     }
1444
1445     private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
1446         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1447
1448         ActorRef sender = getSender();
1449         final String shardName = flipMembersVotingStatus.getShardName();
1450         findLocalShard(shardName, sender, localShardFound -> {
1451             Future<Object> future = Patterns.ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1452                     Timeout.apply(30, TimeUnit.SECONDS));
1453
1454             future.onComplete(new OnComplete<>() {
1455                 @Override
1456                 public void onComplete(final Throwable failure, final Object response) {
1457                     if (failure != null) {
1458                         sender.tell(new Status.Failure(new RuntimeException(
1459                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1460                         return;
1461                     }
1462
1463                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1464                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1465                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1466                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1467                     }
1468
1469                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1470                             .toString(), !raftState.isVoting());
1471
1472                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1473                             shardName, localShardFound.getPath(), sender);
1474                 }
1475             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1476         });
1477
1478     }
1479
1480     private void findLocalShard(final FindLocalShard message) {
1481         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
1482
1483         final ShardInformation shardInformation = localShards.get(message.getShardName());
1484
1485         if (shardInformation == null) {
1486             LOG.debug("{}: Local shard {} not found - shards present: {}",
1487                     persistenceId(), message.getShardName(), localShards.keySet());
1488
1489             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1490             return;
1491         }
1492
1493         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1494             () -> new LocalShardFound(shardInformation.getActor()));
1495     }
1496
1497     private void findLocalShard(final String shardName, final ActorRef sender,
1498             final Consumer<LocalShardFound> onLocalShardFound) {
1499         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1500                 .getShardInitializationTimeout().duration().$times(2));
1501
1502         Future<Object> futureObj = Patterns.ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1503         futureObj.onComplete(new OnComplete<>() {
1504             @Override
1505             public void onComplete(final Throwable failure, final Object response) {
1506                 if (failure != null) {
1507                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1508                         failure);
1509                     sender.tell(new Status.Failure(new RuntimeException(
1510                         String.format("Failed to find local shard %s", shardName), failure)), self());
1511                 } if (response instanceof LocalShardFound msg) {
1512                     getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept(msg), sender);
1513                 } else if (response instanceof LocalShardNotFound) {
1514                     LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName);
1515                     sender.tell(new Status.Failure(new IllegalArgumentException(
1516                         String.format("Local shard %s does not exist", shardName))), self());
1517                 } else {
1518                     LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName,
1519                         response);
1520                     sender.tell(new Status.Failure(response instanceof Throwable throwable ? throwable
1521                         : new RuntimeException(String.format("Failed to find local shard %s: received response: %s",
1522                             shardName, response))), self());
1523                 }
1524             }
1525         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1526     }
1527
1528     private void changeShardMembersVotingStatus(final ChangeServersVotingStatus changeServersVotingStatus,
1529             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1530         if (isShardReplicaOperationInProgress(shardName, sender)) {
1531             return;
1532         }
1533
1534         shardReplicaOperationsInProgress.add(shardName);
1535
1536         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1537         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1538
1539         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1540                 changeServersVotingStatus, shardActorRef.path());
1541
1542         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1543         Future<Object> futureObj = Patterns.ask(shardActorRef, changeServersVotingStatus, timeout);
1544
1545         futureObj.onComplete(new OnComplete<>() {
1546             @Override
1547             public void onComplete(final Throwable failure, final Object response) {
1548                 shardReplicaOperationsInProgress.remove(shardName);
1549                 if (failure != null) {
1550                     LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", persistenceId(),
1551                         shardActorRef.path(), failure);
1552                     sender.tell(new Status.Failure(new RuntimeException(
1553                         String.format("ChangeServersVotingStatus request to local shard %s failed",
1554                             shardActorRef.path()), failure)), self());
1555                 } else {
1556                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1557
1558                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1559                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1560                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1561                         sender.tell(new Status.Success(null), getSelf());
1562                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1563                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1564                                 "The requested voting state change for shard %s is invalid. At least one member "
1565                                 + "must be voting", shardId.getShardName()))), getSelf());
1566                     } else {
1567                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1568                                 persistenceId(), shardName, replyMsg.getStatus());
1569
1570                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1571                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1572                         sender.tell(new Status.Failure(error), getSelf());
1573                     }
1574                 }
1575             }
1576         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1577     }
1578
1579     private static final class ForwardedAddServerReply {
1580         ShardInformation shardInfo;
1581         AddServerReply addServerReply;
1582         String leaderPath;
1583         boolean removeShardOnFailure;
1584
1585         ForwardedAddServerReply(final ShardInformation shardInfo, final AddServerReply addServerReply,
1586             final String leaderPath, final boolean removeShardOnFailure) {
1587             this.shardInfo = shardInfo;
1588             this.addServerReply = addServerReply;
1589             this.leaderPath = leaderPath;
1590             this.removeShardOnFailure = removeShardOnFailure;
1591         }
1592     }
1593
1594     private static final class ForwardedAddServerFailure {
1595         String shardName;
1596         String failureMessage;
1597         Throwable failure;
1598         boolean removeShardOnFailure;
1599
1600         ForwardedAddServerFailure(final String shardName, final String failureMessage, final Throwable failure,
1601                 final boolean removeShardOnFailure) {
1602             this.shardName = shardName;
1603             this.failureMessage = failureMessage;
1604             this.failure = failure;
1605             this.removeShardOnFailure = removeShardOnFailure;
1606         }
1607     }
1608
1609     static class OnShardInitialized {
1610         private final Runnable replyRunnable;
1611         private Cancellable timeoutSchedule;
1612
1613         OnShardInitialized(final Runnable replyRunnable) {
1614             this.replyRunnable = replyRunnable;
1615         }
1616
1617         Runnable getReplyRunnable() {
1618             return replyRunnable;
1619         }
1620
1621         Cancellable getTimeoutSchedule() {
1622             return timeoutSchedule;
1623         }
1624
1625         void setTimeoutSchedule(final Cancellable timeoutSchedule) {
1626             this.timeoutSchedule = timeoutSchedule;
1627         }
1628     }
1629
1630     static class OnShardReady extends OnShardInitialized {
1631         OnShardReady(final Runnable replyRunnable) {
1632             super(replyRunnable);
1633         }
1634     }
1635
1636     private interface RunnableMessage extends Runnable {
1637     }
1638
1639     /**
1640      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1641      * a remote or local find primary message is processed.
1642      */
1643     private interface FindPrimaryResponseHandler {
1644         /**
1645          * Invoked when a Failure message is received as a response.
1646          *
1647          * @param failure the failure exception
1648          */
1649         void onFailure(Throwable failure);
1650
1651         /**
1652          * Invoked when a RemotePrimaryShardFound response is received.
1653          *
1654          * @param response the response
1655          */
1656         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1657
1658         /**
1659          * Invoked when a LocalPrimaryShardFound response is received.
1660          *
1661          * @param response the response
1662          */
1663         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1664
1665         /**
1666          * Invoked when an unknown response is received. This is another type of failure.
1667          *
1668          * @param response the response
1669          */
1670         void onUnknownResponse(Object response);
1671     }
1672
1673     /**
1674      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1675      * replica and sends a wrapped Failure response to some targetActor.
1676      */
1677     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1678         private final ActorRef targetActor;
1679         private final String shardName;
1680         private final String persistenceId;
1681         private final ActorRef shardManagerActor;
1682
1683         /**
1684          * Constructs an instance.
1685          *
1686          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1687          * @param shardName The name of the shard for which the primary replica had to be found
1688          * @param persistenceId The persistenceId for the ShardManager
1689          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1690          */
1691         protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
1692                 final String persistenceId, final ActorRef shardManagerActor) {
1693             this.targetActor = requireNonNull(targetActor);
1694             this.shardName = requireNonNull(shardName);
1695             this.persistenceId = requireNonNull(persistenceId);
1696             this.shardManagerActor = requireNonNull(shardManagerActor);
1697         }
1698
1699         public ActorRef getTargetActor() {
1700             return targetActor;
1701         }
1702
1703         public String getShardName() {
1704             return shardName;
1705         }
1706
1707         @Override
1708         public void onFailure(final Throwable failure) {
1709             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1710             targetActor.tell(new Status.Failure(new RuntimeException(
1711                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1712         }
1713
1714         @Override
1715         public void onUnknownResponse(final Object response) {
1716             LOG.debug("{}: Failed to find leader for shard {}: received response: {}", persistenceId, shardName,
1717                 response);
1718             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
1719                     : new RuntimeException(String.format("Failed to find leader for shard %s: received response: %s",
1720                         shardName, response))), shardManagerActor);
1721         }
1722     }
1723
1724     /**
1725      * The WrappedShardResponse class wraps a response from a Shard.
1726      */
1727     private static final class WrappedShardResponse {
1728         private final ShardIdentifier shardId;
1729         private final Object response;
1730         private final String leaderPath;
1731
1732         WrappedShardResponse(final ShardIdentifier shardId, final Object response, final String leaderPath) {
1733             this.shardId = shardId;
1734             this.response = response;
1735             this.leaderPath = leaderPath;
1736         }
1737
1738         ShardIdentifier getShardId() {
1739             return shardId;
1740         }
1741
1742         Object getResponse() {
1743             return response;
1744         }
1745
1746         String getLeaderPath() {
1747             return leaderPath;
1748         }
1749     }
1750
1751     private static final class ShardNotInitializedTimeout {
1752         private final ActorRef sender;
1753         private final ShardInformation shardInfo;
1754         private final OnShardInitialized onShardInitialized;
1755
1756         ShardNotInitializedTimeout(final ShardInformation shardInfo, final OnShardInitialized onShardInitialized,
1757             final ActorRef sender) {
1758             this.sender = sender;
1759             this.shardInfo = shardInfo;
1760             this.onShardInitialized = onShardInitialized;
1761         }
1762
1763         ActorRef getSender() {
1764             return sender;
1765         }
1766
1767         ShardInformation getShardInfo() {
1768             return shardInfo;
1769         }
1770
1771         OnShardInitialized getOnShardInitialized() {
1772             return onShardInitialized;
1773         }
1774     }
1775 }
1776
1777
1778