Bug 7521: Convert byte[] to ShardManagerSnapshot in DatastoreSnapshot
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.actor.SupervisorStrategy.Directive;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberWeaklyUp;
23 import akka.cluster.Member;
24 import akka.dispatch.Futures;
25 import akka.dispatch.OnComplete;
26 import akka.japi.Function;
27 import akka.pattern.Patterns;
28 import akka.persistence.RecoveryCompleted;
29 import akka.persistence.SaveSnapshotFailure;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.persistence.SnapshotOffer;
32 import akka.persistence.SnapshotSelectionCriteria;
33 import akka.util.Timeout;
34 import com.google.common.annotations.VisibleForTesting;
35 import com.google.common.base.Preconditions;
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.HashMap;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.Set;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.function.Consumer;
49 import java.util.function.Supplier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
52 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
53 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
56 import org.opendaylight.controller.cluster.datastore.Shard;
57 import org.opendaylight.controller.cluster.datastore.config.Configuration;
58 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
59 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
60 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
63 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
64 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
65 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
66 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
67 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
68 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
69 import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
70 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
71 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
72 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
73 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
77 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
78 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
79 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
80 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
81 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
82 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
83 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
84 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
85 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
86 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
87 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
88 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
89 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
90 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
91 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
92 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
93 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
94 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
95 import org.opendaylight.controller.cluster.raft.messages.AddServer;
96 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
97 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
98 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
99 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
100 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
101 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
102 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
103 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
104 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
105 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
108 import scala.concurrent.ExecutionContext;
109 import scala.concurrent.Future;
110 import scala.concurrent.duration.Duration;
111 import scala.concurrent.duration.FiniteDuration;
112
113 /**
114  * Manages the shards for a data store. The ShardManager has the following jobs:
115  * <ul>
116  * <li> Create all the local shard replicas that belong on this cluster member
117  * <li> Find the address of the local shard
118  * <li> Find the primary replica for any given shard
119  * <li> Monitor the cluster members and store their addresses
120  * </ul>
121  */
122 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
123     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
124
125     // Stores a mapping between a shard name and its corresponding information
126     // Shard names look like inventory, topology etc and are as specified in
127     // configuration
128     private final Map<String, ShardInformation> localShards = new HashMap<>();
129
130     // The type of a ShardManager reflects the type of the datastore itself
131     // A data store could be of type config/operational
132     private final String type;
133
134     private final ClusterWrapper cluster;
135
136     private final Configuration configuration;
137
138     private final String shardDispatcherPath;
139
140     private final ShardManagerInfo shardManagerMBean;
141
142     private DatastoreContextFactory datastoreContextFactory;
143
144     private final CountDownLatch waitTillReadyCountdownLatch;
145
146     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
147
148     private final ShardPeerAddressResolver peerAddressResolver;
149
150     private SchemaContext schemaContext;
151
152     private DatastoreSnapshot restoreFromSnapshot;
153
154     private ShardManagerSnapshot currentSnapshot;
155
156     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
157
158     private final String persistenceId;
159
160     ShardManager(AbstractShardManagerCreator<?> builder) {
161         this.cluster = builder.getCluster();
162         this.configuration = builder.getConfiguration();
163         this.datastoreContextFactory = builder.getDatastoreContextFactory();
164         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
165         this.shardDispatcherPath =
166                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
167         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
168         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
169         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
170
171         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
172         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
173
174         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
175
176         // Subscribe this actor to cluster member events
177         cluster.subscribeToMemberEvents(getSelf());
178
179         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
180                 "shard-manager-" + this.type,
181                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
182         shardManagerMBean.registerMBean();
183     }
184
185     @Override
186     public void postStop() {
187         LOG.info("Stopping ShardManager {}", persistenceId());
188
189         shardManagerMBean.unregisterMBean();
190     }
191
192     @Override
193     public void handleCommand(Object message) throws Exception {
194         if (message  instanceof FindPrimary) {
195             findPrimary((FindPrimary)message);
196         } else if (message instanceof FindLocalShard) {
197             findLocalShard((FindLocalShard) message);
198         } else if (message instanceof UpdateSchemaContext) {
199             updateSchemaContext(message);
200         } else if (message instanceof ActorInitialized) {
201             onActorInitialized(message);
202         } else if (message instanceof ClusterEvent.MemberUp) {
203             memberUp((ClusterEvent.MemberUp) message);
204         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
205             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
206         } else if (message instanceof ClusterEvent.MemberExited) {
207             memberExited((ClusterEvent.MemberExited) message);
208         } else if (message instanceof ClusterEvent.MemberRemoved) {
209             memberRemoved((ClusterEvent.MemberRemoved) message);
210         } else if (message instanceof ClusterEvent.UnreachableMember) {
211             memberUnreachable((ClusterEvent.UnreachableMember) message);
212         } else if (message instanceof ClusterEvent.ReachableMember) {
213             memberReachable((ClusterEvent.ReachableMember) message);
214         } else if (message instanceof DatastoreContextFactory) {
215             onDatastoreContextFactory((DatastoreContextFactory) message);
216         } else if (message instanceof RoleChangeNotification) {
217             onRoleChangeNotification((RoleChangeNotification) message);
218         } else if (message instanceof FollowerInitialSyncUpStatus) {
219             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
220         } else if (message instanceof ShardNotInitializedTimeout) {
221             onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
222         } else if (message instanceof ShardLeaderStateChanged) {
223             onLeaderStateChanged((ShardLeaderStateChanged) message);
224         } else if (message instanceof SwitchShardBehavior) {
225             onSwitchShardBehavior((SwitchShardBehavior) message);
226         } else if (message instanceof CreateShard) {
227             onCreateShard((CreateShard)message);
228         } else if (message instanceof AddShardReplica) {
229             onAddShardReplica((AddShardReplica) message);
230         } else if (message instanceof CreatePrefixedShard) {
231             onCreatePrefixedShard((CreatePrefixedShard) message);
232         } else if (message instanceof AddPrefixShardReplica) {
233             onAddPrefixShardReplica((AddPrefixShardReplica) message);
234         } else if (message instanceof ForwardedAddServerReply) {
235             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
236             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
237                     msg.removeShardOnFailure);
238         } else if (message instanceof ForwardedAddServerFailure) {
239             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
240             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
241         } else if (message instanceof RemoveShardReplica) {
242             onRemoveShardReplica((RemoveShardReplica) message);
243         } else if (message instanceof WrappedShardResponse) {
244             onWrappedShardResponse((WrappedShardResponse) message);
245         } else if (message instanceof GetSnapshot) {
246             onGetSnapshot();
247         } else if (message instanceof ServerRemoved) {
248             onShardReplicaRemoved((ServerRemoved) message);
249         } else if (message instanceof ChangeShardMembersVotingStatus) {
250             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
251         } else if (message instanceof FlipShardMembersVotingStatus) {
252             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
253         } else if (message instanceof SaveSnapshotSuccess) {
254             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
255         } else if (message instanceof SaveSnapshotFailure) {
256             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
257                     ((SaveSnapshotFailure) message).cause());
258         } else if (message instanceof Shutdown) {
259             onShutDown();
260         } else if (message instanceof GetLocalShardIds) {
261             onGetLocalShardIds();
262         } else if (message instanceof RunnableMessage) {
263             ((RunnableMessage)message).run();
264         } else {
265             unknownMessage(message);
266         }
267     }
268
269     private void onShutDown() {
270         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
271         for (ShardInformation info : localShards.values()) {
272             if (info.getActor() != null) {
273                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
274
275                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
276                         .getElectionTimeOutInterval().$times(2);
277                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
278             }
279         }
280
281         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
282
283         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
284                 .getDispatcher(Dispatchers.DispatcherType.Client);
285         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
286
287         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
288             @Override
289             public void onComplete(Throwable failure, Iterable<Boolean> results) {
290                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
291
292                 self().tell(PoisonPill.getInstance(), self());
293
294                 if (failure != null) {
295                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
296                 } else {
297                     int nfailed = 0;
298                     for (Boolean result : results) {
299                         if (!result) {
300                             nfailed++;
301                         }
302                     }
303
304                     if (nfailed > 0) {
305                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
306                     }
307                 }
308             }
309         }, dispatcher);
310     }
311
312     private void onWrappedShardResponse(WrappedShardResponse message) {
313         if (message.getResponse() instanceof RemoveServerReply) {
314             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
315                     message.getLeaderPath());
316         }
317     }
318
319     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
320             String leaderPath) {
321         shardReplicaOperationsInProgress.remove(shardId.getShardName());
322
323         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
324
325         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
326             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
327                     shardId.getShardName());
328             originalSender.tell(new Status.Success(null), getSelf());
329         } else {
330             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
331                     persistenceId(), shardId, replyMsg.getStatus());
332
333             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
334             originalSender.tell(new Status.Failure(failure), getSelf());
335         }
336     }
337
338     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
339             final ActorRef sender) {
340         if (isShardReplicaOperationInProgress(shardName, sender)) {
341             return;
342         }
343
344         shardReplicaOperationsInProgress.add(shardName);
345
346         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
347
348         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
349
350         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
351         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
352                 primaryPath, shardId);
353
354         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
355         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
356                 new RemoveServer(shardId.toString()), removeServerTimeout);
357
358         futureObj.onComplete(new OnComplete<Object>() {
359             @Override
360             public void onComplete(Throwable failure, Object response) {
361                 if (failure != null) {
362                     shardReplicaOperationsInProgress.remove(shardName);
363                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
364                             primaryPath, shardName);
365
366                     LOG.debug("{}: {}", persistenceId(), msg, failure);
367
368                     // FAILURE
369                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
370                 } else {
371                     // SUCCESS
372                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
373                 }
374             }
375         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
376     }
377
378     private void onShardReplicaRemoved(ServerRemoved message) {
379         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
380         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
381         if (shardInformation == null) {
382             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
383             return;
384         } else if (shardInformation.getActor() != null) {
385             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
386             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
387         }
388         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
389         persistShardList();
390     }
391
392     private void onGetSnapshot() {
393         LOG.debug("{}: onGetSnapshot", persistenceId());
394
395         List<String> notInitialized = null;
396         for (ShardInformation shardInfo : localShards.values()) {
397             if (!shardInfo.isShardInitialized()) {
398                 if (notInitialized == null) {
399                     notInitialized = new ArrayList<>();
400                 }
401
402                 notInitialized.add(shardInfo.getShardName());
403             }
404         }
405
406         if (notInitialized != null) {
407             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
408                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
409             return;
410         }
411
412         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
413                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
414                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
415
416         for (ShardInformation shardInfo: localShards.values()) {
417             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
418         }
419     }
420
421     @SuppressWarnings("checkstyle:IllegalCatch")
422     private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
423         LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
424
425         Object reply;
426         try {
427             final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
428                     createPrefixedShard.getConfig().getPrefix());
429             if (localShards.containsKey(shardId.getShardName())) {
430                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
431                 reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
432             } else {
433                 doCreatePrefixedShard(createPrefixedShard);
434                 reply = new Status.Success(null);
435             }
436         } catch (final Exception e) {
437             LOG.error("{}: onCreateShard failed", persistenceId(), e);
438             reply = new Status.Failure(e);
439         }
440
441         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
442             getSender().tell(reply, getSelf());
443         }
444     }
445
446     @SuppressWarnings("checkstyle:IllegalCatch")
447     private void onCreateShard(CreateShard createShard) {
448         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
449
450         Object reply;
451         try {
452             String shardName = createShard.getModuleShardConfig().getShardName();
453             if (localShards.containsKey(shardName)) {
454                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
455                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
456             } else {
457                 doCreateShard(createShard);
458                 reply = new Status.Success(null);
459             }
460         } catch (Exception e) {
461             LOG.error("{}: onCreateShard failed", persistenceId(), e);
462             reply = new Status.Failure(e);
463         }
464
465         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
466             getSender().tell(reply, getSelf());
467         }
468     }
469
470     private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
471         final PrefixShardConfiguration config = createPrefixedShard.getConfig();
472
473         final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
474                 createPrefixedShard.getConfig().getPrefix());
475         final String shardName = shardId.getShardName();
476
477         configuration.addPrefixShardConfiguration(config);
478
479         DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
480
481         if (shardDatastoreContext == null) {
482             final Builder builder = newShardDatastoreContextBuilder(shardName);
483             builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
484                     .storeRoot(config.getPrefix().getRootIdentifier());
485             shardDatastoreContext = builder.build();
486         } else {
487             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
488                     peerAddressResolver).build();
489         }
490
491         final boolean shardWasInRecoveredSnapshot = currentSnapshot != null
492                 && currentSnapshot.getShardList().contains(shardName);
493
494         final Map<String, String> peerAddresses = Collections.emptyMap();
495         final boolean isActiveMember = true;
496         LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
497                 persistenceId(), shardId, peerAddresses, isActiveMember);
498
499         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
500                 shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
501         info.setActiveMember(isActiveMember);
502         localShards.put(info.getShardName(), info);
503
504         if (schemaContext != null) {
505             info.setActor(newShardActor(schemaContext, info));
506         }
507     }
508
509     private void doCreateShard(final CreateShard createShard) {
510         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
511         final String shardName = moduleShardConfig.getShardName();
512
513         configuration.addModuleShardConfiguration(moduleShardConfig);
514
515         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
516         if (shardDatastoreContext == null) {
517             shardDatastoreContext = newShardDatastoreContext(shardName);
518         } else {
519             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
520                     peerAddressResolver).build();
521         }
522
523         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
524
525         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
526                 && currentSnapshot.getShardList().contains(shardName);
527
528         Map<String, String> peerAddresses;
529         boolean isActiveMember;
530         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
531                 .contains(cluster.getCurrentMemberName())) {
532             peerAddresses = getPeerAddresses(shardName);
533             isActiveMember = true;
534         } else {
535             // The local member is not in the static shard member configuration and the shard did not
536             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
537             // the shard with no peers and with elections disabled so it stays as follower. A
538             // subsequent AddServer request will be needed to make it an active member.
539             isActiveMember = false;
540             peerAddresses = Collections.emptyMap();
541             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
542                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
543         }
544
545         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
546                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
547                 isActiveMember);
548
549         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
550                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
551         info.setActiveMember(isActiveMember);
552         localShards.put(info.getShardName(), info);
553
554         if (schemaContext != null) {
555             info.setActor(newShardActor(schemaContext, info));
556         }
557     }
558
559     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
560         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
561                 .shardPeerAddressResolver(peerAddressResolver);
562     }
563
564     private DatastoreContext newShardDatastoreContext(String shardName) {
565         return newShardDatastoreContextBuilder(shardName).build();
566     }
567
568     private void checkReady() {
569         if (isReadyWithLeaderId()) {
570             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
571                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
572
573             waitTillReadyCountdownLatch.countDown();
574         }
575     }
576
577     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
578         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
579
580         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
581         if (shardInformation != null) {
582             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
583             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
584             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
585                 primaryShardInfoCache.remove(shardInformation.getShardName());
586             }
587
588             checkReady();
589         } else {
590             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
591         }
592     }
593
594     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
595         ShardInformation shardInfo = message.getShardInfo();
596
597         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
598                 shardInfo.getShardName());
599
600         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
601
602         if (!shardInfo.isShardInitialized()) {
603             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
604             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
605         } else {
606             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
607             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
608         }
609     }
610
611     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
612         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
613                 status.getName(), status.isInitialSyncDone());
614
615         ShardInformation shardInformation = findShardInformation(status.getName());
616
617         if (shardInformation != null) {
618             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
619
620             shardManagerMBean.setSyncStatus(isInSync());
621         }
622
623     }
624
625     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
626         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
627                 roleChanged.getOldRole(), roleChanged.getNewRole());
628
629         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
630         if (shardInformation != null) {
631             shardInformation.setRole(roleChanged.getNewRole());
632             checkReady();
633             shardManagerMBean.setSyncStatus(isInSync());
634         }
635     }
636
637
638     private ShardInformation findShardInformation(String memberId) {
639         for (ShardInformation info : localShards.values()) {
640             if (info.getShardId().toString().equals(memberId)) {
641                 return info;
642             }
643         }
644
645         return null;
646     }
647
648     private boolean isReadyWithLeaderId() {
649         boolean isReady = true;
650         for (ShardInformation info : localShards.values()) {
651             if (!info.isShardReadyWithLeaderId()) {
652                 isReady = false;
653                 break;
654             }
655         }
656         return isReady;
657     }
658
659     private boolean isInSync() {
660         for (ShardInformation info : localShards.values()) {
661             if (!info.isInSync()) {
662                 return false;
663             }
664         }
665         return true;
666     }
667
668     private void onActorInitialized(Object message) {
669         final ActorRef sender = getSender();
670
671         if (sender == null) {
672             return; //why is a non-actor sending this message? Just ignore.
673         }
674
675         String actorName = sender.path().name();
676         //find shard name from actor name; actor name is stringified shardId
677
678         final ShardIdentifier shardId;
679         try {
680             shardId = ShardIdentifier.fromShardIdString(actorName);
681         } catch (IllegalArgumentException e) {
682             LOG.debug("{}: ignoring actor {}", actorName, e);
683             return;
684         }
685
686         markShardAsInitialized(shardId.getShardName());
687     }
688
689     private void markShardAsInitialized(String shardName) {
690         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
691
692         ShardInformation shardInformation = localShards.get(shardName);
693         if (shardInformation != null) {
694             shardInformation.setActorInitialized();
695
696             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
697         }
698     }
699
700     @Override
701     protected void handleRecover(Object message) throws Exception {
702         if (message instanceof RecoveryCompleted) {
703             onRecoveryCompleted();
704         } else if (message instanceof SnapshotOffer) {
705             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
706         }
707     }
708
709     @SuppressWarnings("checkstyle:IllegalCatch")
710     private void onRecoveryCompleted() {
711         LOG.info("Recovery complete : {}", persistenceId());
712
713         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
714         // journal on upgrade from Helium.
715         deleteMessages(lastSequenceNr());
716
717         if (currentSnapshot == null && restoreFromSnapshot != null
718                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
719             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
720
721             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
722
723             applyShardManagerSnapshot(snapshot);
724         }
725
726         createLocalShards();
727     }
728
729     private void sendResponse(ShardInformation shardInformation, boolean doWait,
730             boolean wantShardReady, final Supplier<Object> messageSupplier) {
731         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
732             if (doWait) {
733                 final ActorRef sender = getSender();
734                 final ActorRef self = self();
735
736                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
737
738                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
739                     new OnShardInitialized(replyRunnable);
740
741                 shardInformation.addOnShardInitialized(onShardInitialized);
742
743                 FiniteDuration timeout = shardInformation.getDatastoreContext()
744                         .getShardInitializationTimeout().duration();
745                 if (shardInformation.isShardInitialized()) {
746                     // If the shard is already initialized then we'll wait enough time for the shard to
747                     // elect a leader, ie 2 times the election timeout.
748                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
749                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
750                 }
751
752                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
753                         shardInformation.getShardName());
754
755                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
756                         timeout, getSelf(),
757                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
758                         getContext().dispatcher(), getSelf());
759
760                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
761
762             } else if (!shardInformation.isShardInitialized()) {
763                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
764                         shardInformation.getShardName());
765                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
766             } else {
767                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
768                         shardInformation.getShardName());
769                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
770             }
771
772             return;
773         }
774
775         getSender().tell(messageSupplier.get(), getSelf());
776     }
777
778     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
779         return new NoShardLeaderException(null, shardId.toString());
780     }
781
782     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
783         return new NotInitializedException(String.format(
784                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
785     }
786
787     @VisibleForTesting
788     static MemberName memberToName(final Member member) {
789         return MemberName.forName(member.roles().iterator().next());
790     }
791
792     private void memberRemoved(ClusterEvent.MemberRemoved message) {
793         MemberName memberName = memberToName(message.member());
794
795         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
796                 message.member().address());
797
798         peerAddressResolver.removePeerAddress(memberName);
799
800         for (ShardInformation info : localShards.values()) {
801             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
802         }
803     }
804
805     private void memberExited(ClusterEvent.MemberExited message) {
806         MemberName memberName = memberToName(message.member());
807
808         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
809                 message.member().address());
810
811         peerAddressResolver.removePeerAddress(memberName);
812
813         for (ShardInformation info : localShards.values()) {
814             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
815         }
816     }
817
818     private void memberUp(ClusterEvent.MemberUp message) {
819         MemberName memberName = memberToName(message.member());
820
821         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
822                 message.member().address());
823
824         memberUp(memberName, message.member().address());
825     }
826
827     private void memberUp(MemberName memberName, Address address) {
828         addPeerAddress(memberName, address);
829         checkReady();
830     }
831
832     private void memberWeaklyUp(MemberWeaklyUp message) {
833         MemberName memberName = memberToName(message.member());
834
835         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
836                 message.member().address());
837
838         memberUp(memberName, message.member().address());
839     }
840
841     private void addPeerAddress(MemberName memberName, Address address) {
842         peerAddressResolver.addPeerAddress(memberName, address);
843
844         for (ShardInformation info : localShards.values()) {
845             String shardName = info.getShardName();
846             String peerId = getShardIdentifier(memberName, shardName).toString();
847             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
848
849             info.peerUp(memberName, peerId, getSelf());
850         }
851     }
852
853     private void memberReachable(ClusterEvent.ReachableMember message) {
854         MemberName memberName = memberToName(message.member());
855         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
856
857         addPeerAddress(memberName, message.member().address());
858
859         markMemberAvailable(memberName);
860     }
861
862     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
863         MemberName memberName = memberToName(message.member());
864         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
865
866         markMemberUnavailable(memberName);
867     }
868
869     private void markMemberUnavailable(final MemberName memberName) {
870         final String memberStr = memberName.getName();
871         for (ShardInformation info : localShards.values()) {
872             String leaderId = info.getLeaderId();
873             // XXX: why are we using String#contains() here?
874             if (leaderId != null && leaderId.contains(memberStr)) {
875                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
876                 info.setLeaderAvailable(false);
877
878                 primaryShardInfoCache.remove(info.getShardName());
879             }
880
881             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
882         }
883     }
884
885     private void markMemberAvailable(final MemberName memberName) {
886         final String memberStr = memberName.getName();
887         for (ShardInformation info : localShards.values()) {
888             String leaderId = info.getLeaderId();
889             // XXX: why are we using String#contains() here?
890             if (leaderId != null && leaderId.contains(memberStr)) {
891                 LOG.debug("Marking Leader {} as available.", leaderId);
892                 info.setLeaderAvailable(true);
893             }
894
895             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
896         }
897     }
898
899     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
900         datastoreContextFactory = factory;
901         for (ShardInformation info : localShards.values()) {
902             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
903         }
904     }
905
906     private void onGetLocalShardIds() {
907         final List<String> response = new ArrayList<>(localShards.size());
908
909         for (ShardInformation info : localShards.values()) {
910             response.add(info.getShardId().toString());
911         }
912
913         getSender().tell(new Status.Success(response), getSelf());
914     }
915
916     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
917         final ShardIdentifier identifier = message.getShardId();
918
919         if (identifier != null) {
920             final ShardInformation info = localShards.get(identifier.getShardName());
921             if (info == null) {
922                 getSender().tell(new Status.Failure(
923                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
924                 return;
925             }
926
927             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
928         } else {
929             for (ShardInformation info : localShards.values()) {
930                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
931             }
932         }
933
934         getSender().tell(new Status.Success(null), getSelf());
935     }
936
937     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
938         final ActorRef actor = info.getActor();
939         if (actor != null) {
940             actor.tell(switchBehavior, getSelf());
941         } else {
942             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
943                 info.getShardName(), switchBehavior.getNewState());
944         }
945     }
946
947     /**
948      * Notifies all the local shards of a change in the schema context.
949      *
950      * @param message the message to send
951      */
952     private void updateSchemaContext(final Object message) {
953         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
954
955         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
956
957         for (ShardInformation info : localShards.values()) {
958             if (info.getActor() == null) {
959                 LOG.debug("Creating Shard {}", info.getShardId());
960                 info.setActor(newShardActor(schemaContext, info));
961             } else {
962                 info.getActor().tell(message, getSelf());
963             }
964         }
965     }
966
967     @VisibleForTesting
968     protected ClusterWrapper getCluster() {
969         return cluster;
970     }
971
972     @VisibleForTesting
973     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
974         return getContext().actorOf(info.newProps(schemaContext)
975                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
976     }
977
978     private void findPrimary(FindPrimary message) {
979         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
980
981         final String shardName = message.getShardName();
982         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
983
984         // First see if the there is a local replica for the shard
985         final ShardInformation info = localShards.get(shardName);
986         if (info != null && info.isActiveMember()) {
987             sendResponse(info, message.isWaitUntilReady(), true, () -> {
988                 String primaryPath = info.getSerializedLeaderActor();
989                 Object found = canReturnLocalShardState && info.isLeader()
990                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
991                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
992
993                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
994                 return found;
995             });
996
997             return;
998         }
999
1000         final Collection<String> visitedAddresses;
1001         if (message instanceof RemoteFindPrimary) {
1002             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
1003         } else {
1004             visitedAddresses = new ArrayList<>(1);
1005         }
1006
1007         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
1008
1009         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
1010             if (visitedAddresses.contains(address)) {
1011                 continue;
1012             }
1013
1014             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
1015                     persistenceId(), shardName, address, visitedAddresses);
1016
1017             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
1018                     message.isWaitUntilReady(), visitedAddresses), getContext());
1019             return;
1020         }
1021
1022         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
1023
1024         getSender().tell(new PrimaryNotFoundException(
1025                 String.format("No primary shard found for %s.", shardName)), getSelf());
1026     }
1027
1028     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1029         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1030                 .getShardInitializationTimeout().duration().$times(2));
1031
1032         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1033         futureObj.onComplete(new OnComplete<Object>() {
1034             @Override
1035             public void onComplete(Throwable failure, Object response) {
1036                 if (failure != null) {
1037                     handler.onFailure(failure);
1038                 } else {
1039                     if (response instanceof RemotePrimaryShardFound) {
1040                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1041                     } else if (response instanceof LocalPrimaryShardFound) {
1042                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1043                     } else {
1044                         handler.onUnknownResponse(response);
1045                     }
1046                 }
1047             }
1048         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1049     }
1050
1051     /**
1052      * Construct the name of the shard actor given the name of the member on
1053      * which the shard resides and the name of the shard.
1054      *
1055      * @param memberName the member name
1056      * @param shardName the shard name
1057      * @return a b
1058      */
1059     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
1060         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1061     }
1062
1063     /**
1064      * Create shards that are local to the member on which the ShardManager runs.
1065      */
1066     private void createLocalShards() {
1067         MemberName memberName = this.cluster.getCurrentMemberName();
1068         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
1069
1070         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1071         if (restoreFromSnapshot != null) {
1072             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1073                 shardSnapshots.put(snapshot.getName(), snapshot);
1074             }
1075         }
1076
1077         restoreFromSnapshot = null; // null out to GC
1078
1079         for (String shardName : memberShardNames) {
1080             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1081
1082             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1083
1084             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1085             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
1086                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
1087                         shardSnapshots.get(shardName)), peerAddressResolver));
1088         }
1089     }
1090
1091     /**
1092      * Given the name of the shard find the addresses of all it's peers.
1093      *
1094      * @param shardName the shard name
1095      */
1096     private Map<String, String> getPeerAddresses(String shardName) {
1097         Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1098         Map<String, String> peerAddresses = new HashMap<>();
1099
1100         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1101
1102         for (MemberName memberName : members) {
1103             if (!currentMemberName.equals(memberName)) {
1104                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1105                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1106                 peerAddresses.put(shardId.toString(), address);
1107             }
1108         }
1109         return peerAddresses;
1110     }
1111
1112     @Override
1113     public SupervisorStrategy supervisorStrategy() {
1114
1115         return new OneForOneStrategy(10, Duration.create("1 minute"),
1116                 (Function<Throwable, Directive>) t -> {
1117                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1118                     return SupervisorStrategy.resume();
1119                 });
1120     }
1121
1122     @Override
1123     public String persistenceId() {
1124         return persistenceId;
1125     }
1126
1127     @VisibleForTesting
1128     ShardManagerInfoMBean getMBean() {
1129         return shardManagerMBean;
1130     }
1131
1132     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1133         if (shardReplicaOperationsInProgress.contains(shardName)) {
1134             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1135             LOG.debug("{}: {}", persistenceId(), msg);
1136             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1137             return true;
1138         }
1139
1140         return false;
1141     }
1142
1143
1144     // With this message the shard does NOT have to be preconfigured
1145     // do a dynamic lookup if the shard exists somewhere and replicate
1146     private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
1147         final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
1148
1149         LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
1150
1151         if (schemaContext == null) {
1152             final String msg = String.format(
1153                     "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1154             LOG.debug("{}: {}", persistenceId(), msg);
1155             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1156             return;
1157         }
1158
1159         findPrimary(shardName,
1160                 new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1161                     @Override
1162                     public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1163                         getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
1164                                 getTargetActor());
1165                     }
1166
1167                     @Override
1168                     public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
1169                         sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1170                     }
1171                 }
1172         );
1173     }
1174
1175     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
1176         final String shardName = shardReplicaMsg.getShardName();
1177
1178         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1179
1180         // verify the shard with the specified name is present in the cluster configuration
1181         if (!this.configuration.isShardConfigured(shardName)) {
1182             String msg = String.format("No module configuration exists for shard %s", shardName);
1183             LOG.debug("{}: {}", persistenceId(), msg);
1184             getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
1185             return;
1186         }
1187
1188         // Create the localShard
1189         if (schemaContext == null) {
1190             String msg = String.format(
1191                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1192             LOG.debug("{}: {}", persistenceId(), msg);
1193             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1194             return;
1195         }
1196
1197         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1198                 getSelf()) {
1199             @Override
1200             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1201                 getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
1202                         getTargetActor());
1203             }
1204
1205             @Override
1206             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1207                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1208             }
1209
1210         });
1211     }
1212
1213     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1214         String msg = String.format("Local shard %s already exists", shardName);
1215         LOG.debug("{}: {}", persistenceId(), msg);
1216         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1217     }
1218
1219     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1220         if (isShardReplicaOperationInProgress(shardName, sender)) {
1221             return;
1222         }
1223
1224         shardReplicaOperationsInProgress.add(shardName);
1225
1226         final ShardInformation shardInfo;
1227         final boolean removeShardOnFailure;
1228         ShardInformation existingShardInfo = localShards.get(shardName);
1229         if (existingShardInfo == null) {
1230             removeShardOnFailure = true;
1231             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1232
1233             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1234                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1235
1236             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1237                     Shard.builder(), peerAddressResolver);
1238             shardInfo.setActiveMember(false);
1239             localShards.put(shardName, shardInfo);
1240             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1241         } else {
1242             removeShardOnFailure = false;
1243             shardInfo = existingShardInfo;
1244         }
1245
1246         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1247
1248         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1249         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1250                 response.getPrimaryPath(), shardInfo.getShardId());
1251
1252         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1253                 .getShardLeaderElectionTimeout().duration());
1254         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1255             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1256
1257         futureObj.onComplete(new OnComplete<Object>() {
1258             @Override
1259             public void onComplete(Throwable failure, Object addServerResponse) {
1260                 if (failure != null) {
1261                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1262                             response.getPrimaryPath(), shardName, failure);
1263
1264                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1265                             response.getPrimaryPath(), shardName);
1266                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1267                 } else {
1268                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1269                             response.getPrimaryPath(), removeShardOnFailure), sender);
1270                 }
1271             }
1272         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1273     }
1274
1275     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1276             boolean removeShardOnFailure) {
1277         shardReplicaOperationsInProgress.remove(shardName);
1278
1279         if (removeShardOnFailure) {
1280             ShardInformation shardInfo = localShards.remove(shardName);
1281             if (shardInfo.getActor() != null) {
1282                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1283             }
1284         }
1285
1286         sender.tell(new Status.Failure(message == null ? failure :
1287             new RuntimeException(message, failure)), getSelf());
1288     }
1289
1290     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1291             String leaderPath, boolean removeShardOnFailure) {
1292         String shardName = shardInfo.getShardName();
1293         shardReplicaOperationsInProgress.remove(shardName);
1294
1295         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1296
1297         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1298             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1299
1300             // Make the local shard voting capable
1301             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1302             shardInfo.setActiveMember(true);
1303             persistShardList();
1304
1305             sender.tell(new Status.Success(null), getSelf());
1306         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1307             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1308         } else {
1309             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1310                     persistenceId(), shardName, replyMsg.getStatus());
1311
1312             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1313                     shardInfo.getShardId());
1314
1315             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1316         }
1317     }
1318
1319     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1320                                                String leaderPath, ShardIdentifier shardId) {
1321         Exception failure;
1322         switch (serverChangeStatus) {
1323             case TIMEOUT:
1324                 failure = new TimeoutException(String.format(
1325                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
1326                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
1327                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
1328                 break;
1329             case NO_LEADER:
1330                 failure = createNoShardLeaderException(shardId);
1331                 break;
1332             case NOT_SUPPORTED:
1333                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1334                         serverChange.getSimpleName(), shardId.getShardName()));
1335                 break;
1336             default :
1337                 failure = new RuntimeException(String.format(
1338                         "%s request to leader %s for shard %s failed with status %s",
1339                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1340         }
1341         return failure;
1342     }
1343
1344     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
1345         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1346
1347         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1348                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1349             @Override
1350             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1351                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1352             }
1353
1354             @Override
1355             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1356                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1357             }
1358
1359             private void doRemoveShardReplicaAsync(final String primaryPath) {
1360                 getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
1361                         primaryPath, getSender()), getTargetActor());
1362             }
1363         });
1364     }
1365
1366     private void persistShardList() {
1367         List<String> shardList = new ArrayList<>(localShards.keySet());
1368         for (ShardInformation shardInfo : localShards.values()) {
1369             if (!shardInfo.isActiveMember()) {
1370                 shardList.remove(shardInfo.getShardName());
1371             }
1372         }
1373         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1374         saveSnapshot(updateShardManagerSnapshot(shardList));
1375     }
1376
1377     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1378         currentSnapshot = new ShardManagerSnapshot(shardList);
1379         return currentSnapshot;
1380     }
1381
1382     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1383         currentSnapshot = snapshot;
1384
1385         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1386
1387         final MemberName currentMember = cluster.getCurrentMemberName();
1388         Set<String> configuredShardList =
1389             new HashSet<>(configuration.getMemberShardNames(currentMember));
1390         for (String shard : currentSnapshot.getShardList()) {
1391             if (!configuredShardList.contains(shard)) {
1392                 // add the current member as a replica for the shard
1393                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1394                 configuration.addMemberReplicaForShard(shard, currentMember);
1395             } else {
1396                 configuredShardList.remove(shard);
1397             }
1398         }
1399         for (String shard : configuredShardList) {
1400             // remove the member as a replica for the shard
1401             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1402             configuration.removeMemberReplicaForShard(shard, currentMember);
1403         }
1404     }
1405
1406     private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
1407         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1408             persistenceId());
1409         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1410             0, 0));
1411     }
1412
1413     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1414         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1415
1416         String shardName = changeMembersVotingStatus.getShardName();
1417         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1418         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1419             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1420                     e.getValue());
1421         }
1422
1423         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1424
1425         findLocalShard(shardName, getSender(),
1426             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1427             localShardFound.getPath(), getSender()));
1428     }
1429
1430     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
1431         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1432
1433         ActorRef sender = getSender();
1434         final String shardName = flipMembersVotingStatus.getShardName();
1435         findLocalShard(shardName, sender, localShardFound -> {
1436             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1437                     Timeout.apply(30, TimeUnit.SECONDS));
1438
1439             future.onComplete(new OnComplete<Object>() {
1440                 @Override
1441                 public void onComplete(Throwable failure, Object response) {
1442                     if (failure != null) {
1443                         sender.tell(new Status.Failure(new RuntimeException(
1444                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1445                         return;
1446                     }
1447
1448                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1449                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1450                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1451                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1452                     }
1453
1454                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1455                             .toString(), !raftState.isVoting());
1456
1457                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1458                             shardName, localShardFound.getPath(), sender);
1459                 }
1460             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1461         });
1462
1463     }
1464
1465     private void findLocalShard(FindLocalShard message) {
1466         final ShardInformation shardInformation = localShards.get(message.getShardName());
1467
1468         if (shardInformation == null) {
1469             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1470             return;
1471         }
1472
1473         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1474             () -> new LocalShardFound(shardInformation.getActor()));
1475     }
1476
1477     private void findLocalShard(final String shardName, final ActorRef sender,
1478             final Consumer<LocalShardFound> onLocalShardFound) {
1479         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1480                 .getShardInitializationTimeout().duration().$times(2));
1481
1482         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1483         futureObj.onComplete(new OnComplete<Object>() {
1484             @Override
1485             public void onComplete(Throwable failure, Object response) {
1486                 if (failure != null) {
1487                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1488                             failure);
1489                     sender.tell(new Status.Failure(new RuntimeException(
1490                             String.format("Failed to find local shard %s", shardName), failure)), self());
1491                 } else {
1492                     if (response instanceof LocalShardFound) {
1493                         getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
1494                                 sender);
1495                     } else if (response instanceof LocalShardNotFound) {
1496                         String msg = String.format("Local shard %s does not exist", shardName);
1497                         LOG.debug("{}: {}", persistenceId, msg);
1498                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
1499                     } else {
1500                         String msg = String.format("Failed to find local shard %s: received response: %s",
1501                                 shardName, response);
1502                         LOG.debug("{}: {}", persistenceId, msg);
1503                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1504                                 new RuntimeException(msg)), self());
1505                     }
1506                 }
1507             }
1508         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1509     }
1510
1511     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
1512             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1513         if (isShardReplicaOperationInProgress(shardName, sender)) {
1514             return;
1515         }
1516
1517         shardReplicaOperationsInProgress.add(shardName);
1518
1519         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1520         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1521
1522         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1523                 changeServersVotingStatus, shardActorRef.path());
1524
1525         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1526         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1527
1528         futureObj.onComplete(new OnComplete<Object>() {
1529             @Override
1530             public void onComplete(Throwable failure, Object response) {
1531                 shardReplicaOperationsInProgress.remove(shardName);
1532                 if (failure != null) {
1533                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1534                             shardActorRef.path());
1535                     LOG.debug("{}: {}", persistenceId(), msg, failure);
1536                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1537                 } else {
1538                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1539
1540                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1541                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1542                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1543                         sender.tell(new Status.Success(null), getSelf());
1544                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1545                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1546                                 "The requested voting state change for shard %s is invalid. At least one member "
1547                                 + "must be voting", shardId.getShardName()))), getSelf());
1548                     } else {
1549                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1550                                 persistenceId(), shardName, replyMsg.getStatus());
1551
1552                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1553                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1554                         sender.tell(new Status.Failure(error), getSelf());
1555                     }
1556                 }
1557             }
1558         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1559     }
1560
1561     private static final class ForwardedAddServerReply {
1562         ShardInformation shardInfo;
1563         AddServerReply addServerReply;
1564         String leaderPath;
1565         boolean removeShardOnFailure;
1566
1567         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1568                 boolean removeShardOnFailure) {
1569             this.shardInfo = shardInfo;
1570             this.addServerReply = addServerReply;
1571             this.leaderPath = leaderPath;
1572             this.removeShardOnFailure = removeShardOnFailure;
1573         }
1574     }
1575
1576     private static final class ForwardedAddServerFailure {
1577         String shardName;
1578         String failureMessage;
1579         Throwable failure;
1580         boolean removeShardOnFailure;
1581
1582         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1583                 boolean removeShardOnFailure) {
1584             this.shardName = shardName;
1585             this.failureMessage = failureMessage;
1586             this.failure = failure;
1587             this.removeShardOnFailure = removeShardOnFailure;
1588         }
1589     }
1590
1591     static class OnShardInitialized {
1592         private final Runnable replyRunnable;
1593         private Cancellable timeoutSchedule;
1594
1595         OnShardInitialized(Runnable replyRunnable) {
1596             this.replyRunnable = replyRunnable;
1597         }
1598
1599         Runnable getReplyRunnable() {
1600             return replyRunnable;
1601         }
1602
1603         Cancellable getTimeoutSchedule() {
1604             return timeoutSchedule;
1605         }
1606
1607         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1608             this.timeoutSchedule = timeoutSchedule;
1609         }
1610     }
1611
1612     static class OnShardReady extends OnShardInitialized {
1613         OnShardReady(Runnable replyRunnable) {
1614             super(replyRunnable);
1615         }
1616     }
1617
1618     private interface RunnableMessage extends Runnable {
1619     }
1620
1621     /**
1622      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1623      * a remote or local find primary message is processed.
1624      */
1625     private interface FindPrimaryResponseHandler {
1626         /**
1627          * Invoked when a Failure message is received as a response.
1628          *
1629          * @param failure the failure exception
1630          */
1631         void onFailure(Throwable failure);
1632
1633         /**
1634          * Invoked when a RemotePrimaryShardFound response is received.
1635          *
1636          * @param response the response
1637          */
1638         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1639
1640         /**
1641          * Invoked when a LocalPrimaryShardFound response is received.
1642          *
1643          * @param response the response
1644          */
1645         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1646
1647         /**
1648          * Invoked when an unknown response is received. This is another type of failure.
1649          *
1650          * @param response the response
1651          */
1652         void onUnknownResponse(Object response);
1653     }
1654
1655     /**
1656      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1657      * replica and sends a wrapped Failure response to some targetActor.
1658      */
1659     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1660         private final ActorRef targetActor;
1661         private final String shardName;
1662         private final String persistenceId;
1663         private final ActorRef shardManagerActor;
1664
1665         /**
1666          * Constructs an instance.
1667          *
1668          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1669          * @param shardName The name of the shard for which the primary replica had to be found
1670          * @param persistenceId The persistenceId for the ShardManager
1671          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1672          */
1673         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
1674                 ActorRef shardManagerActor) {
1675             this.targetActor = Preconditions.checkNotNull(targetActor);
1676             this.shardName = Preconditions.checkNotNull(shardName);
1677             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1678             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1679         }
1680
1681         public ActorRef getTargetActor() {
1682             return targetActor;
1683         }
1684
1685         public String getShardName() {
1686             return shardName;
1687         }
1688
1689         @Override
1690         public void onFailure(Throwable failure) {
1691             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1692             targetActor.tell(new Status.Failure(new RuntimeException(
1693                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1694         }
1695
1696         @Override
1697         public void onUnknownResponse(Object response) {
1698             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1699                     shardName, response);
1700             LOG.debug("{}: {}", persistenceId, msg);
1701             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1702                     new RuntimeException(msg)), shardManagerActor);
1703         }
1704     }
1705
1706     /**
1707      * The WrappedShardResponse class wraps a response from a Shard.
1708      */
1709     private static final class WrappedShardResponse {
1710         private final ShardIdentifier shardId;
1711         private final Object response;
1712         private final String leaderPath;
1713
1714         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1715             this.shardId = shardId;
1716             this.response = response;
1717             this.leaderPath = leaderPath;
1718         }
1719
1720         ShardIdentifier getShardId() {
1721             return shardId;
1722         }
1723
1724         Object getResponse() {
1725             return response;
1726         }
1727
1728         String getLeaderPath() {
1729             return leaderPath;
1730         }
1731     }
1732
1733     private static final class ShardNotInitializedTimeout {
1734         private final ActorRef sender;
1735         private final ShardInformation shardInfo;
1736         private final OnShardInitialized onShardInitialized;
1737
1738         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1739             this.sender = sender;
1740             this.shardInfo = shardInfo;
1741             this.onShardInitialized = onShardInitialized;
1742         }
1743
1744         ActorRef getSender() {
1745             return sender;
1746         }
1747
1748         ShardInformation getShardInfo() {
1749             return shardInfo;
1750         }
1751
1752         OnShardInitialized getOnShardInitialized() {
1753             return onShardInitialized;
1754         }
1755     }
1756 }
1757
1758
1759