BUG-7965 Switch distributed-data backend to a separate shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.actor.SupervisorStrategy.Directive;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberWeaklyUp;
23 import akka.cluster.Member;
24 import akka.dispatch.Futures;
25 import akka.dispatch.OnComplete;
26 import akka.japi.Function;
27 import akka.pattern.Patterns;
28 import akka.persistence.RecoveryCompleted;
29 import akka.persistence.SaveSnapshotFailure;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.persistence.SnapshotOffer;
32 import akka.persistence.SnapshotSelectionCriteria;
33 import akka.util.Timeout;
34 import com.google.common.annotations.VisibleForTesting;
35 import com.google.common.base.Preconditions;
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.HashMap;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.Set;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.function.Consumer;
49 import java.util.function.Supplier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
52 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
53 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
57 import org.opendaylight.controller.cluster.datastore.Shard;
58 import org.opendaylight.controller.cluster.datastore.config.Configuration;
59 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
60 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
63 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
65 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
66 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
67 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
68 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
69 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
70 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
71 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
72 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
73 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
76 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
77 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
78 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
79 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
80 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
81 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
82 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
83 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
84 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
85 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
87 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
88 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
89 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
90 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
91 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
92 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
94 import org.opendaylight.controller.cluster.raft.messages.AddServer;
95 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
96 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
97 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
98 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
99 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
100 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
101 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
102 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
103 import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
104 import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
105 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
106 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
107 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
108 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
109 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
110 import org.opendaylight.yangtools.concepts.ListenerRegistration;
111 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
112 import org.slf4j.Logger;
113 import org.slf4j.LoggerFactory;
114 import scala.concurrent.ExecutionContext;
115 import scala.concurrent.Future;
116 import scala.concurrent.duration.Duration;
117 import scala.concurrent.duration.FiniteDuration;
118
119 /**
120  * Manages the shards for a data store. The ShardManager has the following jobs:
121  * <ul>
122  * <li> Create all the local shard replicas that belong on this cluster member
123  * <li> Find the address of the local shard
124  * <li> Find the primary replica for any given shard
125  * <li> Monitor the cluster members and store their addresses
126  * </ul>
127  */
128 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);

    // Stores a mapping between a shard name and its corresponding information.
    // Shard names look like inventory, topology etc and are as specified in
    // configuration (prefix shards use names derived from their prefix).
    private final Map<String, ShardInformation> localShards = new HashMap<>();

    // The type of a ShardManager reflects the type of the datastore itself
    // (taken from the base DatastoreContext's data store name).
    // A data store could be of type config/operational
    private final String type;

    private final ClusterWrapper cluster;

    // Module and prefix shard configuration; updated as shards are created or removed.
    private final Configuration configuration;

    private final String shardDispatcherPath;

    private final ShardManagerInfo shardManagerMBean;

    // Not final: presumably replaced when a DatastoreContextFactory message is received.
    private DatastoreContextFactory datastoreContextFactory;

    // Counted down by checkReady() once all shards report ready with a leader.
    private final CountDownLatch waitTillReadyCountdownLatch;

    // Entries are evicted when a shard's leader changes.
    private final PrimaryShardInfoFutureCache primaryShardInfoCache;

    private final ShardPeerAddressResolver peerAddressResolver;

    // Null until a schema context is available; shard actors are only created once it is set.
    private SchemaContext schemaContext;

    private DatastoreSnapshot restoreFromSnapshot;

    // Consulted when creating shards to decide whether a shard existed in the recovered snapshot.
    private ShardManagerSnapshot currentSnapshot;

    // Names of shards with a replica add/remove operation in flight. HashSet is not thread-safe,
    // so this should only be touched from within the actor's own message processing.
    private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();

    private final String persistenceId;
    private final AbstractDataStore dataStore;

    // Closed in postStop(). NOTE(review): not assigned anywhere in the visible code - confirm it is still used.
    private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
    // Created (and any previous instance closed) by onInitConfigListener().
    private PrefixedShardConfigUpdateHandler configUpdateHandler;
