Checkstyle: fix ParenPad violations
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.actor.SupervisorStrategy.Directive;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberWeaklyUp;
23 import akka.cluster.Member;
24 import akka.dispatch.Futures;
25 import akka.dispatch.OnComplete;
26 import akka.japi.Function;
27 import akka.pattern.Patterns;
28 import akka.persistence.RecoveryCompleted;
29 import akka.persistence.SaveSnapshotFailure;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.persistence.SnapshotOffer;
32 import akka.persistence.SnapshotSelectionCriteria;
33 import akka.util.Timeout;
34 import com.google.common.annotations.VisibleForTesting;
35 import com.google.common.base.Preconditions;
36 import java.io.ByteArrayInputStream;
37 import java.io.IOException;
38 import java.io.ObjectInputStream;
39 import java.util.ArrayList;
40 import java.util.Collection;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Map.Entry;
47 import java.util.Set;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import java.util.function.Consumer;
52 import java.util.function.Supplier;
53 import org.apache.commons.lang3.SerializationUtils;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
56 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
57 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
58 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
59 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
60 import org.opendaylight.controller.cluster.datastore.Shard;
61 import org.opendaylight.controller.cluster.datastore.config.Configuration;
62 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
63 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
64 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
65 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
66 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
67 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
68 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
69 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
70 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
71 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
72 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
73 import org.opendaylight.controller.cluster.datastore.messages.CreatePrefixedShard;
74 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
75 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
76 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
77 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
78 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
79 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
80 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
81 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
82 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
83 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
84 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
85 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
86 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
87 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
88 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
89 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
90 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
91 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
92 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
93 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
94 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
95 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
96 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
97 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
98 import org.opendaylight.controller.cluster.raft.messages.AddServer;
99 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
100 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
101 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
102 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
103 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
104 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
105 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
106 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
107 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
108 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
109 import org.slf4j.Logger;
110 import org.slf4j.LoggerFactory;
111 import scala.concurrent.ExecutionContext;
112 import scala.concurrent.Future;
113 import scala.concurrent.duration.Duration;
114 import scala.concurrent.duration.FiniteDuration;
115
116 /**
117  * Manages the shards for a data store. The ShardManager has the following jobs:
118  * <ul>
119  * <li> Create all the local shard replicas that belong on this cluster member
120  * <li> Find the address of the local shard
121  * <li> Find the primary replica for any given shard
122  * <li> Monitor the cluster members and store their addresses
123  * </ul>
124  */
125 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
126     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
127
    // Stores a mapping between a shard name and its corresponding information.
    // Shard names look like inventory, topology, etc. and are as specified in
    // the configuration.
