0e7d3a3dc07c0bb0027225098435fcbe8fb211e5
[controller.git] /
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.shardmanager;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.util.concurrent.SettableFuture;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.HashMap;
17 import java.util.HashSet;
18 import java.util.List;
19 import java.util.Map;
20 import java.util.Map.Entry;
21 import java.util.Set;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import java.util.function.Consumer;
25 import java.util.function.Supplier;
26 import org.apache.pekko.actor.ActorRef;
27 import org.apache.pekko.actor.Address;
28 import org.apache.pekko.actor.Cancellable;
29 import org.apache.pekko.actor.OneForOneStrategy;
30 import org.apache.pekko.actor.PoisonPill;
31 import org.apache.pekko.actor.Status;
32 import org.apache.pekko.actor.SupervisorStrategy;
33 import org.apache.pekko.actor.SupervisorStrategy.Directive;
34 import org.apache.pekko.cluster.ClusterEvent;
35 import org.apache.pekko.cluster.ClusterEvent.MemberWeaklyUp;
36 import org.apache.pekko.cluster.Member;
37 import org.apache.pekko.dispatch.Futures;
38 import org.apache.pekko.dispatch.OnComplete;
39 import org.apache.pekko.japi.Function;
40 import org.apache.pekko.pattern.Patterns;
41 import org.apache.pekko.persistence.DeleteSnapshotsFailure;
42 import org.apache.pekko.persistence.DeleteSnapshotsSuccess;
43 import org.apache.pekko.persistence.RecoveryCompleted;
44 import org.apache.pekko.persistence.SaveSnapshotFailure;
45 import org.apache.pekko.persistence.SaveSnapshotSuccess;
46 import org.apache.pekko.persistence.SnapshotOffer;
47 import org.apache.pekko.persistence.SnapshotSelectionCriteria;
48 import org.apache.pekko.util.Timeout;
49 import org.opendaylight.controller.cluster.access.concepts.MemberName;
50 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
51 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
52 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
53 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
55 import org.opendaylight.controller.cluster.datastore.Shard;
56 import org.opendaylight.controller.cluster.datastore.config.Configuration;
57 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
58 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
59 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
60 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
64 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
65 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
67 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
68 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
69 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
70 import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
71 import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
72 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
73 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
75 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
76 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
77 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
78 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
79 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
80 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
81 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
82 import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
83 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
84 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
86 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
87 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
88 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
89 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
90 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
91 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
92 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
93 import org.opendaylight.controller.cluster.raft.messages.AddServer;
94 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
95 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
96 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
97 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
98 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
99 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
100 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
101 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
102 import org.opendaylight.yangtools.concepts.Registration;
103 import org.opendaylight.yangtools.yang.common.Empty;
104 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
105 import org.slf4j.Logger;
106 import org.slf4j.LoggerFactory;
107 import scala.concurrent.Future;
108 import scala.concurrent.duration.FiniteDuration;
109
110 /**
111  * Manages the shards for a data store. The ShardManager has the following jobs:
112  * <ul>
113  * <li> Create all the local shard replicas that belong on this cluster member
114  * <li> Find the address of the local shard
115  * <li> Find the primary replica for any given shard
116  * <li> Monitor the cluster members and store their addresses
117  * </ul>
118  */
119 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
120     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
121
122     // Stores a mapping between a shard name and it's corresponding information
123     // Shard names look like inventory, topology etc and are as specified in
124     // configuration
125     @VisibleForTesting
126     final Map<String, ShardInformation> localShards = new HashMap<>();
127
128     // The type of a ShardManager reflects the type of the datastore itself
129     // A data store could be of type config/operational
130     private final String type;
131
132     private final ClusterWrapper cluster;
133
134     private final Configuration configuration;
135
136     @VisibleForTesting
137     final String shardDispatcherPath;
138
139     private final ShardManagerInfo shardManagerMBean;
140
141     private DatastoreContextFactory datastoreContextFactory;
142
143     private final SettableFuture<Empty> readinessFuture;
144
145     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
146
147     @VisibleForTesting
148     final ShardPeerAddressResolver peerAddressResolver;
149
150     private EffectiveModelContext modelContext;
151
152     private DatastoreSnapshot restoreFromSnapshot;
153
154     private ShardManagerSnapshot currentSnapshot;
155
156     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
157
158     private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
159
160     private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
161
162     private ShardManager(final  String possiblePersistenceId, final AbstractShardManagerCreator<?> builder) {
163         super(possiblePersistenceId != null ? possiblePersistenceId
164             : "shard-manager-" + builder.getDatastoreContextFactory().getBaseDatastoreContext().getDataStoreName());
165
166         cluster = builder.getCluster();
167         configuration = builder.getConfiguration();
168         datastoreContextFactory = builder.getDatastoreContextFactory();
169         type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
170         shardDispatcherPath = new Dispatchers(getContext().system().dispatchers())
171             .getDispatcherPath(Dispatchers.DispatcherType.Shard);
172         readinessFuture = builder.getReadinessFuture();
173         primaryShardInfoCache = builder.getPrimaryShardInfoCache();
174         restoreFromSnapshot = builder.getRestoreFromSnapshot();
175
176         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
177
178         // Subscribe this actor to cluster member events
179         cluster.subscribeToMemberEvents(self());
180
181         shardManagerMBean = new ShardManagerInfo(self(), cluster.getCurrentMemberName(),
182                 "shard-manager-" + type,
183                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
184         shardManagerMBean.registerMBean();
185     }
186
/**
 * Creates a ShardManager using the shard-manager persistence ID configured in the base datastore context.
 * When that ID is {@code null}, the delegated constructor derives one from the datastore name.
 *
 * @param builder creator supplying the ShardManager's collaborators
 */
ShardManager(final AbstractShardManagerCreator<?> builder) {
    this(builder.getDatastoreContextFactory().getBaseDatastoreContext()
        .getShardManagerPersistenceId(), builder);
}
191
192     @Override
193     @Deprecated(since = "11.0.0", forRemoval = true)
194     public final ActorRef getSender() {
195         return super.getSender();
196     }
197
198     @Override
199     public void preStart() {
200         LOG.info("Starting ShardManager {}", persistenceId());
201     }
202
203     @Override
204     public void postStop() {
205         LOG.info("Stopping ShardManager {}", persistenceId());
206         shardManagerMBean.unregisterMBean();
207     }
208
209     @Override
210     public void handleCommand(final Object message) throws Exception {
211         if (message instanceof FindPrimary msg) {
212             findPrimary(msg);
213         } else if (message instanceof FindLocalShard msg) {
214             findLocalShard(msg);
215         } else if (message instanceof UpdateSchemaContext msg) {
216             updateSchemaContext(msg);
217         } else if (message instanceof ActorInitialized msg) {
218             onActorInitialized(msg);
219         } else if (message instanceof ClusterEvent.MemberUp msg) {
220             memberUp(msg);
221         } else if (message instanceof ClusterEvent.MemberWeaklyUp msg) {
222             memberWeaklyUp(msg);
223         } else if (message instanceof ClusterEvent.MemberExited msg) {
224             memberExited(msg);
225         } else if (message instanceof ClusterEvent.MemberRemoved msg) {
226             memberRemoved(msg);
227         } else if (message instanceof ClusterEvent.UnreachableMember msg) {
228             memberUnreachable(msg);
229         } else if (message instanceof ClusterEvent.ReachableMember msg) {
230             memberReachable(msg);
231         } else if (message instanceof DatastoreContextFactory msg) {
232             onDatastoreContextFactory(msg);
233         } else if (message instanceof RoleChangeNotification msg) {
234             onRoleChangeNotification(msg);
235         } else if (message instanceof FollowerInitialSyncUpStatus msg) {
236             onFollowerInitialSyncStatus(msg);
237         } else if (message instanceof ShardNotInitializedTimeout msg) {
238             onShardNotInitializedTimeout(msg);
239         } else if (message instanceof ShardLeaderStateChanged msg) {
240             onLeaderStateChanged(msg);
241         } else if (message instanceof SwitchShardBehavior msg) {
242             onSwitchShardBehavior(msg);
243         } else if (message instanceof CreateShard msg) {
244             onCreateShard(msg);
245         } else if (message instanceof AddShardReplica msg) {
246             onAddShardReplica(msg);
247         } else if (message instanceof ForwardedAddServerReply msg) {
248             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
249         } else if (message instanceof ForwardedAddServerFailure msg) {
250             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
251         } else if (message instanceof RemoveShardReplica msg) {
252             onRemoveShardReplica(msg);
253         } else if (message instanceof WrappedShardResponse msg) {
254             onWrappedShardResponse(msg);
255         } else if (message instanceof GetSnapshot msg) {
256             onGetSnapshot(msg);
257         } else if (message instanceof ServerRemoved msg) {
258             onShardReplicaRemoved(msg);
259         } else if (message instanceof ChangeShardMembersVotingStatus msg) {
260             onChangeShardServersVotingStatus(msg);
261         } else if (message instanceof FlipShardMembersVotingStatus msg) {
262             onFlipShardMembersVotingStatus(msg);
263         } else if (message instanceof SaveSnapshotSuccess msg) {
264             onSaveSnapshotSuccess(msg);
265         } else if (message instanceof SaveSnapshotFailure msg) {
266             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), msg.cause());
267         } else if (message instanceof Shutdown) {
268             onShutDown();
269         } else if (message instanceof GetLocalShardIds) {
270             onGetLocalShardIds();
271         } else if (message instanceof GetShardRole msg) {
272             onGetShardRole(msg);
273         } else if (message instanceof RunnableMessage msg) {
274             msg.run();
275         } else if (message instanceof RegisterForShardAvailabilityChanges msg) {
276             onRegisterForShardAvailabilityChanges(msg);
277         } else if (message instanceof DeleteSnapshotsFailure msg) {
278             LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), msg.cause());
279         } else if (message instanceof DeleteSnapshotsSuccess) {
280             LOG.debug("{}: Successfully deleted prior snapshots", persistenceId());
281         } else if (message instanceof RegisterRoleChangeListenerReply) {
282             LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
283         } else if (message instanceof ClusterEvent.MemberEvent msg) {
284             LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), msg);
285         } else {
286             unknownMessage(message);
287         }
288     }
289
290     private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) {
291         LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
292
293         final Consumer<String> callback = message.getCallback();
294         shardAvailabilityCallbacks.add(callback);
295
296         getSender().tell(new Status.Success((Registration)
297             () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
298     }
299
300     private void onGetShardRole(final GetShardRole message) {
301         LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
302
303         final String name = message.getName();
304
305         final ShardInformation shardInformation = localShards.get(name);
306
307         if (shardInformation == null) {
308             LOG.info("{}: no shard information for {} found", persistenceId(), name);
309             getSender().tell(new Status.Failure(
310                     new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
311             return;
312         }
313
314         getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
315     }
316
317     void onShutDown() {
318         final var stopFutures = new ArrayList<Future<Boolean>>(localShards.size());
319         for (var info : localShards.values()) {
320             if (info.getActor() != null) {
321                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
322                 stopFutures.add(Patterns.gracefulStop(info.getActor(), info.getDatastoreContext().getShardRaftConfig()
323                     .getElectionTimeOutInterval().$times(2), Shutdown.INSTANCE));
324             }
325         }
326
327         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
328
329         final var dispatcher = new Dispatchers(context().system().dispatchers())
330                 .getDispatcher(Dispatchers.DispatcherType.Client);
331         Futures.sequence(stopFutures, dispatcher).onComplete(new OnComplete<Iterable<Boolean>>() {
332             @Override
333             public void onComplete(final Throwable failure, final Iterable<Boolean> results) {
334                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
335
336                 self().tell(PoisonPill.getInstance(), self());
337
338                 if (failure != null) {
339                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
340                     return;
341                 }
342
343                 int nfailed = 0;
344                 for (Boolean result : results) {
345                     if (!result) {
346                         nfailed++;
347                     }
348                 }
349
350                 if (nfailed > 0) {
351                     LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
352                 }
353             }
354         }, dispatcher);
355     }
356
357     private void onWrappedShardResponse(final WrappedShardResponse message) {
358         if (message.getResponse() instanceof RemoveServerReply) {
359             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
360                     message.getLeaderPath());
361         }
362     }
363
364     private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
365             final RemoveServerReply replyMsg, final String leaderPath) {
366         shardReplicaOperationsInProgress.remove(shardId.getShardName());
367
368         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
369
370         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
371             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
372                     shardId.getShardName());
373             originalSender.tell(new Status.Success(null), self());
374         } else {
375             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
376                     persistenceId(), shardId, replyMsg.getStatus());
377
378             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
379             originalSender.tell(new Status.Failure(failure), self());
380         }
381     }
382
383     private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
384             final String primaryPath, final ActorRef sender) {
385         if (isShardReplicaOperationInProgress(shardName, sender)) {
386             return;
387         }
388
389         shardReplicaOperationsInProgress.add(shardName);
390
391         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
392
393         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
394
395         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
396         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
397                 primaryPath, shardId);
398
399         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
400         Future<Object> futureObj = Patterns.ask(getContext().actorSelection(primaryPath),
401                 new RemoveServer(shardId.toString()), removeServerTimeout);
402
403         futureObj.onComplete(new OnComplete<>() {
404             @Override
405             public void onComplete(final Throwable failure, final Object response) {
406                 if (failure != null) {
407                     shardReplicaOperationsInProgress.remove(shardName);
408                     LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
409                         shardName, failure);
410
411                     // FAILURE
412                     sender.tell(new Status.Failure(new RuntimeException(
413                         String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
414                         failure)), self());
415                 } else {
416                     // SUCCESS
417                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
418                 }
419             }
420         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
421     }
422
423     private void onShardReplicaRemoved(final ServerRemoved message) {
424         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
425     }
426
427     @SuppressWarnings("checkstyle:IllegalCatch")
428     private void removeShard(final ShardIdentifier shardId) {
429         final String shardName = shardId.getShardName();
430         final ShardInformation shardInformation = localShards.remove(shardName);
431         if (shardInformation == null) {
432             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
433             return;
434         }
435
436         final ActorRef shardActor = shardInformation.getActor();
437         if (shardActor != null) {
438             long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
439                     .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
440
441             LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
442                     timeoutInMS);
443
444             final var stopFuture = Patterns.gracefulStop(shardActor,
445                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
446
447             final var onComplete = new CompositeOnComplete<Boolean>() {
448                 @Override
449                 public void onComplete(final Throwable failure, final Boolean result) {
450                     if (failure == null) {
451                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
452                     } else {
453                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
454                     }
455
456                     self().tell((RunnableMessage) () -> {
457                         // At any rate, invalidate primaryShardInfo cache
458                         primaryShardInfoCache.remove(shardName);
459
460                         shardActorsStopping.remove(shardName);
461                         notifyOnCompleteTasks(failure, result);
462                     }, ActorRef.noSender());
463                 }
464             };
465
466             shardActorsStopping.put(shardName, onComplete);
467             stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
468                     .getDispatcher(Dispatchers.DispatcherType.Client));
469         }
470
471         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
472         persistShardList();
473     }
474
475     private void onGetSnapshot(final GetSnapshot getSnapshot) {
476         LOG.debug("{}: onGetSnapshot", persistenceId());
477
478         List<String> notInitialized = null;
479         for (var shardInfo : localShards.values()) {
480             if (!shardInfo.isShardInitialized()) {
481                 if (notInitialized == null) {
482                     notInitialized = new ArrayList<>();
483                 }
484
485                 notInitialized.add(shardInfo.getShardName());
486             }
487         }
488
489         if (notInitialized != null) {
490             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
491                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), self());
492             return;
493         }
494
495         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
496                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
497                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
498
499         for (var shardInfo : localShards.values()) {
500             shardInfo.getActor().tell(getSnapshot, replyActor);
501         }
502     }
503
504     @SuppressWarnings("checkstyle:IllegalCatch")
505     private void onCreateShard(final CreateShard createShard) {
506         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
507
508         Object reply;
509         try {
510             String shardName = createShard.getModuleShardConfig().getShardName();
511             if (localShards.containsKey(shardName)) {
512                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
513                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
514             } else {
515                 doCreateShard(createShard);
516                 reply = new Status.Success(null);
517             }
518         } catch (Exception e) {
519             LOG.error("{}: onCreateShard failed", persistenceId(), e);
520             reply = new Status.Failure(e);
521         }
522
523         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
524             getSender().tell(reply, self());
525         }
526     }
527
528     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
529         final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
530         if (stopOnComplete == null) {
531             return false;
532         }
533
534         LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
535                 shardName, messageToDefer);
536         final ActorRef sender = getSender();
537         stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
538             @Override
539             public void onComplete(final Throwable failure, final Boolean result) {
540                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
541                 self().tell(messageToDefer, sender);
542             }
543         });
544
545         return true;
546     }
547
548     private void doCreateShard(final CreateShard createShard) {
549         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
550         final String shardName = moduleShardConfig.getShardName();
551
552         configuration.addModuleShardConfiguration(moduleShardConfig);
553
554         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
555         if (shardDatastoreContext == null) {
556             shardDatastoreContext = newShardDatastoreContext(shardName);
557         } else {
558             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
559                     peerAddressResolver).build();
560         }
561
562         final var shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
563
564         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
565                 && currentSnapshot.getShardList().contains(shardName);
566
567         Map<String, String> peerAddresses;
568         boolean isActiveMember;
569         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
570                 .contains(cluster.getCurrentMemberName())) {
571             peerAddresses = getPeerAddresses(shardName);
572             isActiveMember = true;
573         } else {
574             // The local member is not in the static shard member configuration and the shard did not
575             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
576             // the shard with no peers and with elections disabled so it stays as follower. A
577             // subsequent AddServer request will be needed to make it an active member.
578             isActiveMember = false;
579             peerAddresses = Map.of();
580             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
581                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
582         }
583
584         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
585                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
586                 isActiveMember);
587
588         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
589                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
590         info.setActiveMember(isActiveMember);
591         localShards.put(info.getShardName(), info);
592
593         if (modelContext != null) {
594             info.setSchemaContext(modelContext);
595             info.setActor(newShardActor(info));
596         }
597     }
598
599     private DatastoreContext.Builder newShardDatastoreContextBuilder(final String shardName) {
600         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
601                 .shardPeerAddressResolver(peerAddressResolver);
602     }
603
604     private DatastoreContext newShardDatastoreContext(final String shardName) {
605         return newShardDatastoreContextBuilder(shardName).build();
606     }
607
608     private void checkReady() {
609         if (isReadyWithLeaderId()) {
610             LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
611             readinessFuture.set(Empty.value());
612         }
613     }
614
615     private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
616         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
617
618         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
619         if (shardInformation != null) {
620             shardInformation.setLocalDataTree(leaderStateChanged.localShardDataTree());
621             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
622             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
623                 primaryShardInfoCache.remove(shardInformation.getShardName());
624
625                 notifyShardAvailabilityCallbacks(shardInformation);
626             }
627
628             checkReady();
629         } else {
630             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
631         }
632     }
633
634     private void notifyShardAvailabilityCallbacks(final ShardInformation shardInformation) {
635         shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
636     }
637
638     private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
639         ShardInformation shardInfo = message.getShardInfo();
640
641         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
642                 shardInfo.getShardName());
643
644         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
645
646         if (!shardInfo.isShardInitialized()) {
647             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
648             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), self());
649         } else {
650             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
651             message.getSender().tell(new NoShardLeaderException(shardInfo.getShardId()), self());
652         }
653     }
654
655     private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) {
656         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
657                 status.getName(), status.isInitialSyncDone());
658
659         ShardInformation shardInformation = findShardInformation(status.getName());
660
661         if (shardInformation != null) {
662             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
663
664             shardManagerMBean.setSyncStatus(isInSync());
665         }
666
667     }
668
669     private void onRoleChangeNotification(final RoleChangeNotification roleChanged) {
670         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
671                 roleChanged.getOldRole(), roleChanged.getNewRole());
672
673         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
674         if (shardInformation != null) {
675             shardInformation.setRole(roleChanged.getNewRole());
676             checkReady();
677             shardManagerMBean.setSyncStatus(isInSync());
678         }
679     }
680
681
682     private ShardInformation findShardInformation(final String memberId) {
683         for (ShardInformation info : localShards.values()) {
684             if (info.getShardId().toString().equals(memberId)) {
685                 return info;
686             }
687         }
688
689         return null;
690     }
691
692     private boolean isReadyWithLeaderId() {
693         boolean isReady = true;
694         for (ShardInformation info : localShards.values()) {
695             if (!info.isShardReadyWithLeaderId()) {
696                 isReady = false;
697                 break;
698             }
699         }
700         return isReady;
701     }
702
703     private boolean isInSync() {
704         for (ShardInformation info : localShards.values()) {
705             if (!info.isInSync()) {
706                 return false;
707             }
708         }
709         return true;
710     }
711
712     private void onActorInitialized(final ActorInitialized message) {
713         final var sender = message.actorRef();
714
715         String actorName = sender.path().name();
716         //find shard name from actor name; actor name is stringified shardId
717
718         final ShardIdentifier shardId;
719         try {
720             shardId = ShardIdentifier.fromShardIdString(actorName);
721         } catch (IllegalArgumentException e) {
722             LOG.debug("{}: ignoring actor {}", persistenceId(), actorName, e);
723             return;
724         }
725
726         markShardAsInitialized(shardId.getShardName());
727     }
728
729     private void markShardAsInitialized(final String shardName) {
730         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
731
732         ShardInformation shardInformation = localShards.get(shardName);
733         if (shardInformation != null) {
734             shardInformation.setActorInitialized();
735
736             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
737         }
738     }
739
740     @Override
741     protected void handleRecover(final Object message) throws Exception {
742         if (message instanceof RecoveryCompleted) {
743             onRecoveryCompleted();
744         } else if (message instanceof SnapshotOffer msg) {
745             applyShardManagerSnapshot((ShardManagerSnapshot) msg.snapshot());
746         }
747     }
748
749     @SuppressWarnings("checkstyle:IllegalCatch")
750     private void onRecoveryCompleted() {
751         LOG.info("Recovery complete : {}", persistenceId());
752
753         if (currentSnapshot == null && restoreFromSnapshot != null
754                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
755             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
756
757             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
758
759             applyShardManagerSnapshot(snapshot);
760         }
761
762         createLocalShards();
763     }
764
765     private void sendResponse(final ShardInformation shardInformation, final boolean doWait,
766             final boolean wantShardReady, final Supplier<Object> messageSupplier) {
767         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
768             if (doWait) {
769                 final ActorRef sender = getSender();
770                 final ActorRef self = self();
771
772                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
773
774                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
775                     new OnShardInitialized(replyRunnable);
776
777                 shardInformation.addOnShardInitialized(onShardInitialized);
778
779                 FiniteDuration timeout = shardInformation.getDatastoreContext()
780                         .getShardInitializationTimeout().duration();
781                 if (shardInformation.isShardInitialized()) {
782                     // If the shard is already initialized then we'll wait enough time for the shard to
783                     // elect a leader, ie 2 times the election timeout.
784                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
785                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
786                 }
787
788                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
789                         shardInformation);
790
791                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
792                         timeout, self(),
793                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
794                         getContext().dispatcher(), self());
795
796                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
797
798             } else if (!shardInformation.isShardInitialized()) {
799                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
800                         shardInformation.getShardName());
801                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), self());
802             } else {
803                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
804                         shardInformation.getShardName());
805                 getSender().tell(new NoShardLeaderException(shardInformation.getShardId()), self());
806             }
807
808             return;
809         }
810
811         getSender().tell(messageSupplier.get(), self());
812     }
813
814     private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
815         return new NotInitializedException(String.format(
816                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
817     }
818
819     @VisibleForTesting
820     static MemberName memberToName(final Member member) {
821         return MemberName.forName(member.roles().iterator().next());
822     }
823
824     private void memberRemoved(final ClusterEvent.MemberRemoved message) {
825         MemberName memberName = memberToName(message.member());
826
827         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
828                 message.member().address());
829
830         peerAddressResolver.removePeerAddress(memberName);
831     }
832
833     private void memberExited(final ClusterEvent.MemberExited message) {
834         MemberName memberName = memberToName(message.member());
835
836         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
837                 message.member().address());
838
839         peerAddressResolver.removePeerAddress(memberName);
840     }
841
842     private void memberUp(final ClusterEvent.MemberUp message) {
843         MemberName memberName = memberToName(message.member());
844
845         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
846                 message.member().address());
847
848         memberUp(memberName, message.member().address());
849     }
850
851     private void memberUp(final MemberName memberName, final Address address) {
852         addPeerAddress(memberName, address);
853         checkReady();
854     }
855
856     private void memberWeaklyUp(final MemberWeaklyUp message) {
857         MemberName memberName = memberToName(message.member());
858
859         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
860                 message.member().address());
861
862         memberUp(memberName, message.member().address());
863     }
864
865     private void addPeerAddress(final MemberName memberName, final Address address) {
866         peerAddressResolver.addPeerAddress(memberName, address);
867
868         for (ShardInformation info : localShards.values()) {
869             String shardName = info.getShardName();
870             String peerId = getShardIdentifier(memberName, shardName).toString();
871             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), self());
872         }
873     }
874
875     private void memberReachable(final ClusterEvent.ReachableMember message) {
876         MemberName memberName = memberToName(message.member());
877         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
878
879         addPeerAddress(memberName, message.member().address());
880
881         markMemberAvailable(memberName);
882     }
883
884     private void memberUnreachable(final ClusterEvent.UnreachableMember message) {
885         MemberName memberName = memberToName(message.member());
886         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
887
888         markMemberUnavailable(memberName);
889     }
890
891     private void markMemberUnavailable(final MemberName memberName) {
892         for (ShardInformation info : localShards.values()) {
893             String leaderId = info.getLeaderId();
894             if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
895                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
896                 info.setLeaderAvailable(false);
897
898                 primaryShardInfoCache.remove(info.getShardName());
899
900                 notifyShardAvailabilityCallbacks(info);
901             }
902         }
903     }
904
905     private void markMemberAvailable(final MemberName memberName) {
906         for (ShardInformation info : localShards.values()) {
907             String leaderId = info.getLeaderId();
908             if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
909                 LOG.debug("Marking Leader {} as available.", leaderId);
910                 info.setLeaderAvailable(true);
911             }
912         }
913     }
914
915     private void onDatastoreContextFactory(final DatastoreContextFactory factory) {
916         datastoreContextFactory = factory;
917         for (ShardInformation info : localShards.values()) {
918             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), self());
919         }
920     }
921
922     private void onGetLocalShardIds() {
923         final List<String> response = new ArrayList<>(localShards.size());
924
925         for (ShardInformation info : localShards.values()) {
926             response.add(info.getShardId().toString());
927         }
928
929         getSender().tell(new Status.Success(response), self());
930     }
931
932     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
933         final var shardId = message.shardId();
934         final var switchBehavior = new SwitchBehavior(message.newState(), message.term());
935
936         if (shardId != null) {
937             final var info = localShards.get(shardId.getShardName());
938             if (info == null) {
939                 getSender().tell(new Status.Failure(new IllegalArgumentException("Shard " + shardId + " is not local")),
940                     self());
941                 return;
942             }
943
944             switchShardBehavior(info, switchBehavior);
945         } else {
946             for (var info : localShards.values()) {
947                 switchShardBehavior(info, switchBehavior);
948             }
949         }
950
951         getSender().tell(new Status.Success(null), self());
952     }
953
954     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
955         final ActorRef actor = info.getActor();
956         if (actor != null) {
957             actor.tell(switchBehavior, self());
958         } else {
959             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
960                 info.getShardName(), switchBehavior.newState());
961         }
962     }
963
964     /**
965      * Notifies all the local shards of a change in the schema context.
966      *
967      * @param message the message to send
968      */
969     private void updateSchemaContext(final UpdateSchemaContext message) {
970         modelContext = message.modelContext();
971
972         LOG.debug("Got updated SchemaContext: # of modules {}", modelContext.getModules().size());
973
974         for (ShardInformation info : localShards.values()) {
975             info.setSchemaContext(modelContext);
976
977             if (info.getActor() == null) {
978                 LOG.debug("Creating Shard {}", info.getShardId());
979                 info.setActor(newShardActor(info));
980                 // Update peer address for every existing peer memeber to avoid missing sending
981                 // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp.
982                 String shardName = info.getShardName();
983                 for (MemberName memberName : peerAddressResolver.getPeerMembers()) {
984                     String peerId = getShardIdentifier(memberName, shardName).toString() ;
985                     String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
986                     info.updatePeerAddress(peerId, peerAddress, self());
987                     LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
988                             persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
989                 }
990             } else {
991                 info.getActor().tell(message, self());
992             }
993         }
994     }
995
996     @VisibleForTesting
997     protected ClusterWrapper getCluster() {
998         return cluster;
999     }
1000
1001     @VisibleForTesting
1002     protected ActorRef newShardActor(final ShardInformation info) {
1003         return getContext().actorOf(info.newProps().withDispatcher(shardDispatcherPath),
1004                 info.getShardId().toString());
1005     }
1006
1007     private void findPrimary(final FindPrimary message) {
1008         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
1009
1010         final String shardName = message.getShardName();
1011         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
1012
1013         // First see if the there is a local replica for the shard
1014         final ShardInformation info = localShards.get(shardName);
1015         if (info != null && info.isActiveMember()) {
1016             sendResponse(info, message.isWaitUntilReady(), true, () -> {
1017                 String primaryPath = info.getSerializedLeaderActor();
1018                 Object found = canReturnLocalShardState && info.isLeader()
1019                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().orElseThrow()) :
1020                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
1021
1022                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
1023                 return found;
1024             });
1025
1026             return;
1027         }
1028
1029         final Collection<String> visitedAddresses;
1030         if (message instanceof RemoteFindPrimary) {
1031             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
1032         } else {
1033             visitedAddresses = new ArrayList<>(1);
1034         }
1035
1036         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
1037
1038         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
1039             if (visitedAddresses.contains(address)) {
1040                 continue;
1041             }
1042
1043             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
1044                     persistenceId(), shardName, address, visitedAddresses);
1045
1046             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
1047                     message.isWaitUntilReady(), visitedAddresses), getContext());
1048             return;
1049         }
1050
1051         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
1052
1053         getSender().tell(new PrimaryNotFoundException(
1054                 String.format("No primary shard found for %s.", shardName)), self());
1055     }
1056
1057     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1058         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1059                 .getShardInitializationTimeout().duration().$times(2));
1060
1061         Future<Object> futureObj = Patterns.ask(self(), new FindPrimary(shardName, true), findPrimaryTimeout);
1062         futureObj.onComplete(new OnComplete<>() {
1063             @Override
1064             public void onComplete(final Throwable failure, final Object response) {
1065                 if (failure != null) {
1066                     handler.onFailure(failure);
1067                 } else if (response instanceof RemotePrimaryShardFound msg) {
1068                     handler.onRemotePrimaryShardFound(msg);
1069                 } else if (response instanceof LocalPrimaryShardFound msg) {
1070                     handler.onLocalPrimaryFound(msg);
1071                 } else {
1072                     handler.onUnknownResponse(response);
1073                 }
1074             }
1075         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1076     }
1077
1078     /**
1079      * Construct the name of the shard actor given the name of the member on
1080      * which the shard resides and the name of the shard.
1081      *
1082      * @param memberName the member name
1083      * @param shardName the shard name
1084      * @return a b
1085      */
1086     private ShardIdentifier getShardIdentifier(final MemberName memberName, final String shardName) {
1087         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1088     }
1089
1090     /**
1091      * Create shards that are local to the member on which the ShardManager runs.
1092      */
1093     private void createLocalShards() {
1094         MemberName memberName = cluster.getCurrentMemberName();
1095         Collection<String> memberShardNames = configuration.getMemberShardNames(memberName);
1096
1097         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1098         if (restoreFromSnapshot != null) {
1099             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1100                 shardSnapshots.put(snapshot.getName(), snapshot);
1101             }
1102         }
1103
1104         // null out to GC
1105         restoreFromSnapshot = null;
1106
1107         for (String shardName : memberShardNames) {
1108             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1109
1110             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1111
1112             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1113             localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
1114                     newShardDatastoreContext(shardName), shardSnapshots));
1115         }
1116     }
1117
1118     @VisibleForTesting
1119     ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId,
1120                                         final Map<String, String> peerAddresses,
1121                                         final DatastoreContext datastoreContext,
1122                                         final Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
1123         return new ShardInformation(shardName, shardId, peerAddresses,
1124                 datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
1125                 peerAddressResolver);
1126     }
1127
1128     /**
1129      * Given the name of the shard find the addresses of all it's peers.
1130      *
1131      * @param shardName the shard name
1132      */
1133     Map<String, String> getPeerAddresses(final String shardName) {
1134         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1135         return getPeerAddresses(shardName, members);
1136     }
1137
1138     private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
1139         Map<String, String> peerAddresses = new HashMap<>();
1140         MemberName currentMemberName = cluster.getCurrentMemberName();
1141
1142         for (MemberName memberName : members) {
1143             if (!currentMemberName.equals(memberName)) {
1144                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1145                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1146                 peerAddresses.put(shardId.toString(), address);
1147             }
1148         }
1149         return peerAddresses;
1150     }
1151
1152     @Override
1153     public SupervisorStrategy supervisorStrategy() {
1154
1155         return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES),
1156                 (Function<Throwable, Directive>) t -> {
1157                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1158                     return SupervisorStrategy.resume();
1159                 });
1160     }
1161
1162     @VisibleForTesting
1163     ShardManagerInfoMBean getMBean() {
1164         return shardManagerMBean;
1165     }
1166
1167     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1168         if (shardReplicaOperationsInProgress.contains(shardName)) {
1169             LOG.debug("{}: A shard replica operation for {} is already in progress", persistenceId(), shardName);
1170             sender.tell(new Status.Failure(new IllegalStateException(
1171                 String.format("A shard replica operation for %s is already in progress", shardName))), self());
1172             return true;
1173         }
1174
1175         return false;
1176     }
1177
    /**
     * Handles a request to add a local replica of the named shard.
     *
     * <p>The shard must be present in the cluster configuration and a schema context must be available; otherwise
     * the sender is answered with a {@link Status.Failure}. The primary is then located asynchronously. If it is
     * remote, a runnable invoking {@code addShard(...)} is sent to this actor, unless
     * {@code isPreviousShardActorStopInProgress(...)} returns {@code true} (presumably it then defers the runnable;
     * TODO confirm). If the primary is local, the requester is told the local replica already exists.
     *
     * @param shardReplicaMsg the add-replica request
     */
    private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
        final String shardName = shardReplicaMsg.getShardName();

        LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);

        // verify the shard with the specified name is present in the cluster configuration
        if (!configuration.isShardConfigured(shardName)) {
            LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName);
            getSender().tell(new Status.Failure(new IllegalArgumentException(
                "No module configuration exists for shard " + shardName)), self());
            return;
        }

        // A schema context is required to create the local shard instance
        if (modelContext == null) {
            LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
                persistenceId(), shardName);
            getSender().tell(new Status.Failure(new IllegalStateException(
                "No SchemaContext is available in order to create a local shard instance for " + shardName)), self());
            return;
        }

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                self()) {
            @Override
            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                // NOTE(review): getSender() is evaluated only when the runnable executes, not here. The runnable is
                // told to self() with getTargetActor() as sender. If this call resolves to the actor's getSender(),
                // it then sees the original requester. Do not hoist it out of the lambda.
                final RunnableMessage runnable = (RunnableMessage) () ->
                    addShard(getShardName(), response, getSender());
                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
                    self().tell(runnable, getTargetActor());
                }
            }

            @Override
            public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
                // The primary is already local, so there is nothing to add
                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
            }
        });
    }