169
170     ShardManager(AbstractShardManagerCreator<?> builder) {
171         this.cluster = builder.getCluster();
172         this.configuration = builder.getConfiguration();
173         this.datastoreContextFactory = builder.getDatastoreContextFactory();
174         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
175         this.shardDispatcherPath =
176                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
177         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
178         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
179         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
180
181         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
182         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
183
184         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
185
186         // Subscribe this actor to cluster member events
187         cluster.subscribeToMemberEvents(getSelf());
188
189         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
190                 "shard-manager-" + this.type,
191                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
192         shardManagerMBean.registerMBean();
193
194         dataStore = builder.getDistributedDataStore();
195     }
196
197     @Override
198     public void preStart() {
199         LOG.info("Starting ShardManager {}", persistenceId);
200     }
201
202     @Override
203     public void postStop() {
204         LOG.info("Stopping ShardManager {}", persistenceId());
205
206         shardManagerMBean.unregisterMBean();
207
208         if (configListenerReg != null) {
209             configListenerReg.close();
210             configListenerReg = null;
211         }
212     }
213
    /**
     * Dispatches an incoming command to its handler based on its runtime type.
     *
     * <p>Branches are tested top to bottom and the first matching {@code instanceof} wins, so the order of the
     * checks matters. Messages matching none of the branches are passed to {@code unknownMessage}.
     *
     * @param message the received message
     * @throws Exception as permitted by the overridden method
     */
    @Override
    public void handleCommand(Object message) throws Exception {
        if (message  instanceof FindPrimary) {
            findPrimary((FindPrimary)message);
        } else if (message instanceof FindLocalShard) {
            findLocalShard((FindLocalShard) message);
        } else if (message instanceof UpdateSchemaContext) {
            updateSchemaContext(message);
        } else if (message instanceof ActorInitialized) {
            onActorInitialized(message);
        // Cluster membership events (this actor subscribes to them in the constructor).
        } else if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof DatastoreContextFactory) {
            onDatastoreContextFactory((DatastoreContextFactory) message);
        } else if (message instanceof RoleChangeNotification) {
            onRoleChangeNotification((RoleChangeNotification) message);
        } else if (message instanceof FollowerInitialSyncUpStatus) {
            onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
        } else if (message instanceof ShardNotInitializedTimeout) {
            onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
        } else if (message instanceof ShardLeaderStateChanged) {
            onLeaderStateChanged((ShardLeaderStateChanged) message);
        } else if (message instanceof SwitchShardBehavior) {
            onSwitchShardBehavior((SwitchShardBehavior) message);
        } else if (message instanceof CreateShard) {
            onCreateShard((CreateShard)message);
        } else if (message instanceof AddShardReplica) {
            onAddShardReplica((AddShardReplica) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        } else if (message instanceof InitConfigListener) {
            onInitConfigListener();
        } else if (message instanceof ForwardedAddServerReply) {
            // The original requester is the sender of this forwarded message.
            ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
                    msg.removeShardOnFailure);
        } else if (message instanceof ForwardedAddServerFailure) {
            ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
            onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
        } else if (message instanceof RemoveShardReplica) {
            onRemoveShardReplica((RemoveShardReplica) message);
        } else if (message instanceof WrappedShardResponse) {
            onWrappedShardResponse((WrappedShardResponse) message);
        } else if (message instanceof GetSnapshot) {
            onGetSnapshot();
        } else if (message instanceof ServerRemoved) {
            onShardReplicaRemoved((ServerRemoved) message);
        } else if (message instanceof ChangeShardMembersVotingStatus) {
            onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
        } else if (message instanceof FlipShardMembersVotingStatus) {
            onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
        } else if (message instanceof SaveSnapshotSuccess) {
            onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
        } else if (message instanceof SaveSnapshotFailure) {
            // Snapshot save failures are only logged; no retry is attempted here.
            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
                    ((SaveSnapshotFailure) message).cause());
        } else if (message instanceof Shutdown) {
            onShutDown();
        } else if (message instanceof GetLocalShardIds) {
            onGetLocalShardIds();
        } else if (message instanceof RunnableMessage) {
            // Work posted back to this actor (e.g. from future callbacks) runs here, in actor context.
            ((RunnableMessage)message).run();
        } else {
            unknownMessage(message);
        }
    }