131     private final Map<String, ShardInformation> localShards = new HashMap<>();
132
133     // The type of a ShardManager reflects the type of the datastore itself
134     // A data store could be of type config/operational
135     private final String type;
136
137     private final ClusterWrapper cluster;
138
139     private final Configuration configuration;
140
141     private final String shardDispatcherPath;
142
143     private final ShardManagerInfo shardManagerMBean;
144
145     private DatastoreContextFactory datastoreContextFactory;
146
147     private final CountDownLatch waitTillReadyCountdownLatch;
148
149     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
150
151     private final ShardPeerAddressResolver peerAddressResolver;
152
153     private SchemaContext schemaContext;
154
155     private DatastoreSnapshot restoreFromSnapshot;
156
157     private ShardManagerSnapshot currentSnapshot;
158
159     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
160
161     private final String persistenceId;
162
163     ShardManager(AbstractShardManagerCreator<?> builder) {
164         this.cluster = builder.getCluster();
165         this.configuration = builder.getConfiguration();
166         this.datastoreContextFactory = builder.getDatastoreContextFactory();
167         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
168         this.shardDispatcherPath =
169                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
170         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
171         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
172         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
173
174         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
175         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
176
177         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
178
179         // Subscribe this actor to cluster member events
180         cluster.subscribeToMemberEvents(getSelf());
181
182         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
183                 "shard-manager-" + this.type,
184                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
185         shardManagerMBean.registerMBean();
186     }
187
188     @Override
189     public void postStop() {
190         LOG.info("Stopping ShardManager {}", persistenceId());
191
192         shardManagerMBean.unregisterMBean();
193     }
194
195     @Override
196     public void handleCommand(Object message) throws Exception {
197         if (message  instanceof FindPrimary) {
198             findPrimary((FindPrimary)message);
199         } else if (message instanceof FindLocalShard) {
200             findLocalShard((FindLocalShard) message);
201         } else if (message instanceof UpdateSchemaContext) {
202             updateSchemaContext(message);
203         } else if (message instanceof ActorInitialized) {
204             onActorInitialized(message);
205         } else if (message instanceof ClusterEvent.MemberUp) {
206             memberUp((ClusterEvent.MemberUp) message);
207         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
208             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
209         } else if (message instanceof ClusterEvent.MemberExited) {
210             memberExited((ClusterEvent.MemberExited) message);
211         } else if (message instanceof ClusterEvent.MemberRemoved) {
212             memberRemoved((ClusterEvent.MemberRemoved) message);
213         } else if (message instanceof ClusterEvent.UnreachableMember) {
214             memberUnreachable((ClusterEvent.UnreachableMember) message);
215         } else if (message instanceof ClusterEvent.ReachableMember) {
216             memberReachable((ClusterEvent.ReachableMember) message);
217         } else if (message instanceof DatastoreContextFactory) {
218             onDatastoreContextFactory((DatastoreContextFactory) message);
219         } else if (message instanceof RoleChangeNotification) {
220             onRoleChangeNotification((RoleChangeNotification) message);
221         } else if (message instanceof FollowerInitialSyncUpStatus) {
222             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
223         } else if (message instanceof ShardNotInitializedTimeout) {
224             onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
225         } else if (message instanceof ShardLeaderStateChanged) {
226             onLeaderStateChanged((ShardLeaderStateChanged) message);
227         } else if (message instanceof SwitchShardBehavior) {
228             onSwitchShardBehavior((SwitchShardBehavior) message);
229         } else if (message instanceof CreateShard) {
230             onCreateShard((CreateShard)message);
231         } else if (message instanceof AddShardReplica) {
232             onAddShardReplica((AddShardReplica) message);
233         } else if (message instanceof CreatePrefixedShard) {
234             onCreatePrefixedShard((CreatePrefixedShard) message);
235         } else if (message instanceof AddPrefixShardReplica) {
236             onAddPrefixShardReplica((AddPrefixShardReplica) message);
237         } else if (message instanceof ForwardedAddServerReply) {
238             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
239             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
240                     msg.removeShardOnFailure);
241         } else if (message instanceof ForwardedAddServerFailure) {
242             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
243             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
244         } else if (message instanceof RemoveShardReplica) {
245             onRemoveShardReplica((RemoveShardReplica) message);
246         } else if (message instanceof WrappedShardResponse) {
247             onWrappedShardResponse((WrappedShardResponse) message);
248         } else if (message instanceof GetSnapshot) {
249             onGetSnapshot();
250         } else if (message instanceof ServerRemoved) {
251             onShardReplicaRemoved((ServerRemoved) message);
252         } else if (message instanceof ChangeShardMembersVotingStatus) {
253             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
254         } else if (message instanceof FlipShardMembersVotingStatus) {
255             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
256         } else if (message instanceof SaveSnapshotSuccess) {
257             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
258         } else if (message instanceof SaveSnapshotFailure) {
259             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
260                     ((SaveSnapshotFailure) message).cause());
261         } else if (message instanceof Shutdown) {
262             onShutDown();
263         } else if (message instanceof GetLocalShardIds) {
264             onGetLocalShardIds();
265         } else if (message instanceof RunnableMessage) {
266             ((RunnableMessage)message).run();
267         } else {
268             unknownMessage(message);
269         }
270     }
271
272     private void onShutDown() {
273         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
274         for (ShardInformation info : localShards.values()) {
275             if (info.getActor() != null) {
276                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
277
278                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
279                         .getElectionTimeOutInterval().$times(2);
280                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
281             }
282         }
283
284         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
285
286         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
287                 .getDispatcher(Dispatchers.DispatcherType.Client);
288         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
289
290         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
291             @Override
292             public void onComplete(Throwable failure, Iterable<Boolean> results) {
293                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
294
295                 self().tell(PoisonPill.getInstance(), self());
296
297                 if (failure != null) {
298                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
299                 } else {
300                     int nfailed = 0;
301                     for (Boolean result : results) {
302                         if (!result) {
303                             nfailed++;
304                         }
305                     }
306
307                     if (nfailed > 0) {
308                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
309                     }
310                 }
311             }
312         }, dispatcher);
313     }
314
315     private void onWrappedShardResponse(WrappedShardResponse message) {
316         if (message.getResponse() instanceof RemoveServerReply) {
317             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
318                     message.getLeaderPath());
319         }
320     }
321
322     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
323             String leaderPath) {
324         shardReplicaOperationsInProgress.remove(shardId.getShardName());
325
326         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
327
328         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
329             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
330                     shardId.getShardName());
331             originalSender.tell(new Status.Success(null), getSelf());
332         } else {
333             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
334                     persistenceId(), shardId, replyMsg.getStatus());
335
336             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
337             originalSender.tell(new Status.Failure(failure), getSelf());
338         }
339     }
340
341     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
342             final ActorRef sender) {
343         if (isShardReplicaOperationInProgress(shardName, sender)) {
344             return;
345         }
346
347         shardReplicaOperationsInProgress.add(shardName);
348
349         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
350
351         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
352
353         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
354         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
355                 primaryPath, shardId);
356
357         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
358         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
359                 new RemoveServer(shardId.toString()), removeServerTimeout);
360
361         futureObj.onComplete(new OnComplete<Object>() {
362             @Override
363             public void onComplete(Throwable failure, Object response) {
364                 if (failure != null) {
365                     shardReplicaOperationsInProgress.remove(shardName);
366                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
367                             primaryPath, shardName);
368
369                     LOG.debug("{}: {}", persistenceId(), msg, failure);
370
371                     // FAILURE
372                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
373                 } else {
374                     // SUCCESS
375                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
376                 }
377             }
378         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
379     }
380
381     private void onShardReplicaRemoved(ServerRemoved message) {
382         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
383         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
384         if (shardInformation == null) {
385             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
386             return;
387         } else if (shardInformation.getActor() != null) {
388             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
389             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
390         }
391         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
392         persistShardList();
393     }
394
395     private void onGetSnapshot() {
396         LOG.debug("{}: onGetSnapshot", persistenceId());
397
398         List<String> notInitialized = null;
399         for (ShardInformation shardInfo : localShards.values()) {
400             if (!shardInfo.isShardInitialized()) {
401                 if (notInitialized == null) {
402                     notInitialized = new ArrayList<>();
403                 }
404
405                 notInitialized.add(shardInfo.getShardName());
406             }
407         }
408
409         if (notInitialized != null) {
410             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
411                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
412             return;
413         }
414
415         byte[] shardManagerSnapshot = null;
416         if (currentSnapshot != null) {
417             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
418         }
419
420         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
421                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
422                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
423
424         for (ShardInformation shardInfo: localShards.values()) {
425             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
426         }
427     }
428
429     @SuppressWarnings("checkstyle:IllegalCatch")
430     private void onCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
431         LOG.debug("{}: onCreatePrefixedShard: {}", persistenceId(), createPrefixedShard);
432
433         Object reply;
434         try {
435             final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
436                     createPrefixedShard.getConfig().getPrefix());
437             if (localShards.containsKey(shardId.getShardName())) {
438                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardId);
439                 reply = new Status.Success(String.format("Shard with name %s already exists", shardId));
440             } else {
441                 doCreatePrefixedShard(createPrefixedShard);
442                 reply = new Status.Success(null);
443             }
444         } catch (final Exception e) {
445             LOG.error("{}: onCreateShard failed", persistenceId(), e);
446             reply = new Status.Failure(e);
447         }
448
449         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
450             getSender().tell(reply, getSelf());
451         }
452     }
453
454     @SuppressWarnings("checkstyle:IllegalCatch")
455     private void onCreateShard(CreateShard createShard) {
456         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
457
458         Object reply;
459         try {
460             String shardName = createShard.getModuleShardConfig().getShardName();
461             if (localShards.containsKey(shardName)) {
462                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
463                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
464             } else {
465                 doCreateShard(createShard);
466                 reply = new Status.Success(null);
467             }
468         } catch (Exception e) {
469             LOG.error("{}: onCreateShard failed", persistenceId(), e);
470             reply = new Status.Failure(e);
471         }
472
473         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
474             getSender().tell(reply, getSelf());
475         }
476     }
477
478     private void doCreatePrefixedShard(final CreatePrefixedShard createPrefixedShard) {
479         final PrefixShardConfiguration config = createPrefixedShard.getConfig();
480
481         final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
482                 createPrefixedShard.getConfig().getPrefix());
483         final String shardName = shardId.getShardName();
484
485         configuration.addPrefixShardConfiguration(config);
486
487         DatastoreContext shardDatastoreContext = createPrefixedShard.getContext();
488
489         if (shardDatastoreContext == null) {
490             final Builder builder = newShardDatastoreContextBuilder(shardName);
491             builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
492                     .storeRoot(config.getPrefix().getRootIdentifier());
493             shardDatastoreContext = builder.build();
494         } else {
495             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
496                     peerAddressResolver).build();
497         }
498
499         final boolean shardWasInRecoveredSnapshot = currentSnapshot != null
500                 && currentSnapshot.getShardList().contains(shardName);
501
502         final Map<String, String> peerAddresses = Collections.emptyMap();
503         final boolean isActiveMember = true;
504         LOG.debug("{} doCreatePrefixedShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
505                 persistenceId(), shardId, peerAddresses, isActiveMember);
506
507         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
508                 shardDatastoreContext, createPrefixedShard.getShardBuilder(), peerAddressResolver);
509         info.setActiveMember(isActiveMember);
510         localShards.put(info.getShardName(), info);
511
512         if (schemaContext != null) {
513             info.setActor(newShardActor(schemaContext, info));
514         }
515     }
516
517     private void doCreateShard(final CreateShard createShard) {
518         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
519         final String shardName = moduleShardConfig.getShardName();
520
521         configuration.addModuleShardConfiguration(moduleShardConfig);
522
523         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
524         if (shardDatastoreContext == null) {
525             shardDatastoreContext = newShardDatastoreContext(shardName);
526         } else {
527             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
528                     peerAddressResolver).build();
529         }
530
531         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
532
533         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
534                 && currentSnapshot.getShardList().contains(shardName);
535
536         Map<String, String> peerAddresses;
537         boolean isActiveMember;
538         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
539                 .contains(cluster.getCurrentMemberName())) {
540             peerAddresses = getPeerAddresses(shardName);
541             isActiveMember = true;
542         } else {
543             // The local member is not in the static shard member configuration and the shard did not
544             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
545             // the shard with no peers and with elections disabled so it stays as follower. A
546             // subsequent AddServer request will be needed to make it an active member.
547             isActiveMember = false;
548             peerAddresses = Collections.emptyMap();
549             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
550                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
551         }
552
553         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
554                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
555                 isActiveMember);
556
557         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
558                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
559         info.setActiveMember(isActiveMember);
560         localShards.put(info.getShardName(), info);
561
562         if (schemaContext != null) {
563             info.setActor(newShardActor(schemaContext, info));
564         }
565     }
566
567     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
568         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
569                 .shardPeerAddressResolver(peerAddressResolver);
570     }
571
572     private DatastoreContext newShardDatastoreContext(String shardName) {
573         return newShardDatastoreContextBuilder(shardName).build();
574     }
575
576     private void checkReady() {
577         if (isReadyWithLeaderId()) {
578             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
579                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
580
581             waitTillReadyCountdownLatch.countDown();
582         }
583     }
584
585     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
586         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
587
588         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
589         if (shardInformation != null) {
590             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
591             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
592             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
593                 primaryShardInfoCache.remove(shardInformation.getShardName());
594             }
595
596             checkReady();
597         } else {
598             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
599         }
600     }
601
602     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
603         ShardInformation shardInfo = message.getShardInfo();
604
605         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
606                 shardInfo.getShardName());
607
608         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
609
610         if (!shardInfo.isShardInitialized()) {
611             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
612             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
613         } else {
614             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
615             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
616         }
617     }
618
619     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
620         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
621                 status.getName(), status.isInitialSyncDone());
622
623         ShardInformation shardInformation = findShardInformation(status.getName());
624
625         if (shardInformation != null) {
626             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
627
628             shardManagerMBean.setSyncStatus(isInSync());
629         }
630
631     }
632
633     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
634         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
635                 roleChanged.getOldRole(), roleChanged.getNewRole());
636
637         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
638         if (shardInformation != null) {
639             shardInformation.setRole(roleChanged.getNewRole());
640             checkReady();
641             shardManagerMBean.setSyncStatus(isInSync());
642         }
643     }
644
645
646     private ShardInformation findShardInformation(String memberId) {
647         for (ShardInformation info : localShards.values()) {
648             if (info.getShardId().toString().equals(memberId)) {
649                 return info;
650             }
651         }
652
653         return null;
654     }
655
656     private boolean isReadyWithLeaderId() {
657         boolean isReady = true;
658         for (ShardInformation info : localShards.values()) {
659             if (!info.isShardReadyWithLeaderId()) {
660                 isReady = false;
661                 break;
662             }
663         }
664         return isReady;
665     }
666
667     private boolean isInSync() {
668         for (ShardInformation info : localShards.values()) {
669             if (!info.isInSync()) {
670                 return false;
671             }
672         }
673         return true;
674     }
675
676     private void onActorInitialized(Object message) {
677         final ActorRef sender = getSender();
678
679         if (sender == null) {
680             return; //why is a non-actor sending this message? Just ignore.
681         }
682
683         String actorName = sender.path().name();
684         //find shard name from actor name; actor name is stringified shardId
685
686         final ShardIdentifier shardId;
687         try {
688             shardId = ShardIdentifier.fromShardIdString(actorName);
689         } catch (IllegalArgumentException e) {
690             LOG.debug("{}: ignoring actor {}", actorName, e);
691             return;
692         }
693
694         markShardAsInitialized(shardId.getShardName());
695     }
696
697     private void markShardAsInitialized(String shardName) {
698         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
699
700         ShardInformation shardInformation = localShards.get(shardName);
701         if (shardInformation != null) {
702             shardInformation.setActorInitialized();
703
704             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
705         }
706     }
707
708     @Override
709     protected void handleRecover(Object message) throws Exception {
710         if (message instanceof RecoveryCompleted) {
711             onRecoveryCompleted();
712         } else if (message instanceof SnapshotOffer) {
713             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
714         }
715     }
716
717     @SuppressWarnings("checkstyle:IllegalCatch")
718     private void onRecoveryCompleted() {
719         LOG.info("Recovery complete : {}", persistenceId());
720
721         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
722         // journal on upgrade from Helium.
723         deleteMessages(lastSequenceNr());
724
725         if (currentSnapshot == null && restoreFromSnapshot != null
726                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
727             try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
728                     restoreFromSnapshot.getShardManagerSnapshot()))) {
729                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
730
731                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
732
733                 applyShardManagerSnapshot(snapshot);
734             } catch (ClassNotFoundException | IOException e) {
735                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
736             }
737         }
738
739         createLocalShards();
740     }
741
742     private void sendResponse(ShardInformation shardInformation, boolean doWait,
743             boolean wantShardReady, final Supplier<Object> messageSupplier) {
744         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
745             if (doWait) {
746                 final ActorRef sender = getSender();
747                 final ActorRef self = self();
748
749                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
750
751                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
752                     new OnShardInitialized(replyRunnable);
753
754                 shardInformation.addOnShardInitialized(onShardInitialized);
755
756                 FiniteDuration timeout = shardInformation.getDatastoreContext()
757                         .getShardInitializationTimeout().duration();
758                 if (shardInformation.isShardInitialized()) {
759                     // If the shard is already initialized then we'll wait enough time for the shard to
760                     // elect a leader, ie 2 times the election timeout.
761                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
762                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
763                 }
764
765                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
766                         shardInformation.getShardName());
767
768                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
769                         timeout, getSelf(),
770                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
771                         getContext().dispatcher(), getSelf());
772
773                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
774
775             } else if (!shardInformation.isShardInitialized()) {
776                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
777                         shardInformation.getShardName());
778                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
779             } else {
780                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
781                         shardInformation.getShardName());
782                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
783             }
784
785             return;
786         }
787
788         getSender().tell(messageSupplier.get(), getSelf());
789     }
790
791     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
792         return new NoShardLeaderException(null, shardId.toString());
793     }
794
795     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
796         return new NotInitializedException(String.format(
797                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
798     }
799
800     @VisibleForTesting
801     static MemberName memberToName(final Member member) {
802         return MemberName.forName(member.roles().iterator().next());
803     }
804
805     private void memberRemoved(ClusterEvent.MemberRemoved message) {
806         MemberName memberName = memberToName(message.member());
807
808         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
809                 message.member().address());
810
811         peerAddressResolver.removePeerAddress(memberName);
812
813         for (ShardInformation info : localShards.values()) {
814             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
815         }
816     }
817
818     private void memberExited(ClusterEvent.MemberExited message) {
819         MemberName memberName = memberToName(message.member());
820
821         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
822                 message.member().address());
823
824         peerAddressResolver.removePeerAddress(memberName);
825
826         for (ShardInformation info : localShards.values()) {
827             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
828         }
829     }
830
831     private void memberUp(ClusterEvent.MemberUp message) {
832         MemberName memberName = memberToName(message.member());
833
834         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
835                 message.member().address());
836
837         memberUp(memberName, message.member().address());
838     }
839
840     private void memberUp(MemberName memberName, Address address) {
841         addPeerAddress(memberName, address);
842         checkReady();
843     }
844
845     private void memberWeaklyUp(MemberWeaklyUp message) {
846         MemberName memberName = memberToName(message.member());
847
848         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
849                 message.member().address());
850
851         memberUp(memberName, message.member().address());
852     }
853
854     private void addPeerAddress(MemberName memberName, Address address) {
855         peerAddressResolver.addPeerAddress(memberName, address);
856
857         for (ShardInformation info : localShards.values()) {
858             String shardName = info.getShardName();
859             String peerId = getShardIdentifier(memberName, shardName).toString();
860             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
861
862             info.peerUp(memberName, peerId, getSelf());
863         }
864     }
865
866     private void memberReachable(ClusterEvent.ReachableMember message) {
867         MemberName memberName = memberToName(message.member());
868         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
869
870         addPeerAddress(memberName, message.member().address());
871
872         markMemberAvailable(memberName);
873     }
874
875     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
876         MemberName memberName = memberToName(message.member());
877         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
878
879         markMemberUnavailable(memberName);
880     }
881
882     private void markMemberUnavailable(final MemberName memberName) {
883         final String memberStr = memberName.getName();
884         for (ShardInformation info : localShards.values()) {
885             String leaderId = info.getLeaderId();
886             // XXX: why are we using String#contains() here?
887             if (leaderId != null && leaderId.contains(memberStr)) {
888                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
889                 info.setLeaderAvailable(false);
890
891                 primaryShardInfoCache.remove(info.getShardName());
892             }
893
894             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
895         }
896     }
897
898     private void markMemberAvailable(final MemberName memberName) {
899         final String memberStr = memberName.getName();
900         for (ShardInformation info : localShards.values()) {
901             String leaderId = info.getLeaderId();
902             // XXX: why are we using String#contains() here?
903             if (leaderId != null && leaderId.contains(memberStr)) {
904                 LOG.debug("Marking Leader {} as available.", leaderId);
905                 info.setLeaderAvailable(true);
906             }
907
908             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
909         }
910     }
911
912     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
913         datastoreContextFactory = factory;
914         for (ShardInformation info : localShards.values()) {
915             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
916         }
917     }
918
919     private void onGetLocalShardIds() {
920         final List<String> response = new ArrayList<>(localShards.size());
921
922         for (ShardInformation info : localShards.values()) {
923             response.add(info.getShardId().toString());
924         }
925
926         getSender().tell(new Status.Success(response), getSelf());
927     }
928
929     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
930         final ShardIdentifier identifier = message.getShardId();
931
932         if (identifier != null) {
933             final ShardInformation info = localShards.get(identifier.getShardName());
934             if (info == null) {
935                 getSender().tell(new Status.Failure(
936                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
937                 return;
938             }
939
940             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
941         } else {
942             for (ShardInformation info : localShards.values()) {
943                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
944             }
945         }
946
947         getSender().tell(new Status.Success(null), getSelf());
948     }
949
950     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
951         final ActorRef actor = info.getActor();
952         if (actor != null) {
953             actor.tell(switchBehavior, getSelf());
954         } else {
955             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
956                 info.getShardName(), switchBehavior.getNewState());
957         }
958     }
959
960     /**
961      * Notifies all the local shards of a change in the schema context.
962      *
963      * @param message the message to send
964      */
965     private void updateSchemaContext(final Object message) {
966         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
967
968         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
969
970         for (ShardInformation info : localShards.values()) {
971             if (info.getActor() == null) {
972                 LOG.debug("Creating Shard {}", info.getShardId());
973                 info.setActor(newShardActor(schemaContext, info));
974             } else {
975                 info.getActor().tell(message, getSelf());
976             }
977         }
978     }
979
980     @VisibleForTesting
981     protected ClusterWrapper getCluster() {
982         return cluster;
983     }
984
985     @VisibleForTesting
986     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
987         return getContext().actorOf(info.newProps(schemaContext)
988                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
989     }
990
991     private void findPrimary(FindPrimary message) {
992         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
993
994         final String shardName = message.getShardName();
995         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
996
997         // First see if the there is a local replica for the shard
998         final ShardInformation info = localShards.get(shardName);
999         if (info != null && info.isActiveMember()) {
1000             sendResponse(info, message.isWaitUntilReady(), true, () -> {
1001                 String primaryPath = info.getSerializedLeaderActor();
1002                 Object found = canReturnLocalShardState && info.isLeader()
1003                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
1004                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
1005
1006                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
1007                 return found;
1008             });
1009
1010             return;
1011         }
1012
1013         final Collection<String> visitedAddresses;
1014         if (message instanceof RemoteFindPrimary) {
1015             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
1016         } else {
1017             visitedAddresses = new ArrayList<>(1);
1018         }
1019
1020         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
1021
1022         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
1023             if (visitedAddresses.contains(address)) {
1024                 continue;
1025             }
1026
1027             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
1028                     persistenceId(), shardName, address, visitedAddresses);
1029
1030             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
1031                     message.isWaitUntilReady(), visitedAddresses), getContext());
1032             return;
1033         }
1034
1035         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
1036
1037         getSender().tell(new PrimaryNotFoundException(
1038                 String.format("No primary shard found for %s.", shardName)), getSelf());
1039     }
1040
1041     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1042         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1043                 .getShardInitializationTimeout().duration().$times(2));
1044
1045         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1046         futureObj.onComplete(new OnComplete<Object>() {
1047             @Override
1048             public void onComplete(Throwable failure, Object response) {
1049                 if (failure != null) {
1050                     handler.onFailure(failure);
1051                 } else {
1052                     if (response instanceof RemotePrimaryShardFound) {
1053                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1054                     } else if (response instanceof LocalPrimaryShardFound) {
1055                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1056                     } else {
1057                         handler.onUnknownResponse(response);
1058                     }
1059                 }
1060             }
1061         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1062     }
1063
1064     /**
1065      * Construct the name of the shard actor given the name of the member on
1066      * which the shard resides and the name of the shard.
1067      *
1068      * @param memberName the member name
1069      * @param shardName the shard name
1070      * @return a b
1071      */
1072     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
1073         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1074     }
1075
1076     /**
1077      * Create shards that are local to the member on which the ShardManager runs.
1078      */
1079     private void createLocalShards() {
1080         MemberName memberName = this.cluster.getCurrentMemberName();
1081         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
1082
1083         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1084         if (restoreFromSnapshot != null) {
1085             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1086                 shardSnapshots.put(snapshot.getName(), snapshot);
1087             }
1088         }
1089
1090         restoreFromSnapshot = null; // null out to GC
1091
1092         for (String shardName : memberShardNames) {
1093             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1094
1095             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1096
1097             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1098             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
1099                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
1100                         shardSnapshots.get(shardName)), peerAddressResolver));
1101         }
1102     }
1103
1104     /**
1105      * Given the name of the shard find the addresses of all it's peers.
1106      *
1107      * @param shardName the shard name
1108      */
1109     private Map<String, String> getPeerAddresses(String shardName) {
1110         Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1111         Map<String, String> peerAddresses = new HashMap<>();
1112
1113         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1114
1115         for (MemberName memberName : members) {
1116             if (!currentMemberName.equals(memberName)) {
1117                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1118                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1119                 peerAddresses.put(shardId.toString(), address);
1120             }
1121         }
1122         return peerAddresses;
1123     }
1124
1125     @Override
1126     public SupervisorStrategy supervisorStrategy() {
1127
1128         return new OneForOneStrategy(10, Duration.create("1 minute"),
1129                 (Function<Throwable, Directive>) t -> {
1130                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1131                     return SupervisorStrategy.resume();
1132                 });
1133     }
1134
1135     @Override
1136     public String persistenceId() {
1137         return persistenceId;
1138     }
1139
1140     @VisibleForTesting
1141     ShardManagerInfoMBean getMBean() {
1142         return shardManagerMBean;
1143     }
1144
1145     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1146         if (shardReplicaOperationsInProgress.contains(shardName)) {
1147             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1148             LOG.debug("{}: {}", persistenceId(), msg);
1149             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1150             return true;
1151         }
1152
1153         return false;
1154     }
1155
1156
1157     // With this message the shard does NOT have to be preconfigured
1158     // do a dynamic lookup if the shard exists somewhere and replicate
1159     private void onAddPrefixShardReplica(final AddPrefixShardReplica shardReplicaMsg) {
1160         final String shardName = ClusterUtils.getCleanShardName(shardReplicaMsg.getPrefix());
1161
1162         LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), shardReplicaMsg);
1163
1164         if (schemaContext == null) {
1165             final String msg = String.format(
1166                     "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1167             LOG.debug("{}: {}", persistenceId(), msg);
1168             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1169             return;
1170         }
1171
1172         findPrimary(shardName,
1173                 new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1174                     @Override
1175                     public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1176                         getSelf().tell(new RunnableMessage() {
1177                             @Override
1178                             public void run() {
1179                                 addShard(getShardName(), response, getSender());
1180                             }
1181                         }, getTargetActor());
1182                     }
1183
1184                     @Override
1185                     public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
1186                         sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1187                     }
1188                 }
1189         );
1190     }
1191
1192     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
1193         final String shardName = shardReplicaMsg.getShardName();
1194
1195         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1196
1197         // verify the shard with the specified name is present in the cluster configuration
1198         if (!this.configuration.isShardConfigured(shardName)) {
1199             String msg = String.format("No module configuration exists for shard %s", shardName);
1200             LOG.debug("{}: {}", persistenceId(), msg);
1201             getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
1202             return;
1203         }
1204
1205         // Create the localShard
1206         if (schemaContext == null) {
1207             String msg = String.format(
1208                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1209             LOG.debug("{}: {}", persistenceId(), msg);
1210             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1211             return;
1212         }
1213
1214         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1215                 getSelf()) {
1216             @Override
1217             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1218                 getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
1219                         getTargetActor());
1220             }
1221
1222             @Override
1223             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1224                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1225             }
1226
1227         });
1228     }
1229
1230     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1231         String msg = String.format("Local shard %s already exists", shardName);
1232         LOG.debug("{}: {}", persistenceId(), msg);
1233         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1234     }
1235
1236     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1237         if (isShardReplicaOperationInProgress(shardName, sender)) {
1238             return;
1239         }
1240
1241         shardReplicaOperationsInProgress.add(shardName);
1242
1243         final ShardInformation shardInfo;
1244         final boolean removeShardOnFailure;
1245         ShardInformation existingShardInfo = localShards.get(shardName);
1246         if (existingShardInfo == null) {
1247             removeShardOnFailure = true;
1248             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1249
1250             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1251                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1252
1253             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1254                     Shard.builder(), peerAddressResolver);
1255             shardInfo.setActiveMember(false);
1256             localShards.put(shardName, shardInfo);
1257             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1258         } else {
1259             removeShardOnFailure = false;
1260             shardInfo = existingShardInfo;
1261         }
1262
1263         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1264
1265         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1266         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1267                 response.getPrimaryPath(), shardInfo.getShardId());
1268
1269         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1270                 .getShardLeaderElectionTimeout().duration());
1271         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1272             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1273
1274         futureObj.onComplete(new OnComplete<Object>() {
1275             @Override
1276             public void onComplete(Throwable failure, Object addServerResponse) {
1277                 if (failure != null) {
1278                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1279                             response.getPrimaryPath(), shardName, failure);
1280
1281                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1282                             response.getPrimaryPath(), shardName);
1283                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1284                 } else {
1285                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1286                             response.getPrimaryPath(), removeShardOnFailure), sender);
1287                 }
1288             }
1289         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1290     }
1291
1292     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1293             boolean removeShardOnFailure) {
1294         shardReplicaOperationsInProgress.remove(shardName);
1295
1296         if (removeShardOnFailure) {
1297             ShardInformation shardInfo = localShards.remove(shardName);
1298             if (shardInfo.getActor() != null) {
1299                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1300             }
1301         }
1302
1303         sender.tell(new Status.Failure(message == null ? failure :
1304             new RuntimeException(message, failure)), getSelf());
1305     }
1306
1307     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1308             String leaderPath, boolean removeShardOnFailure) {
1309         String shardName = shardInfo.getShardName();
1310         shardReplicaOperationsInProgress.remove(shardName);
1311
1312         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1313
1314         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1315             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1316
1317             // Make the local shard voting capable
1318             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1319             shardInfo.setActiveMember(true);
1320             persistShardList();
1321
1322             sender.tell(new Status.Success(null), getSelf());
1323         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1324             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1325         } else {
1326             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1327                     persistenceId(), shardName, replyMsg.getStatus());
1328
1329             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1330                     shardInfo.getShardId());
1331
1332             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1333         }
1334     }
1335
1336     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1337                                                String leaderPath, ShardIdentifier shardId) {
1338         Exception failure;
1339         switch (serverChangeStatus) {
1340             case TIMEOUT:
1341                 failure = new TimeoutException(String.format(
1342                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
1343                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
1344                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
1345                 break;
1346             case NO_LEADER:
1347                 failure = createNoShardLeaderException(shardId);
1348                 break;
1349             case NOT_SUPPORTED:
1350                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1351                         serverChange.getSimpleName(), shardId.getShardName()));
1352                 break;
1353             default :
1354                 failure = new RuntimeException(String.format(
1355                         "%s request to leader %s for shard %s failed with status %s",
1356                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1357         }
1358         return failure;
1359     }
1360
1361     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
1362         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1363
1364         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1365                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1366             @Override
1367             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1368                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1369             }
1370
1371             @Override
1372             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1373                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1374             }
1375
1376             private void doRemoveShardReplicaAsync(final String primaryPath) {
1377                 getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
1378                         primaryPath, getSender()), getTargetActor());
1379             }
1380         });
1381     }
1382
1383     private void persistShardList() {
1384         List<String> shardList = new ArrayList<>(localShards.keySet());
1385         for (ShardInformation shardInfo : localShards.values()) {
1386             if (!shardInfo.isActiveMember()) {
1387                 shardList.remove(shardInfo.getShardName());
1388             }
1389         }
1390         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1391         saveSnapshot(updateShardManagerSnapshot(shardList));
1392     }
1393
1394     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1395         currentSnapshot = new ShardManagerSnapshot(shardList);
1396         return currentSnapshot;
1397     }
1398
1399     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1400         currentSnapshot = snapshot;
1401
1402         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1403
1404         final MemberName currentMember = cluster.getCurrentMemberName();
1405         Set<String> configuredShardList =
1406             new HashSet<>(configuration.getMemberShardNames(currentMember));
1407         for (String shard : currentSnapshot.getShardList()) {
1408             if (!configuredShardList.contains(shard)) {
1409                 // add the current member as a replica for the shard
1410                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1411                 configuration.addMemberReplicaForShard(shard, currentMember);
1412             } else {
1413                 configuredShardList.remove(shard);
1414             }
1415         }
1416         for (String shard : configuredShardList) {
1417             // remove the member as a replica for the shard
1418             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1419             configuration.removeMemberReplicaForShard(shard, currentMember);
1420         }
1421     }
1422
1423     private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
1424         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1425             persistenceId());
1426         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1427             0, 0));
1428     }
1429
1430     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1431         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1432
1433         String shardName = changeMembersVotingStatus.getShardName();
1434         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1435         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1436             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1437                     e.getValue());
1438         }
1439
1440         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1441
1442         findLocalShard(shardName, getSender(),
1443             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1444             localShardFound.getPath(), getSender()));
1445     }
1446
1447     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
1448         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1449
1450         ActorRef sender = getSender();
1451         final String shardName = flipMembersVotingStatus.getShardName();
1452         findLocalShard(shardName, sender, localShardFound -> {
1453             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1454                     Timeout.apply(30, TimeUnit.SECONDS));
1455
1456             future.onComplete(new OnComplete<Object>() {
1457                 @Override
1458                 public void onComplete(Throwable failure, Object response) {
1459                     if (failure != null) {
1460                         sender.tell(new Status.Failure(new RuntimeException(
1461                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1462                         return;
1463                     }
1464
1465                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1466                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1467                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1468                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1469                     }
1470
1471                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1472                             .toString(), !raftState.isVoting());
1473
1474                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1475                             shardName, localShardFound.getPath(), sender);
1476                 }
1477             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1478         });
1479
1480     }
1481
1482     private void findLocalShard(FindLocalShard message) {
1483         final ShardInformation shardInformation = localShards.get(message.getShardName());
1484
1485         if (shardInformation == null) {
1486             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1487             return;
1488         }
1489
1490         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1491             () -> new LocalShardFound(shardInformation.getActor()));
1492     }
1493
1494     private void findLocalShard(final String shardName, final ActorRef sender,
1495             final Consumer<LocalShardFound> onLocalShardFound) {
1496         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1497                 .getShardInitializationTimeout().duration().$times(2));
1498
1499         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1500         futureObj.onComplete(new OnComplete<Object>() {
1501             @Override
1502             public void onComplete(Throwable failure, Object response) {
1503                 if (failure != null) {
1504                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1505                             failure);
1506                     sender.tell(new Status.Failure(new RuntimeException(
1507                             String.format("Failed to find local shard %s", shardName), failure)), self());
1508                 } else {
1509                     if (response instanceof LocalShardFound) {
1510                         getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
1511                                 sender);
1512                     } else if (response instanceof LocalShardNotFound) {
1513                         String msg = String.format("Local shard %s does not exist", shardName);
1514                         LOG.debug("{}: {}", persistenceId, msg);
1515                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
1516                     } else {
1517                         String msg = String.format("Failed to find local shard %s: received response: %s",
1518                                 shardName, response);
1519                         LOG.debug("{}: {}", persistenceId, msg);
1520                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1521                                 new RuntimeException(msg)), self());
1522                     }
1523                 }
1524             }
1525         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1526     }
1527
1528     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
1529             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1530         if (isShardReplicaOperationInProgress(shardName, sender)) {
1531             return;
1532         }
1533
1534         shardReplicaOperationsInProgress.add(shardName);
1535
1536         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1537         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1538
1539         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1540                 changeServersVotingStatus, shardActorRef.path());
1541
1542         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1543         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1544
1545         futureObj.onComplete(new OnComplete<Object>() {
1546             @Override
1547             public void onComplete(Throwable failure, Object response) {
1548                 shardReplicaOperationsInProgress.remove(shardName);
1549                 if (failure != null) {
1550                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1551                             shardActorRef.path());
1552                     LOG.debug("{}: {}", persistenceId(), msg, failure);
1553                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1554                 } else {
1555                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1556
1557                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1558                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1559                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1560                         sender.tell(new Status.Success(null), getSelf());
1561                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1562                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1563                                 "The requested voting state change for shard %s is invalid. At least one member "
1564                                 + "must be voting", shardId.getShardName()))), getSelf());
1565                     } else {
1566                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1567                                 persistenceId(), shardName, replyMsg.getStatus());
1568
1569                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1570                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1571                         sender.tell(new Status.Failure(error), getSelf());
1572                     }
1573                 }
1574             }
1575         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1576     }
1577
1578     private static final class ForwardedAddServerReply {
1579         ShardInformation shardInfo;
1580         AddServerReply addServerReply;
1581         String leaderPath;
1582         boolean removeShardOnFailure;
1583
1584         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1585                 boolean removeShardOnFailure) {
1586             this.shardInfo = shardInfo;
1587             this.addServerReply = addServerReply;
1588             this.leaderPath = leaderPath;
1589             this.removeShardOnFailure = removeShardOnFailure;
1590         }
1591     }
1592
1593     private static final class ForwardedAddServerFailure {
1594         String shardName;
1595         String failureMessage;
1596         Throwable failure;
1597         boolean removeShardOnFailure;
1598
1599         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1600                 boolean removeShardOnFailure) {
1601             this.shardName = shardName;
1602             this.failureMessage = failureMessage;
1603             this.failure = failure;
1604             this.removeShardOnFailure = removeShardOnFailure;
1605         }
1606     }
1607
1608     static class OnShardInitialized {
1609         private final Runnable replyRunnable;
1610         private Cancellable timeoutSchedule;
1611
1612         OnShardInitialized(Runnable replyRunnable) {
1613             this.replyRunnable = replyRunnable;
1614         }
1615
1616         Runnable getReplyRunnable() {
1617             return replyRunnable;
1618         }
1619
1620         Cancellable getTimeoutSchedule() {
1621             return timeoutSchedule;
1622         }
1623
1624         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1625             this.timeoutSchedule = timeoutSchedule;
1626         }
1627     }
1628
1629     static class OnShardReady extends OnShardInitialized {
1630         OnShardReady(Runnable replyRunnable) {
1631             super(replyRunnable);
1632         }
1633     }
1634
1635     private interface RunnableMessage extends Runnable {
1636     }
1637
1638     /**
1639      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1640      * a remote or local find primary message is processed.
1641      */
1642     private interface FindPrimaryResponseHandler {
1643         /**
1644          * Invoked when a Failure message is received as a response.
1645          *
1646          * @param failure the failure exception
1647          */
1648         void onFailure(Throwable failure);
1649
1650         /**
1651          * Invoked when a RemotePrimaryShardFound response is received.
1652          *
1653          * @param response the response
1654          */
1655         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1656
1657         /**
1658          * Invoked when a LocalPrimaryShardFound response is received.
1659          *
1660          * @param response the response
1661          */
1662         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1663
1664         /**
1665          * Invoked when an unknown response is received. This is another type of failure.
1666          *
1667          * @param response the response
1668          */
1669         void onUnknownResponse(Object response);
1670     }
1671
1672     /**
1673      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1674      * replica and sends a wrapped Failure response to some targetActor.
1675      */
1676     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1677         private final ActorRef targetActor;
1678         private final String shardName;
1679         private final String persistenceId;
1680         private final ActorRef shardManagerActor;
1681
1682         /**
1683          * Constructs an instance.
1684          *
1685          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1686          * @param shardName The name of the shard for which the primary replica had to be found
1687          * @param persistenceId The persistenceId for the ShardManager
1688          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1689          */
1690         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
1691                 ActorRef shardManagerActor) {
1692             this.targetActor = Preconditions.checkNotNull(targetActor);
1693             this.shardName = Preconditions.checkNotNull(shardName);
1694             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1695             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1696         }
1697
1698         public ActorRef getTargetActor() {
1699             return targetActor;
1700         }
1701
1702         public String getShardName() {
1703             return shardName;
1704         }
1705
1706         @Override
1707         public void onFailure(Throwable failure) {
1708             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1709             targetActor.tell(new Status.Failure(new RuntimeException(
1710                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1711         }
1712
1713         @Override
1714         public void onUnknownResponse(Object response) {
1715             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1716                     shardName, response);
1717             LOG.debug("{}: {}", persistenceId, msg);
1718             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1719                     new RuntimeException(msg)), shardManagerActor);
1720         }
1721     }
1722
1723     /**
1724      * The WrappedShardResponse class wraps a response from a Shard.
1725      */
1726     private static final class WrappedShardResponse {
1727         private final ShardIdentifier shardId;
1728         private final Object response;
1729         private final String leaderPath;
1730
1731         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1732             this.shardId = shardId;
1733             this.response = response;
1734             this.leaderPath = leaderPath;
1735         }
1736
1737         ShardIdentifier getShardId() {
1738             return shardId;
1739         }
1740
1741         Object getResponse() {
1742             return response;
1743         }
1744
1745         String getLeaderPath() {
1746             return leaderPath;
1747         }
1748     }
1749
1750     private static final class ShardNotInitializedTimeout {
1751         private final ActorRef sender;
1752         private final ShardInformation shardInfo;
1753         private final OnShardInitialized onShardInitialized;
1754
1755         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1756             this.sender = sender;
1757             this.shardInfo = shardInfo;
1758             this.onShardInitialized = onShardInitialized;
1759         }
1760
1761         ActorRef getSender() {
1762             return sender;
1763         }
1764
1765         ShardInformation getShardInfo() {
1766             return shardInfo;
1767         }
1768
1769         OnShardInitialized getOnShardInitialized() {
1770             return onShardInitialized;
1771         }
1772     }
1773 }
1774
1775
1776