1217
1218     private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
1219         LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
1220         sender.tell(new Status.Failure(new AlreadyExistsException(
1221             String.format("Local shard %s already exists", shardName))), self());
1222     }
1223
1224     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1225         if (isShardReplicaOperationInProgress(shardName, sender)) {
1226             return;
1227         }
1228
1229         shardReplicaOperationsInProgress.add(shardName);
1230
1231         final ShardInformation shardInfo;
1232         final boolean removeShardOnFailure;
1233         ShardInformation existingShardInfo = localShards.get(shardName);
1234         if (existingShardInfo == null) {
1235             removeShardOnFailure = true;
1236             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1237
1238             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1239                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1240
1241             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1242                     Shard.builder(), peerAddressResolver);
1243             shardInfo.setActiveMember(false);
1244             shardInfo.setSchemaContext(modelContext);
1245             localShards.put(shardName, shardInfo);
1246             shardInfo.setActor(newShardActor(shardInfo));
1247         } else {
1248             removeShardOnFailure = false;
1249             shardInfo = existingShardInfo;
1250         }
1251
1252         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1253     }
1254
1255     private void execAddShard(final String shardName,
1256                               final ShardInformation shardInfo,
1257                               final RemotePrimaryShardFound response,
1258                               final boolean removeShardOnFailure,
1259                               final ActorRef sender) {
1260
1261         final String localShardAddress =
1262                 peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1263
1264         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1265         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1266                 response.getPrimaryPath(), shardInfo.getShardId());
1267
1268         final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1269                 .getShardLeaderElectionTimeout().duration());
1270         final Future<Object> futureObj = Patterns.ask(getContext().actorSelection(response.getPrimaryPath()),
1271                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1272
1273         futureObj.onComplete(new OnComplete<>() {
1274             @Override
1275             public void onComplete(final Throwable failure, final Object addServerResponse) {
1276                 if (failure != null) {
1277                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1278                             response.getPrimaryPath(), shardName, failure);
1279
1280                     final String msg = String.format("AddServer request to leader %s for shard %s failed",
1281                             response.getPrimaryPath(), shardName);
1282                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1283                 } else {
1284                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1285                             response.getPrimaryPath(), removeShardOnFailure), sender);
1286                 }
1287             }
1288         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1289     }
1290
1291     private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
1292             final ActorRef sender, final boolean removeShardOnFailure) {
1293         shardReplicaOperationsInProgress.remove(shardName);
1294
1295         if (removeShardOnFailure) {
1296             ShardInformation shardInfo = localShards.remove(shardName);
1297             if (shardInfo.getActor() != null) {
1298                 shardInfo.getActor().tell(PoisonPill.getInstance(), self());
1299             }
1300         }
1301
1302         sender.tell(new Status.Failure(message == null ? failure : new RuntimeException(message, failure)), self());
1303     }
1304
1305     private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg,
1306             final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) {
1307         String shardName = shardInfo.getShardName();
1308         shardReplicaOperationsInProgress.remove(shardName);
1309
1310         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1311
1312         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1313             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1314
1315             // Make the local shard voting capable
1316             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), self());
1317             shardInfo.setActiveMember(true);
1318             persistShardList();
1319
1320             sender.tell(new Status.Success(null), self());
1321         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1322             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1323         } else {
1324             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1325                     persistenceId(), shardName, replyMsg.getStatus());
1326
1327             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1328                     shardInfo.getShardId());
1329
1330             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1331         }
1332     }
1333
1334     private static Exception getServerChangeException(final Class<?> serverChange,
1335             final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) {
1336         return switch (serverChangeStatus) {
1337             case TIMEOUT -> new TimeoutException("""
1338                 The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible \
1339                 causes - there was a problem replicating the data or shard leadership changed while replicating the \
1340                 shard data""".formatted(leaderPath, shardId.getShardName()));
1341             case NO_LEADER -> new NoShardLeaderException(shardId);
1342             case NOT_SUPPORTED -> new UnsupportedOperationException(
1343                 "%s request is not supported for shard %s".formatted(
1344                     serverChange.getSimpleName(), shardId.getShardName()));
1345             default -> new RuntimeException("%s request to leader %s for shard %s failed with status %s".formatted(
1346                 serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1347         };
1348     }
1349
1350     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
1351         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1352
1353         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1354                 shardReplicaMsg.getShardName(), persistenceId(), self()) {
1355             @Override
1356             public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1357                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1358             }
1359
1360             @Override
1361             public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
1362                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1363             }
1364
1365             private void doRemoveShardReplicaAsync(final String primaryPath) {
1366                 self().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
1367                         primaryPath, getSender()), getTargetActor());
1368             }
1369         });
1370     }
1371
1372     private void persistShardList() {
1373         List<String> shardList = new ArrayList<>(localShards.keySet());
1374         for (ShardInformation shardInfo : localShards.values()) {
1375             if (!shardInfo.isActiveMember()) {
1376                 shardList.remove(shardInfo.getShardName());
1377             }
1378         }
1379         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1380         saveSnapshot(updateShardManagerSnapshot(shardList));
1381     }
1382
1383     private ShardManagerSnapshot updateShardManagerSnapshot(final List<String> shardList) {
1384         currentSnapshot = new ShardManagerSnapshot(shardList);
1385         return currentSnapshot;
1386     }
1387
1388     private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) {
1389         currentSnapshot = snapshot;
1390
1391         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1392
1393         final MemberName currentMember = cluster.getCurrentMemberName();
1394         Set<String> configuredShardList =
1395             new HashSet<>(configuration.getMemberShardNames(currentMember));
1396         for (String shard : currentSnapshot.getShardList()) {
1397             if (!configuredShardList.contains(shard)) {
1398                 // add the current member as a replica for the shard
1399                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1400                 configuration.addMemberReplicaForShard(shard, currentMember);
1401             } else {
1402                 configuredShardList.remove(shard);
1403             }
1404         }
1405         for (String shard : configuredShardList) {
1406             // remove the member as a replica for the shard
1407             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1408             configuration.removeMemberReplicaForShard(shard, currentMember);
1409         }
1410     }
1411
1412     private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) {
1413         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1414             persistenceId());
1415         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1416             0, 0));
1417     }
1418
1419     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1420         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1421
1422         String shardName = changeMembersVotingStatus.getShardName();
1423         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1424         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1425             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1426                     e.getValue());
1427         }
1428
1429         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1430
1431         findLocalShard(shardName, getSender(),
1432             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1433             localShardFound.getPath(), getSender()));
1434     }
1435
1436     private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
1437         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1438
1439         ActorRef sender = getSender();
1440         final String shardName = flipMembersVotingStatus.getShardName();
1441         findLocalShard(shardName, sender, localShardFound -> {
1442             Future<Object> future = Patterns.ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1443                     Timeout.apply(30, TimeUnit.SECONDS));
1444
1445             future.onComplete(new OnComplete<>() {
1446                 @Override
1447                 public void onComplete(final Throwable failure, final Object response) {
1448                     if (failure != null) {
1449                         sender.tell(new Status.Failure(new RuntimeException(
1450                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1451                         return;
1452                     }
1453
1454                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1455                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1456                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1457                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1458                     }
1459
1460                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1461                             .toString(), !raftState.isVoting());
1462
1463                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1464                             shardName, localShardFound.getPath(), sender);
1465                 }
1466             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1467         });
1468
1469     }
1470
1471     private void findLocalShard(final FindLocalShard message) {
1472         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
1473
1474         final ShardInformation shardInformation = localShards.get(message.getShardName());
1475
1476         if (shardInformation == null) {
1477             LOG.debug("{}: Local shard {} not found - shards present: {}",
1478                     persistenceId(), message.getShardName(), localShards.keySet());
1479
1480             getSender().tell(new LocalShardNotFound(message.getShardName()), self());
1481             return;
1482         }
1483
1484         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1485             () -> new LocalShardFound(shardInformation.getActor()));
1486     }
1487
1488     private void findLocalShard(final String shardName, final ActorRef sender,
1489             final Consumer<LocalShardFound> onLocalShardFound) {
1490         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1491                 .getShardInitializationTimeout().duration().$times(2));
1492
1493         Future<Object> futureObj = Patterns.ask(self(), new FindLocalShard(shardName, true), findLocalTimeout);
1494         futureObj.onComplete(new OnComplete<>() {
1495             @Override
1496             public void onComplete(final Throwable failure, final Object response) {
1497                 if (failure != null) {
1498                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId(), shardName,
1499                         failure);
1500                     sender.tell(new Status.Failure(new RuntimeException(
1501                         String.format("Failed to find local shard %s", shardName), failure)), self());
1502                 } if (response instanceof LocalShardFound msg) {
1503                     self().tell((RunnableMessage) () -> onLocalShardFound.accept(msg), sender);
1504                 } else if (response instanceof LocalShardNotFound) {
1505                     LOG.debug("{}: Local shard {} does not exist", persistenceId(), shardName);
1506                     sender.tell(new Status.Failure(new IllegalArgumentException(
1507                         String.format("Local shard %s does not exist", shardName))), self());
1508                 } else {
1509                     LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId(), shardName,
1510                         response);
1511                     sender.tell(new Status.Failure(response instanceof Throwable throwable ? throwable
1512                         : new RuntimeException(String.format("Failed to find local shard %s: received response: %s",
1513                             shardName, response))), self());
1514                 }
1515             }
1516         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1517     }
1518
1519     private void changeShardMembersVotingStatus(final ChangeServersVotingStatus changeServersVotingStatus,
1520             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1521         if (isShardReplicaOperationInProgress(shardName, sender)) {
1522             return;
1523         }
1524
1525         shardReplicaOperationsInProgress.add(shardName);
1526
1527         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1528         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1529
1530         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1531                 changeServersVotingStatus, shardActorRef.path());
1532
1533         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1534         Patterns.ask(shardActorRef, changeServersVotingStatus, timeout).onComplete(new OnComplete<>() {
1535             @Override
1536             public void onComplete(final Throwable failure, final Object response) {
1537                 shardReplicaOperationsInProgress.remove(shardName);
1538                 if (failure != null) {
1539                     LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", persistenceId(),
1540                         shardActorRef.path(), failure);
1541                     sender.tell(new Status.Failure(new RuntimeException(
1542                         String.format("ChangeServersVotingStatus request to local shard %s failed",
1543                             shardActorRef.path()), failure)), self());
1544                     return;
1545                 }
1546
1547                 LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1548
1549                 final var replyMsg = (ServerChangeReply) response;
1550                 if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1551                     LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1552                     sender.tell(new Status.Success(null), self());
1553                 } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1554                     sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1555                         "The requested voting state change for shard %s is invalid. At least one member "
1556                             + "must be voting", shardId.getShardName()))), self());
1557                 } else {
1558                     LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1559                         persistenceId(), shardName, replyMsg.getStatus());
1560
1561                     Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1562                         replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1563                     sender.tell(new Status.Failure(error), self());
1564                 }
1565             }
1566         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1567     }
1568
1569     private static final class ForwardedAddServerReply {
1570         ShardInformation shardInfo;
1571         AddServerReply addServerReply;
1572         String leaderPath;
1573         boolean removeShardOnFailure;
1574
1575         ForwardedAddServerReply(final ShardInformation shardInfo, final AddServerReply addServerReply,
1576             final String leaderPath, final boolean removeShardOnFailure) {
1577             this.shardInfo = shardInfo;
1578             this.addServerReply = addServerReply;
1579             this.leaderPath = leaderPath;
1580             this.removeShardOnFailure = removeShardOnFailure;
1581         }
1582     }
1583
1584     private static final class ForwardedAddServerFailure {
1585         String shardName;
1586         String failureMessage;
1587         Throwable failure;
1588         boolean removeShardOnFailure;
1589
1590         ForwardedAddServerFailure(final String shardName, final String failureMessage, final Throwable failure,
1591                 final boolean removeShardOnFailure) {
1592             this.shardName = shardName;
1593             this.failureMessage = failureMessage;
1594             this.failure = failure;
1595             this.removeShardOnFailure = removeShardOnFailure;
1596         }
1597     }
1598
1599     static class OnShardInitialized {
1600         private final Runnable replyRunnable;
1601         private Cancellable timeoutSchedule;
1602
1603         OnShardInitialized(final Runnable replyRunnable) {
1604             this.replyRunnable = replyRunnable;
1605         }
1606
1607         Runnable getReplyRunnable() {
1608             return replyRunnable;
1609         }
1610
1611         Cancellable getTimeoutSchedule() {
1612             return timeoutSchedule;
1613         }
1614
1615         void setTimeoutSchedule(final Cancellable timeoutSchedule) {
1616             this.timeoutSchedule = timeoutSchedule;
1617         }
1618     }
1619
1620     static class OnShardReady extends OnShardInitialized {
1621         OnShardReady(final Runnable replyRunnable) {
1622             super(replyRunnable);
1623         }
1624     }
1625
1626     private interface RunnableMessage extends Runnable {
1627     }
1628
1629     /**
1630      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1631      * a remote or local find primary message is processed.
1632      */
1633     private interface FindPrimaryResponseHandler {
1634         /**
1635          * Invoked when a Failure message is received as a response.
1636          *
1637          * @param failure the failure exception
1638          */
1639         void onFailure(Throwable failure);
1640
1641         /**
1642          * Invoked when a RemotePrimaryShardFound response is received.
1643          *
1644          * @param response the response
1645          */
1646         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1647
1648         /**
1649          * Invoked when a LocalPrimaryShardFound response is received.
1650          *
1651          * @param response the response
1652          */
1653         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1654
1655         /**
1656          * Invoked when an unknown response is received. This is another type of failure.
1657          *
1658          * @param response the response
1659          */
1660         void onUnknownResponse(Object response);
1661     }
1662
1663     /**
1664      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1665      * replica and sends a wrapped Failure response to some targetActor.
1666      */
1667     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1668         private final ActorRef targetActor;
1669         private final String shardName;
1670         private final String persistenceId;
1671         private final ActorRef shardManagerActor;
1672
1673         /**
1674          * Constructs an instance.
1675          *
1676          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1677          * @param shardName The name of the shard for which the primary replica had to be found
1678          * @param persistenceId The persistenceId for the ShardManager
1679          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1680          */
1681         protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
1682                 final String persistenceId, final ActorRef shardManagerActor) {
1683             this.targetActor = requireNonNull(targetActor);
1684             this.shardName = requireNonNull(shardName);
1685             this.persistenceId = requireNonNull(persistenceId);
1686             this.shardManagerActor = requireNonNull(shardManagerActor);
1687         }
1688
1689         public ActorRef getTargetActor() {
1690             return targetActor;
1691         }
1692
1693         public String getShardName() {
1694             return shardName;
1695         }
1696
1697         @Override
1698         public void onFailure(final Throwable failure) {
1699             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1700             targetActor.tell(new Status.Failure(new RuntimeException(
1701                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1702         }
1703
1704         @Override
1705         public void onUnknownResponse(final Object response) {
1706             LOG.debug("{}: Failed to find leader for shard {}: received response: {}", persistenceId, shardName,
1707                 response);
1708             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
1709                     : new RuntimeException(String.format("Failed to find leader for shard %s: received response: %s",
1710                         shardName, response))), shardManagerActor);
1711         }
1712     }
1713
1714     /**
1715      * The WrappedShardResponse class wraps a response from a Shard.
1716      */
1717     private static final class WrappedShardResponse {
1718         private final ShardIdentifier shardId;
1719         private final Object response;
1720         private final String leaderPath;
1721
1722         WrappedShardResponse(final ShardIdentifier shardId, final Object response, final String leaderPath) {
1723             this.shardId = shardId;
1724             this.response = response;
1725             this.leaderPath = leaderPath;
1726         }
1727
1728         ShardIdentifier getShardId() {
1729             return shardId;
1730         }
1731
1732         Object getResponse() {
1733             return response;
1734         }
1735
1736         String getLeaderPath() {
1737             return leaderPath;
1738         }
1739     }
1740
1741     private static final class ShardNotInitializedTimeout {
1742         private final ActorRef sender;
1743         private final ShardInformation shardInfo;
1744         private final OnShardInitialized onShardInitialized;
1745
1746         ShardNotInitializedTimeout(final ShardInformation shardInfo, final OnShardInitialized onShardInitialized,
1747             final ActorRef sender) {
1748             this.sender = sender;
1749             this.shardInfo = shardInfo;
1750             this.onShardInitialized = onShardInitialized;
1751         }
1752
1753         ActorRef getSender() {
1754             return sender;
1755         }
1756
1757         ShardInformation getShardInfo() {
1758             return shardInfo;
1759         }
1760
1761         OnShardInitialized getOnShardInitialized() {
1762             return onShardInitialized;
1763         }
1764     }
1765 }
1766
1767
1768