292
293     private void onInitConfigListener() {
294         LOG.debug("{}: Initializing config listener.", persistenceId());
295
296         final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
297                 org.opendaylight.mdsal.common.api.LogicalDatastoreType
298                         .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
299
300         if (configUpdateHandler != null) {
301             configUpdateHandler.close();
302         }
303
304         configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
305         configUpdateHandler.initListener(dataStore, type);
306     }
307
308     private void onShutDown() {
309         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
310         for (ShardInformation info : localShards.values()) {
311             if (info.getActor() != null) {
312                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
313
314                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
315                         .getElectionTimeOutInterval().$times(2);
316                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
317             }
318         }
319
320         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
321
322         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
323                 .getDispatcher(Dispatchers.DispatcherType.Client);
324         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
325
326         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
327             @Override
328             public void onComplete(Throwable failure, Iterable<Boolean> results) {
329                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
330
331                 self().tell(PoisonPill.getInstance(), self());
332
333                 if (failure != null) {
334                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
335                 } else {
336                     int nfailed = 0;
337                     for (Boolean result : results) {
338                         if (!result) {
339                             nfailed++;
340                         }
341                     }
342
343                     if (nfailed > 0) {
344                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
345                     }
346                 }
347             }
348         }, dispatcher);
349     }
350
351     private void onWrappedShardResponse(WrappedShardResponse message) {
352         if (message.getResponse() instanceof RemoveServerReply) {
353             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
354                     message.getLeaderPath());
355         }
356     }
357
358     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
359             String leaderPath) {
360         shardReplicaOperationsInProgress.remove(shardId.getShardName());
361
362         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
363
364         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
365             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
366                     shardId.getShardName());
367             originalSender.tell(new Status.Success(null), getSelf());
368         } else {
369             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
370                     persistenceId(), shardId, replyMsg.getStatus());
371
372             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
373             originalSender.tell(new Status.Failure(failure), getSelf());
374         }
375     }
376
377     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
378             final ActorRef sender) {
379         if (isShardReplicaOperationInProgress(shardName, sender)) {
380             return;
381         }
382
383         shardReplicaOperationsInProgress.add(shardName);
384
385         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
386
387         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
388
389         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
390         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
391                 primaryPath, shardId);
392
393         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
394         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
395                 new RemoveServer(shardId.toString()), removeServerTimeout);
396
397         futureObj.onComplete(new OnComplete<Object>() {
398             @Override
399             public void onComplete(Throwable failure, Object response) {
400                 if (failure != null) {
401                     shardReplicaOperationsInProgress.remove(shardName);
402                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
403                             primaryPath, shardName);
404
405                     LOG.debug("{}: {}", persistenceId(), msg, failure);
406
407                     // FAILURE
408                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
409                 } else {
410                     // SUCCESS
411                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
412                 }
413             }
414         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
415     }
416
417     private void onShardReplicaRemoved(ServerRemoved message) {
418         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
419         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
420         if (shardInformation == null) {
421             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
422             return;
423         } else if (shardInformation.getActor() != null) {
424             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
425             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
426         }
427         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
428         persistShardList();
429     }
430
431     private void onGetSnapshot() {
432         LOG.debug("{}: onGetSnapshot", persistenceId());
433
434         List<String> notInitialized = null;
435         for (ShardInformation shardInfo : localShards.values()) {
436             if (!shardInfo.isShardInitialized()) {
437                 if (notInitialized == null) {
438                     notInitialized = new ArrayList<>();
439                 }
440
441                 notInitialized.add(shardInfo.getShardName());
442             }
443         }
444
445         if (notInitialized != null) {
446             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
447                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
448             return;
449         }
450
451         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
452                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
453                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
454
455         for (ShardInformation shardInfo: localShards.values()) {
456             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
457         }
458     }
459
460     @SuppressWarnings("checkstyle:IllegalCatch")
461     private void onCreateShard(CreateShard createShard) {
462         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
463
464         Object reply;
465         try {
466             String shardName = createShard.getModuleShardConfig().getShardName();
467             if (localShards.containsKey(shardName)) {
468                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
469                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
470             } else {
471                 doCreateShard(createShard);
472                 reply = new Status.Success(null);
473             }
474         } catch (Exception e) {
475             LOG.error("{}: onCreateShard failed", persistenceId(), e);
476             reply = new Status.Failure(e);
477         }
478
479         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
480             getSender().tell(reply, getSelf());
481         }
482     }
483
484     private void onPrefixShardCreated(final PrefixShardCreated message) {
485         LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
486
487         final PrefixShardConfiguration config = message.getConfiguration();
488
489         final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(),
490                 config.getPrefix());
491         final String shardName = shardId.getShardName();
492
493         if (localShards.containsKey(shardName)) {
494             LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
495             final PrefixShardConfiguration existing =
496                     configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
497
498             if (existing != null && existing.equals(config)) {
499                 // we don't have to do nothing here
500                 return;
501             }
502         }
503
504         doCreatePrefixShard(config, shardId, shardName);
505     }
506
507     private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
508         configuration.addPrefixShardConfiguration(config);
509
510         final Builder builder = newShardDatastoreContextBuilder(shardName);
511         builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
512                 .storeRoot(config.getPrefix().getRootIdentifier());
513         DatastoreContext shardDatastoreContext = builder.build();
514
515         final Map<String, String> peerAddresses = Collections.emptyMap();
516         final boolean isActiveMember = true;
517
518         LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
519                 persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
520
521         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
522                 shardDatastoreContext, Shard.builder(), peerAddressResolver);
523         info.setActiveMember(isActiveMember);
524         localShards.put(info.getShardName(), info);
525
526         if (schemaContext != null) {
527             info.setActor(newShardActor(schemaContext, info));
528         }
529     }
530
531     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
532         LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
533
534         final DOMDataTreeIdentifier prefix = message.getPrefix();
535         final ShardIdentifier shardId = ClusterUtils.getShardIdentifier(cluster.getCurrentMemberName(), prefix);
536         final ShardInformation shard = localShards.remove(shardId.getShardName());
537
538         configuration.removePrefixShardConfiguration(prefix);
539
540         if (shard == null) {
541             LOG.warn("{}: Received removal for unconfigured shard: {}, ignoring.. ", persistenceId(), prefix);
542             return;
543         }
544
545         if (shard.getActor() != null) {
546             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shard.getActor());
547             shard.getActor().tell(Shutdown.INSTANCE, self());
548         }
549
550         LOG.debug("{}: Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
551         persistShardList();
552     }
553
554     private void doCreateShard(final CreateShard createShard) {
555         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
556         final String shardName = moduleShardConfig.getShardName();
557
558         configuration.addModuleShardConfiguration(moduleShardConfig);
559
560         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
561         if (shardDatastoreContext == null) {
562             shardDatastoreContext = newShardDatastoreContext(shardName);
563         } else {
564             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
565                     peerAddressResolver).build();
566         }
567
568         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
569
570         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
571                 && currentSnapshot.getShardList().contains(shardName);
572
573         Map<String, String> peerAddresses;
574         boolean isActiveMember;
575         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
576                 .contains(cluster.getCurrentMemberName())) {
577             peerAddresses = getPeerAddresses(shardName);
578             isActiveMember = true;
579         } else {
580             // The local member is not in the static shard member configuration and the shard did not
581             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
582             // the shard with no peers and with elections disabled so it stays as follower. A
583             // subsequent AddServer request will be needed to make it an active member.
584             isActiveMember = false;
585             peerAddresses = Collections.emptyMap();
586             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
587                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
588         }
589
590         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
591                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
592                 isActiveMember);
593
594         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
595                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
596         info.setActiveMember(isActiveMember);
597         localShards.put(info.getShardName(), info);
598
599         if (schemaContext != null) {
600             info.setActor(newShardActor(schemaContext, info));
601         }
602     }
603
604     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
605         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
606                 .shardPeerAddressResolver(peerAddressResolver);
607     }
608
609     private DatastoreContext newShardDatastoreContext(String shardName) {
610         return newShardDatastoreContextBuilder(shardName).build();
611     }
612
613     private void checkReady() {
614         if (isReadyWithLeaderId()) {
615             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
616                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
617
618             waitTillReadyCountdownLatch.countDown();
619         }
620     }
621
622     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
623         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
624
625         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
626         if (shardInformation != null) {
627             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
628             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
629             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
630                 primaryShardInfoCache.remove(shardInformation.getShardName());
631             }
632
633             checkReady();
634         } else {
635             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
636         }
637     }
638
639     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
640         ShardInformation shardInfo = message.getShardInfo();
641
642         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
643                 shardInfo.getShardName());
644
645         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
646
647         if (!shardInfo.isShardInitialized()) {
648             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
649             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
650         } else {
651             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
652             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
653         }
654     }
655
656     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
657         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
658                 status.getName(), status.isInitialSyncDone());
659
660         ShardInformation shardInformation = findShardInformation(status.getName());
661
662         if (shardInformation != null) {
663             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
664
665             shardManagerMBean.setSyncStatus(isInSync());
666         }
667
668     }
669
670     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
671         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
672                 roleChanged.getOldRole(), roleChanged.getNewRole());
673
674         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
675         if (shardInformation != null) {
676             shardInformation.setRole(roleChanged.getNewRole());
677             checkReady();
678             shardManagerMBean.setSyncStatus(isInSync());
679         }
680     }
681
682
683     private ShardInformation findShardInformation(String memberId) {
684         for (ShardInformation info : localShards.values()) {
685             if (info.getShardId().toString().equals(memberId)) {
686                 return info;
687             }
688         }
689
690         return null;
691     }
692
693     private boolean isReadyWithLeaderId() {
694         boolean isReady = true;
695         for (ShardInformation info : localShards.values()) {
696             if (!info.isShardReadyWithLeaderId()) {
697                 isReady = false;
698                 break;
699             }
700         }
701         return isReady;
702     }
703
704     private boolean isInSync() {
705         for (ShardInformation info : localShards.values()) {
706             if (!info.isInSync()) {
707                 return false;
708             }
709         }
710         return true;
711     }
712
713     private void onActorInitialized(Object message) {
714         final ActorRef sender = getSender();
715
716         if (sender == null) {
717             return; //why is a non-actor sending this message? Just ignore.
718         }
719
720         String actorName = sender.path().name();
721         //find shard name from actor name; actor name is stringified shardId
722
723         final ShardIdentifier shardId;
724         try {
725             shardId = ShardIdentifier.fromShardIdString(actorName);
726         } catch (IllegalArgumentException e) {
727             LOG.debug("{}: ignoring actor {}", actorName, e);
728             return;
729         }
730
731         markShardAsInitialized(shardId.getShardName());
732     }
733
734     private void markShardAsInitialized(String shardName) {
735         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
736
737         ShardInformation shardInformation = localShards.get(shardName);
738         if (shardInformation != null) {
739             shardInformation.setActorInitialized();
740
741             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
742         }
743     }
744
745     @Override
746     protected void handleRecover(Object message) throws Exception {
747         if (message instanceof RecoveryCompleted) {
748             onRecoveryCompleted();
749         } else if (message instanceof SnapshotOffer) {
750             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
751         }
752     }
753
754     @SuppressWarnings("checkstyle:IllegalCatch")
755     private void onRecoveryCompleted() {
756         LOG.info("Recovery complete : {}", persistenceId());
757
758         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
759         // journal on upgrade from Helium.
760         deleteMessages(lastSequenceNr());
761
762         if (currentSnapshot == null && restoreFromSnapshot != null
763                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
764             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
765
766             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
767
768             applyShardManagerSnapshot(snapshot);
769         }
770
771         createLocalShards();
772     }
773
774     private void sendResponse(ShardInformation shardInformation, boolean doWait,
775             boolean wantShardReady, final Supplier<Object> messageSupplier) {
776         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
777             if (doWait) {
778                 final ActorRef sender = getSender();
779                 final ActorRef self = self();
780
781                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
782
783                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
784                     new OnShardInitialized(replyRunnable);
785
786                 shardInformation.addOnShardInitialized(onShardInitialized);
787
788                 FiniteDuration timeout = shardInformation.getDatastoreContext()
789                         .getShardInitializationTimeout().duration();
790                 if (shardInformation.isShardInitialized()) {
791                     // If the shard is already initialized then we'll wait enough time for the shard to
792                     // elect a leader, ie 2 times the election timeout.
793                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
794                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
795                 }
796
797                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
798                         shardInformation.getShardName());
799
800                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
801                         timeout, getSelf(),
802                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
803                         getContext().dispatcher(), getSelf());
804
805                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
806
807             } else if (!shardInformation.isShardInitialized()) {
808                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
809                         shardInformation.getShardName());
810                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
811             } else {
812                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
813                         shardInformation.getShardName());
814                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
815             }
816
817             return;
818         }
819
820         getSender().tell(messageSupplier.get(), getSelf());
821     }
822
823     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
824         return new NoShardLeaderException(null, shardId.toString());
825     }
826
827     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
828         return new NotInitializedException(String.format(
829                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
830     }
831
832     @VisibleForTesting
833     static MemberName memberToName(final Member member) {
834         return MemberName.forName(member.roles().iterator().next());
835     }
836
837     private void memberRemoved(ClusterEvent.MemberRemoved message) {
838         MemberName memberName = memberToName(message.member());
839
840         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
841                 message.member().address());
842
843         peerAddressResolver.removePeerAddress(memberName);
844
845         for (ShardInformation info : localShards.values()) {
846             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
847         }
848     }
849
850     private void memberExited(ClusterEvent.MemberExited message) {
851         MemberName memberName = memberToName(message.member());
852
853         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
854                 message.member().address());
855
856         peerAddressResolver.removePeerAddress(memberName);
857
858         for (ShardInformation info : localShards.values()) {
859             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
860         }
861     }
862
863     private void memberUp(ClusterEvent.MemberUp message) {
864         MemberName memberName = memberToName(message.member());
865
866         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
867                 message.member().address());
868
869         memberUp(memberName, message.member().address());
870     }
871
872     private void memberUp(MemberName memberName, Address address) {
873         addPeerAddress(memberName, address);
874         checkReady();
875     }
876
877     private void memberWeaklyUp(MemberWeaklyUp message) {
878         MemberName memberName = memberToName(message.member());
879
880         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
881                 message.member().address());
882
883         memberUp(memberName, message.member().address());
884     }
885
886     private void addPeerAddress(MemberName memberName, Address address) {
887         peerAddressResolver.addPeerAddress(memberName, address);
888
889         for (ShardInformation info : localShards.values()) {
890             String shardName = info.getShardName();
891             String peerId = getShardIdentifier(memberName, shardName).toString();
892             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
893
894             info.peerUp(memberName, peerId, getSelf());
895         }
896     }
897
898     private void memberReachable(ClusterEvent.ReachableMember message) {
899         MemberName memberName = memberToName(message.member());
900         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
901
902         addPeerAddress(memberName, message.member().address());
903
904         markMemberAvailable(memberName);
905     }
906
907     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
908         MemberName memberName = memberToName(message.member());
909         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
910
911         markMemberUnavailable(memberName);
912     }
913
914     private void markMemberUnavailable(final MemberName memberName) {
915         final String memberStr = memberName.getName();
916         for (ShardInformation info : localShards.values()) {
917             String leaderId = info.getLeaderId();
918             // XXX: why are we using String#contains() here?
919             if (leaderId != null && leaderId.contains(memberStr)) {
920                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
921                 info.setLeaderAvailable(false);
922
923                 primaryShardInfoCache.remove(info.getShardName());
924             }
925
926             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
927         }
928     }
929
930     private void markMemberAvailable(final MemberName memberName) {
931         final String memberStr = memberName.getName();
932         for (ShardInformation info : localShards.values()) {
933             String leaderId = info.getLeaderId();
934             // XXX: why are we using String#contains() here?
935             if (leaderId != null && leaderId.contains(memberStr)) {
936                 LOG.debug("Marking Leader {} as available.", leaderId);
937                 info.setLeaderAvailable(true);
938             }
939
940             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
941         }
942     }
943
944     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
945         datastoreContextFactory = factory;
946         for (ShardInformation info : localShards.values()) {
947             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
948         }
949     }
950
951     private void onGetLocalShardIds() {
952         final List<String> response = new ArrayList<>(localShards.size());
953
954         for (ShardInformation info : localShards.values()) {
955             response.add(info.getShardId().toString());
956         }
957
958         getSender().tell(new Status.Success(response), getSelf());
959     }
960
961     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
962         final ShardIdentifier identifier = message.getShardId();
963
964         if (identifier != null) {
965             final ShardInformation info = localShards.get(identifier.getShardName());
966             if (info == null) {
967                 getSender().tell(new Status.Failure(
968                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
969                 return;
970             }
971
972             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
973         } else {
974             for (ShardInformation info : localShards.values()) {
975                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
976             }
977         }
978
979         getSender().tell(new Status.Success(null), getSelf());
980     }
981
982     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
983         final ActorRef actor = info.getActor();
984         if (actor != null) {
985             actor.tell(switchBehavior, getSelf());
986         } else {
987             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
988                 info.getShardName(), switchBehavior.getNewState());
989         }
990     }
991
992     /**
993      * Notifies all the local shards of a change in the schema context.
994      *
995      * @param message the message to send
996      */
997     private void updateSchemaContext(final Object message) {
998         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
999
1000         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
1001
1002         for (ShardInformation info : localShards.values()) {
1003             if (info.getActor() == null) {
1004                 LOG.debug("Creating Shard {}", info.getShardId());
1005                 info.setActor(newShardActor(schemaContext, info));
1006             } else {
1007                 info.getActor().tell(message, getSelf());
1008             }
1009         }
1010     }
1011
1012     @VisibleForTesting
1013     protected ClusterWrapper getCluster() {
1014         return cluster;
1015     }
1016
1017     @VisibleForTesting
1018     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
1019         return getContext().actorOf(info.newProps(schemaContext)
1020                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
1021     }
1022
1023     private void findPrimary(FindPrimary message) {
1024         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
1025
1026         final String shardName = message.getShardName();
1027         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
1028
1029         // First see if the there is a local replica for the shard
1030         final ShardInformation info = localShards.get(shardName);
1031         if (info != null && info.isActiveMember()) {
1032             sendResponse(info, message.isWaitUntilReady(), true, () -> {
1033                 String primaryPath = info.getSerializedLeaderActor();
1034                 Object found = canReturnLocalShardState && info.isLeader()
1035                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
1036                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
1037
1038                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
1039                 return found;
1040             });
1041
1042             return;
1043         }
1044
1045         final Collection<String> visitedAddresses;
1046         if (message instanceof RemoteFindPrimary) {
1047             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
1048         } else {
1049             visitedAddresses = new ArrayList<>(1);
1050         }
1051
1052         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
1053
1054         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
1055             if (visitedAddresses.contains(address)) {
1056                 continue;
1057             }
1058
1059             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
1060                     persistenceId(), shardName, address, visitedAddresses);
1061
1062             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
1063                     message.isWaitUntilReady(), visitedAddresses), getContext());
1064             return;
1065         }
1066
1067         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
1068
1069         getSender().tell(new PrimaryNotFoundException(
1070                 String.format("No primary shard found for %s.", shardName)), getSelf());
1071     }
1072
1073     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1074         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1075                 .getShardInitializationTimeout().duration().$times(2));
1076
1077         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1078         futureObj.onComplete(new OnComplete<Object>() {
1079             @Override
1080             public void onComplete(Throwable failure, Object response) {
1081                 if (failure != null) {
1082                     handler.onFailure(failure);
1083                 } else {
1084                     if (response instanceof RemotePrimaryShardFound) {
1085                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1086                     } else if (response instanceof LocalPrimaryShardFound) {
1087                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1088                     } else {
1089                         handler.onUnknownResponse(response);
1090                     }
1091                 }
1092             }
1093         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1094     }
1095
1096     /**
1097      * Construct the name of the shard actor given the name of the member on
1098      * which the shard resides and the name of the shard.
1099      *
1100      * @param memberName the member name
1101      * @param shardName the shard name
1102      * @return a b
1103      */
1104     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
1105         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1106     }
1107
1108     /**
1109      * Create shards that are local to the member on which the ShardManager runs.
1110      */
1111     private void createLocalShards() {
1112         MemberName memberName = this.cluster.getCurrentMemberName();
1113         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
1114
1115         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1116         if (restoreFromSnapshot != null) {
1117             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1118                 shardSnapshots.put(snapshot.getName(), snapshot);
1119             }
1120         }
1121
1122         restoreFromSnapshot = null; // null out to GC
1123
1124         for (String shardName : memberShardNames) {
1125             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1126
1127             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1128
1129             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1130             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
1131                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
1132                         shardSnapshots.get(shardName)), peerAddressResolver));
1133         }
1134     }
1135
1136     /**
1137      * Given the name of the shard find the addresses of all it's peers.
1138      *
1139      * @param shardName the shard name
1140      */
1141     private Map<String, String> getPeerAddresses(String shardName) {
1142         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1143         return getPeerAddresses(shardName, members);
1144     }
1145
1146     private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
1147         Map<String, String> peerAddresses = new HashMap<>();
1148         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1149
1150         for (MemberName memberName : members) {
1151             if (!currentMemberName.equals(memberName)) {
1152                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1153                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1154                 peerAddresses.put(shardId.toString(), address);
1155             }
1156         }
1157         return peerAddresses;
1158     }
1159
1160     @Override
1161     public SupervisorStrategy supervisorStrategy() {
1162
1163         return new OneForOneStrategy(10, Duration.create("1 minute"),
1164                 (Function<Throwable, Directive>) t -> {
1165                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1166                     return SupervisorStrategy.resume();
1167                 });
1168     }
1169
1170     @Override
1171     public String persistenceId() {
1172         return persistenceId;
1173     }
1174
1175     @VisibleForTesting
1176     ShardManagerInfoMBean getMBean() {
1177         return shardManagerMBean;
1178     }
1179
1180     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1181         if (shardReplicaOperationsInProgress.contains(shardName)) {
1182             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1183             LOG.debug("{}: {}", persistenceId(), msg);
1184             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1185             return true;
1186         }
1187
1188         return false;
1189     }
1190
1191     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
1192         final String shardName = shardReplicaMsg.getShardName();
1193
1194         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1195
1196         // verify the shard with the specified name is present in the cluster configuration
1197         if (!this.configuration.isShardConfigured(shardName)) {
1198             String msg = String.format("No module configuration exists for shard %s", shardName);
1199             LOG.debug("{}: {}", persistenceId(), msg);
1200             getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
1201             return;
1202         }
1203
1204         // Create the localShard
1205         if (schemaContext == null) {
1206             String msg = String.format(
1207                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1208             LOG.debug("{}: {}", persistenceId(), msg);
1209             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1210             return;
1211         }
1212
1213         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1214                 getSelf()) {
1215             @Override
1216             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1217                 getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
1218                         getTargetActor());
1219             }
1220
1221             @Override
1222             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1223                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1224             }
1225
1226         });
1227     }
1228
1229     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1230         String msg = String.format("Local shard %s already exists", shardName);
1231         LOG.debug("{}: {}", persistenceId(), msg);
1232         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1233     }
1234
1235     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1236         if (isShardReplicaOperationInProgress(shardName, sender)) {
1237             return;
1238         }
1239
1240         shardReplicaOperationsInProgress.add(shardName);
1241
1242         final ShardInformation shardInfo;
1243         final boolean removeShardOnFailure;
1244         ShardInformation existingShardInfo = localShards.get(shardName);
1245         if (existingShardInfo == null) {
1246             removeShardOnFailure = true;
1247             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1248
1249             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1250                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1251
1252             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1253                     Shard.builder(), peerAddressResolver);
1254             shardInfo.setActiveMember(false);
1255             localShards.put(shardName, shardInfo);
1256             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1257         } else {
1258             removeShardOnFailure = false;
1259             shardInfo = existingShardInfo;
1260         }
1261
1262         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1263
1264         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1265         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1266                 response.getPrimaryPath(), shardInfo.getShardId());
1267
1268         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1269                 .getShardLeaderElectionTimeout().duration());
1270         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1271             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1272
1273         futureObj.onComplete(new OnComplete<Object>() {
1274             @Override
1275             public void onComplete(Throwable failure, Object addServerResponse) {
1276                 if (failure != null) {
1277                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1278                             response.getPrimaryPath(), shardName, failure);
1279
1280                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1281                             response.getPrimaryPath(), shardName);
1282                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1283                 } else {
1284                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1285                             response.getPrimaryPath(), removeShardOnFailure), sender);
1286                 }
1287             }
1288         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1289     }
1290
1291     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1292             boolean removeShardOnFailure) {
1293         shardReplicaOperationsInProgress.remove(shardName);
1294
1295         if (removeShardOnFailure) {
1296             ShardInformation shardInfo = localShards.remove(shardName);
1297             if (shardInfo.getActor() != null) {
1298                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1299             }
1300         }
1301
1302         sender.tell(new Status.Failure(message == null ? failure :
1303             new RuntimeException(message, failure)), getSelf());
1304     }
1305
1306     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1307             String leaderPath, boolean removeShardOnFailure) {
1308         String shardName = shardInfo.getShardName();
1309         shardReplicaOperationsInProgress.remove(shardName);
1310
1311         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1312
1313         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1314             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1315
1316             // Make the local shard voting capable
1317             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1318             shardInfo.setActiveMember(true);
1319             persistShardList();
1320
1321             sender.tell(new Status.Success(null), getSelf());
1322         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1323             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1324         } else {
1325             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1326                     persistenceId(), shardName, replyMsg.getStatus());
1327
1328             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1329                     shardInfo.getShardId());
1330
1331             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1332         }
1333     }
1334
1335     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1336                                                String leaderPath, ShardIdentifier shardId) {
1337         Exception failure;
1338         switch (serverChangeStatus) {
1339             case TIMEOUT:
1340                 failure = new TimeoutException(String.format(
1341                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
1342                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
1343                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
1344                 break;
1345             case NO_LEADER:
1346                 failure = createNoShardLeaderException(shardId);
1347                 break;
1348             case NOT_SUPPORTED:
1349                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1350                         serverChange.getSimpleName(), shardId.getShardName()));
1351                 break;
1352             default :
1353                 failure = new RuntimeException(String.format(
1354                         "%s request to leader %s for shard %s failed with status %s",
1355                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1356         }
1357         return failure;
1358     }
1359
1360     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
1361         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1362
1363         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1364                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1365             @Override
1366             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1367                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1368             }
1369
1370             @Override
1371             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1372                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1373             }
1374
1375             private void doRemoveShardReplicaAsync(final String primaryPath) {
1376                 getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
1377                         primaryPath, getSender()), getTargetActor());
1378             }
1379         });
1380     }
1381
1382     private void persistShardList() {
1383         List<String> shardList = new ArrayList<>(localShards.keySet());
1384         for (ShardInformation shardInfo : localShards.values()) {
1385             if (!shardInfo.isActiveMember()) {
1386                 shardList.remove(shardInfo.getShardName());
1387             }
1388         }
1389         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1390         saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
1391     }
1392
1393     private ShardManagerSnapshot updateShardManagerSnapshot(
1394             final List<String> shardList,
1395             final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
1396         currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
1397         return currentSnapshot;
1398     }
1399
1400     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1401         currentSnapshot = snapshot;
1402
1403         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1404
1405         final MemberName currentMember = cluster.getCurrentMemberName();
1406         Set<String> configuredShardList =
1407             new HashSet<>(configuration.getMemberShardNames(currentMember));
1408         for (String shard : currentSnapshot.getShardList()) {
1409             if (!configuredShardList.contains(shard)) {
1410                 // add the current member as a replica for the shard
1411                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1412                 configuration.addMemberReplicaForShard(shard, currentMember);
1413             } else {
1414                 configuredShardList.remove(shard);
1415             }
1416         }
1417         for (String shard : configuredShardList) {
1418             // remove the member as a replica for the shard
1419             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1420             configuration.removeMemberReplicaForShard(shard, currentMember);
1421         }
1422     }
1423
1424     private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
1425         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1426             persistenceId());
1427         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1428             0, 0));
1429     }
1430
1431     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1432         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1433
1434         String shardName = changeMembersVotingStatus.getShardName();
1435         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1436         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1437             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1438                     e.getValue());
1439         }
1440
1441         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1442
1443         findLocalShard(shardName, getSender(),
1444             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1445             localShardFound.getPath(), getSender()));
1446     }
1447
1448     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
1449         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1450
1451         ActorRef sender = getSender();
1452         final String shardName = flipMembersVotingStatus.getShardName();
1453         findLocalShard(shardName, sender, localShardFound -> {
1454             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1455                     Timeout.apply(30, TimeUnit.SECONDS));
1456
1457             future.onComplete(new OnComplete<Object>() {
1458                 @Override
1459                 public void onComplete(Throwable failure, Object response) {
1460                     if (failure != null) {
1461                         sender.tell(new Status.Failure(new RuntimeException(
1462                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1463                         return;
1464                     }
1465
1466                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1467                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1468                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1469                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1470                     }
1471
1472                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1473                             .toString(), !raftState.isVoting());
1474
1475                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1476                             shardName, localShardFound.getPath(), sender);
1477                 }
1478             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1479         });
1480
1481     }
1482
1483     private void findLocalShard(FindLocalShard message) {
1484         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
1485
1486         final ShardInformation shardInformation = localShards.get(message.getShardName());
1487
1488         if (shardInformation == null) {
1489             LOG.debug("{}: Local shard {} not found - shards present: {}",
1490                     persistenceId(), message.getShardName(), localShards.keySet());
1491
1492             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1493             return;
1494         }
1495
1496         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1497             () -> new LocalShardFound(shardInformation.getActor()));
1498     }
1499
1500     private void findLocalShard(final String shardName, final ActorRef sender,
1501             final Consumer<LocalShardFound> onLocalShardFound) {
1502         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1503                 .getShardInitializationTimeout().duration().$times(2));
1504
1505         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1506         futureObj.onComplete(new OnComplete<Object>() {
1507             @Override
1508             public void onComplete(Throwable failure, Object response) {
1509                 if (failure != null) {
1510                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1511                             failure);
1512                     sender.tell(new Status.Failure(new RuntimeException(
1513                             String.format("Failed to find local shard %s", shardName), failure)), self());
1514                 } else {
1515                     if (response instanceof LocalShardFound) {
1516                         getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
1517                                 sender);
1518                     } else if (response instanceof LocalShardNotFound) {
1519                         String msg = String.format("Local shard %s does not exist", shardName);
1520                         LOG.debug("{}: {}", persistenceId, msg);
1521                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
1522                     } else {
1523                         String msg = String.format("Failed to find local shard %s: received response: %s",
1524                                 shardName, response);
1525                         LOG.debug("{}: {}", persistenceId, msg);
1526                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1527                                 new RuntimeException(msg)), self());
1528                     }
1529                 }
1530             }
1531         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1532     }
1533
1534     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
1535             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1536         if (isShardReplicaOperationInProgress(shardName, sender)) {
1537             return;
1538         }
1539
1540         shardReplicaOperationsInProgress.add(shardName);
1541
1542         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1543         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1544
1545         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1546                 changeServersVotingStatus, shardActorRef.path());
1547
1548         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1549         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1550
1551         futureObj.onComplete(new OnComplete<Object>() {
1552             @Override
1553             public void onComplete(Throwable failure, Object response) {
1554                 shardReplicaOperationsInProgress.remove(shardName);
1555                 if (failure != null) {
1556                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1557                             shardActorRef.path());
1558                     LOG.debug("{}: {}", persistenceId(), msg, failure);
1559                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1560                 } else {
1561                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1562
1563                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1564                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1565                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1566                         sender.tell(new Status.Success(null), getSelf());
1567                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1568                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1569                                 "The requested voting state change for shard %s is invalid. At least one member "
1570                                 + "must be voting", shardId.getShardName()))), getSelf());
1571                     } else {
1572                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1573                                 persistenceId(), shardName, replyMsg.getStatus());
1574
1575                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1576                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1577                         sender.tell(new Status.Failure(error), getSelf());
1578                     }
1579                 }
1580             }
1581         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1582     }
1583
1584     private static final class ForwardedAddServerReply {
1585         ShardInformation shardInfo;
1586         AddServerReply addServerReply;
1587         String leaderPath;
1588         boolean removeShardOnFailure;
1589
1590         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1591                 boolean removeShardOnFailure) {
1592             this.shardInfo = shardInfo;
1593             this.addServerReply = addServerReply;
1594             this.leaderPath = leaderPath;
1595             this.removeShardOnFailure = removeShardOnFailure;
1596         }
1597     }
1598
1599     private static final class ForwardedAddServerFailure {
1600         String shardName;
1601         String failureMessage;
1602         Throwable failure;
1603         boolean removeShardOnFailure;
1604
1605         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1606                 boolean removeShardOnFailure) {
1607             this.shardName = shardName;
1608             this.failureMessage = failureMessage;
1609             this.failure = failure;
1610             this.removeShardOnFailure = removeShardOnFailure;
1611         }
1612     }
1613
1614     static class OnShardInitialized {
1615         private final Runnable replyRunnable;
1616         private Cancellable timeoutSchedule;
1617
1618         OnShardInitialized(Runnable replyRunnable) {
1619             this.replyRunnable = replyRunnable;
1620         }
1621
1622         Runnable getReplyRunnable() {
1623             return replyRunnable;
1624         }
1625
1626         Cancellable getTimeoutSchedule() {
1627             return timeoutSchedule;
1628         }
1629
1630         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1631             this.timeoutSchedule = timeoutSchedule;
1632         }
1633     }
1634
1635     static class OnShardReady extends OnShardInitialized {
1636         OnShardReady(Runnable replyRunnable) {
1637             super(replyRunnable);
1638         }
1639     }
1640
1641     private interface RunnableMessage extends Runnable {
1642     }
1643
1644     /**
1645      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1646      * a remote or local find primary message is processed.
1647      */
1648     private interface FindPrimaryResponseHandler {
1649         /**
1650          * Invoked when a Failure message is received as a response.
1651          *
1652          * @param failure the failure exception
1653          */
1654         void onFailure(Throwable failure);
1655
1656         /**
1657          * Invoked when a RemotePrimaryShardFound response is received.
1658          *
1659          * @param response the response
1660          */
1661         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1662
1663         /**
1664          * Invoked when a LocalPrimaryShardFound response is received.
1665          *
1666          * @param response the response
1667          */
1668         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1669
1670         /**
1671          * Invoked when an unknown response is received. This is another type of failure.
1672          *
1673          * @param response the response
1674          */
1675         void onUnknownResponse(Object response);
1676     }
1677
1678     /**
1679      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1680      * replica and sends a wrapped Failure response to some targetActor.
1681      */
1682     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1683         private final ActorRef targetActor;
1684         private final String shardName;
1685         private final String persistenceId;
1686         private final ActorRef shardManagerActor;
1687
1688         /**
1689          * Constructs an instance.
1690          *
1691          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1692          * @param shardName The name of the shard for which the primary replica had to be found
1693          * @param persistenceId The persistenceId for the ShardManager
1694          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1695          */
1696         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
1697                 ActorRef shardManagerActor) {
1698             this.targetActor = Preconditions.checkNotNull(targetActor);
1699             this.shardName = Preconditions.checkNotNull(shardName);
1700             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1701             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1702         }
1703
1704         public ActorRef getTargetActor() {
1705             return targetActor;
1706         }
1707
1708         public String getShardName() {
1709             return shardName;
1710         }
1711
1712         @Override
1713         public void onFailure(Throwable failure) {
1714             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1715             targetActor.tell(new Status.Failure(new RuntimeException(
1716                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1717         }
1718
1719         @Override
1720         public void onUnknownResponse(Object response) {
1721             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1722                     shardName, response);
1723             LOG.debug("{}: {}", persistenceId, msg);
1724             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1725                     new RuntimeException(msg)), shardManagerActor);
1726         }
1727     }
1728
1729     /**
1730      * The WrappedShardResponse class wraps a response from a Shard.
1731      */
1732     private static final class WrappedShardResponse {
1733         private final ShardIdentifier shardId;
1734         private final Object response;
1735         private final String leaderPath;
1736
1737         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1738             this.shardId = shardId;
1739             this.response = response;
1740             this.leaderPath = leaderPath;
1741         }
1742
1743         ShardIdentifier getShardId() {
1744             return shardId;
1745         }
1746
1747         Object getResponse() {
1748             return response;
1749         }
1750
1751         String getLeaderPath() {
1752             return leaderPath;
1753         }
1754     }
1755
1756     private static final class ShardNotInitializedTimeout {
1757         private final ActorRef sender;
1758         private final ShardInformation shardInfo;
1759         private final OnShardInitialized onShardInitialized;
1760
1761         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1762             this.sender = sender;
1763             this.shardInfo = shardInfo;
1764             this.onShardInitialized = onShardInitialized;
1765         }
1766
1767         ActorRef getSender() {
1768             return sender;
1769         }
1770
1771         ShardInformation getShardInfo() {
1772             return shardInfo;
1773         }
1774
1775         OnShardInitialized getOnShardInitialized() {
1776             return onShardInitialized;
1777         }
1778     }
1779 }
1780
1781
1782