Send Shutdown message to Shard on ServerRemoved
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.Status;
20 import akka.actor.SupervisorStrategy;
21 import akka.cluster.ClusterEvent;
22 import akka.dispatch.Futures;
23 import akka.dispatch.OnComplete;
24 import akka.japi.Function;
25 import akka.pattern.Patterns;
26 import akka.persistence.RecoveryCompleted;
27 import akka.persistence.SaveSnapshotFailure;
28 import akka.persistence.SaveSnapshotSuccess;
29 import akka.persistence.SnapshotOffer;
30 import akka.persistence.SnapshotSelectionCriteria;
31 import akka.serialization.Serialization;
32 import akka.util.Timeout;
33 import com.google.common.annotations.VisibleForTesting;
34 import com.google.common.base.Objects;
35 import com.google.common.base.Optional;
36 import com.google.common.base.Preconditions;
37 import com.google.common.base.Strings;
38 import com.google.common.base.Supplier;
39 import com.google.common.collect.Sets;
40 import java.io.ByteArrayInputStream;
41 import java.io.ObjectInputStream;
42 import java.io.Serializable;
43 import java.util.ArrayList;
44 import java.util.Collection;
45 import java.util.Collections;
46 import java.util.HashMap;
47 import java.util.HashSet;
48 import java.util.Iterator;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.Set;
52 import java.util.concurrent.CountDownLatch;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.TimeoutException;
55 import javax.annotation.Nonnull;
56 import javax.annotation.Nullable;
57 import org.apache.commons.lang3.SerializationUtils;
58 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
59 import org.opendaylight.controller.cluster.datastore.config.Configuration;
60 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
63 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
65 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
66 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
67 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
68 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
69 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
70 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
71 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
72 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
73 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
77 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
78 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
79 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
80 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
81 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
82 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
83 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
84 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
85 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
86 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
87 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
88 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
89 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
90 import org.opendaylight.controller.cluster.raft.RaftState;
91 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
92 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
93 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
94 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
95 import org.opendaylight.controller.cluster.raft.messages.AddServer;
96 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
97 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
98 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
99 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
100 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
101 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
102 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
103 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
104 import org.slf4j.Logger;
105 import org.slf4j.LoggerFactory;
106 import scala.concurrent.ExecutionContext;
107 import scala.concurrent.Future;
108 import scala.concurrent.duration.Duration;
109 import scala.concurrent.duration.FiniteDuration;
110
/**
 * The ShardManager has the following responsibilities:
 * <ul>
 * <li> Create all the local shard replicas that belong on this cluster member
 * <li> Find the address of the local shard
 * <li> Find the primary replica for any given shard
 * <li> Monitor the cluster members and store their addresses
 * </ul>
 */
120 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
121
122     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
123
    // Stores a mapping between a shard name and its corresponding information.
    // Shard names look like inventory, topology, etc. and are as specified in
    // the configuration.
127     private final Map<String, ShardInformation> localShards = new HashMap<>();
128
129     // The type of a ShardManager reflects the type of the datastore itself
130     // A data store could be of type config/operational
131     private final String type;
132
133     private final ClusterWrapper cluster;
134
135     private final Configuration configuration;
136
137     private final String shardDispatcherPath;
138
139     private final ShardManagerInfo mBean;
140
141     private DatastoreContextFactory datastoreContextFactory;
142
143     private final CountDownLatch waitTillReadyCountdownLatch;
144
145     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
146
147     private final ShardPeerAddressResolver peerAddressResolver;
148
149     private SchemaContext schemaContext;
150
151     private DatastoreSnapshot restoreFromSnapshot;
152
153     private ShardManagerSnapshot currentSnapshot;
154
155     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
156
157     private final String persistenceId;
158
159     /**
160      */
161     protected ShardManager(AbstractBuilder<?> builder) {
162
163         this.cluster = builder.cluster;
164         this.configuration = builder.configuration;
165         this.datastoreContextFactory = builder.datastoreContextFactory;
166         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
167         this.shardDispatcherPath =
168                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
169         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
170         this.primaryShardInfoCache = builder.primaryShardInfoCache;
171         this.restoreFromSnapshot = builder.restoreFromSnapshot;
172
173         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
174         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
175
176         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
177
178         // Subscribe this actor to cluster member events
179         cluster.subscribeToMemberEvents(getSelf());
180
181         List<String> localShardActorNames = new ArrayList<>();
182         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
183                 "shard-manager-" + this.type,
184                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
185                 localShardActorNames);
186         mBean.setShardManager(this);
187     }
188
189     @Override
190     public void postStop() {
191         LOG.info("Stopping ShardManager {}", persistenceId());
192
193         mBean.unregisterMBean();
194     }
195
196     @Override
197     public void handleCommand(Object message) throws Exception {
198         if (message  instanceof FindPrimary) {
199             findPrimary((FindPrimary)message);
200         } else if(message instanceof FindLocalShard){
201             findLocalShard((FindLocalShard) message);
202         } else if (message instanceof UpdateSchemaContext) {
203             updateSchemaContext(message);
204         } else if(message instanceof ActorInitialized) {
205             onActorInitialized(message);
206         } else if (message instanceof ClusterEvent.MemberUp){
207             memberUp((ClusterEvent.MemberUp) message);
208         } else if (message instanceof ClusterEvent.MemberExited){
209             memberExited((ClusterEvent.MemberExited) message);
210         } else if(message instanceof ClusterEvent.MemberRemoved) {
211             memberRemoved((ClusterEvent.MemberRemoved) message);
212         } else if(message instanceof ClusterEvent.UnreachableMember) {
213             memberUnreachable((ClusterEvent.UnreachableMember)message);
214         } else if(message instanceof ClusterEvent.ReachableMember) {
215             memberReachable((ClusterEvent.ReachableMember) message);
216         } else if(message instanceof DatastoreContextFactory) {
217             onDatastoreContextFactory((DatastoreContextFactory)message);
218         } else if(message instanceof RoleChangeNotification) {
219             onRoleChangeNotification((RoleChangeNotification) message);
220         } else if(message instanceof FollowerInitialSyncUpStatus){
221             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
222         } else if(message instanceof ShardNotInitializedTimeout) {
223             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
224         } else if(message instanceof ShardLeaderStateChanged) {
225             onLeaderStateChanged((ShardLeaderStateChanged) message);
226         } else if(message instanceof SwitchShardBehavior){
227             onSwitchShardBehavior((SwitchShardBehavior) message);
228         } else if(message instanceof CreateShard) {
229             onCreateShard((CreateShard)message);
230         } else if(message instanceof AddShardReplica){
231             onAddShardReplica((AddShardReplica)message);
232         } else if(message instanceof ForwardedAddServerReply) {
233             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
234             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
235                     msg.removeShardOnFailure);
236         } else if(message instanceof ForwardedAddServerFailure) {
237             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
238             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
239         } else if(message instanceof PrimaryShardFoundForContext) {
240             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
241             onPrimaryShardFoundContext(primaryShardFoundContext);
242         } else if(message instanceof RemoveShardReplica) {
243             onRemoveShardReplica((RemoveShardReplica) message);
244         } else if(message instanceof WrappedShardResponse){
245             onWrappedShardResponse((WrappedShardResponse) message);
246         } else if(message instanceof GetSnapshot) {
247             onGetSnapshot();
248         } else if(message instanceof ServerRemoved){
249             onShardReplicaRemoved((ServerRemoved) message);
250         } else if(message instanceof SaveSnapshotSuccess) {
251             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
252         } else if(message instanceof SaveSnapshotFailure) {
253             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
254                     persistenceId(), ((SaveSnapshotFailure) message).cause());
255         } else if(message instanceof Shutdown) {
256             onShutDown();
257         } else {
258             unknownMessage(message);
259         }
260     }
261
262     private void onShutDown() {
263         Shutdown shutdown = new Shutdown();
264         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
265         for (ShardInformation info : localShards.values()) {
266             if (info.getActor() != null) {
267                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
268
269                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
270                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, shutdown));
271             }
272         }
273
274         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
275
276         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
277         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
278
279         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
280             @Override
281             public void onComplete(Throwable failure, Iterable<Boolean> results) {
282                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
283
284                 self().tell(PoisonPill.getInstance(), self());
285
286                 if(failure != null) {
287                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
288                 } else {
289                     int nfailed = 0;
290                     for(Boolean r: results) {
291                         if(!r) {
292                             nfailed++;
293                         }
294                     }
295
296                     if(nfailed > 0) {
297                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
298                     }
299                 }
300             }
301         }, dispatcher);
302     }
303
304     private void onWrappedShardResponse(WrappedShardResponse message) {
305         if (message.getResponse() instanceof RemoveServerReply) {
306             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
307                     message.getLeaderPath());
308         }
309     }
310
311     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
312             String leaderPath) {
313         shardReplicaOperationsInProgress.remove(shardId);
314
315         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
316
317         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
318             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
319                     shardId.getShardName());
320             originalSender.tell(new akka.actor.Status.Success(null), getSelf());
321         } else {
322             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
323                     persistenceId(), shardId, replyMsg.getStatus());
324
325             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
326                     leaderPath, shardId);
327             originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
328         }
329     }
330
331     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
332         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
333             addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
334                     getSender());
335         } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
336             removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
337                     primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
338         }
339     }
340
341     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
342             final ActorRef sender) {
343         if(isShardReplicaOperationInProgress(shardName, sender)) {
344             return;
345         }
346
347         shardReplicaOperationsInProgress.add(shardName);
348
349         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
350
351         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
352
353         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
354         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
355                 primaryPath, shardId);
356
357         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
358                 duration());
359         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
360                 new RemoveServer(shardId.toString()), removeServerTimeout);
361
362         futureObj.onComplete(new OnComplete<Object>() {
363             @Override
364             public void onComplete(Throwable failure, Object response) {
365                 if (failure != null) {
366                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
367                             primaryPath, shardName);
368
369                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
370
371                     // FAILURE
372                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
373                 } else {
374                     // SUCCESS
375                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
376                 }
377             }
378         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
379     }
380
381     private void onShardReplicaRemoved(ServerRemoved message) {
382         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
383         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
384         if(shardInformation == null) {
385             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
386             return;
387         } else if(shardInformation.getActor() != null) {
388             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
389             shardInformation.getActor().tell(new Shutdown(), self());
390         }
391         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
392         persistShardList();
393     }
394
395     private void onGetSnapshot() {
396         LOG.debug("{}: onGetSnapshot", persistenceId());
397
398         List<String> notInitialized = null;
399         for(ShardInformation shardInfo: localShards.values()) {
400             if(!shardInfo.isShardInitialized()) {
401                 if(notInitialized == null) {
402                     notInitialized = new ArrayList<>();
403                 }
404
405                 notInitialized.add(shardInfo.getShardName());
406             }
407         }
408
409         if(notInitialized != null) {
410             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
411                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
412             return;
413         }
414
415         byte[] shardManagerSnapshot = null;
416         if(currentSnapshot != null) {
417             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
418         }
419
420         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
421                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
422                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
423
424         for(ShardInformation shardInfo: localShards.values()) {
425             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
426         }
427     }
428
429     private void onCreateShard(CreateShard createShard) {
430         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
431
432         Object reply;
433         try {
434             String shardName = createShard.getModuleShardConfig().getShardName();
435             if(localShards.containsKey(shardName)) {
436                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
437                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
438             } else {
439                 doCreateShard(createShard);
440                 reply = new akka.actor.Status.Success(null);
441             }
442         } catch (Exception e) {
443             LOG.error("{}: onCreateShard failed", persistenceId(), e);
444             reply = new akka.actor.Status.Failure(e);
445         }
446
447         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
448             getSender().tell(reply, getSelf());
449         }
450     }
451
452     private void doCreateShard(CreateShard createShard) {
453         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
454         String shardName = moduleShardConfig.getShardName();
455
456         configuration.addModuleShardConfiguration(moduleShardConfig);
457
458         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
459         if(shardDatastoreContext == null) {
460             shardDatastoreContext = newShardDatastoreContext(shardName);
461         } else {
462             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
463                     peerAddressResolver).build();
464         }
465
466         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
467
468         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
469                 currentSnapshot.getShardList().contains(shardName);
470
471         Map<String, String> peerAddresses;
472         boolean isActiveMember;
473         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
474                 contains(cluster.getCurrentMemberName())) {
475             peerAddresses = getPeerAddresses(shardName);
476             isActiveMember = true;
477         } else {
478             // The local member is not in the static shard member configuration and the shard did not
479             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
480             // the shard with no peers and with elections disabled so it stays as follower. A
481             // subsequent AddServer request will be needed to make it an active member.
482             isActiveMember = false;
483             peerAddresses = Collections.emptyMap();
484             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
485                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
486         }
487
488         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
489                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
490                 isActiveMember);
491
492         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
493                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
494         info.setActiveMember(isActiveMember);
495         localShards.put(info.getShardName(), info);
496
497         mBean.addLocalShard(shardId.toString());
498
499         if(schemaContext != null) {
500             info.setActor(newShardActor(schemaContext, info));
501         }
502     }
503
504     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
505         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
506                 shardPeerAddressResolver(peerAddressResolver);
507     }
508
509     private DatastoreContext newShardDatastoreContext(String shardName) {
510         return newShardDatastoreContextBuilder(shardName).build();
511     }
512
513     private void checkReady(){
514         if (isReadyWithLeaderId()) {
515             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
516                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
517
518             waitTillReadyCountdownLatch.countDown();
519         }
520     }
521
522     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
523         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
524
525         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
526         if(shardInformation != null) {
527             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
528             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
529             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
530                 primaryShardInfoCache.remove(shardInformation.getShardName());
531             }
532
533             checkReady();
534         } else {
535             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
536         }
537     }
538
539     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
540         ShardInformation shardInfo = message.getShardInfo();
541
542         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
543                 shardInfo.getShardName());
544
545         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
546
547         if(!shardInfo.isShardInitialized()) {
548             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
549             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
550         } else {
551             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
552             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
553         }
554     }
555
556     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
557         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
558                 status.getName(), status.isInitialSyncDone());
559
560         ShardInformation shardInformation = findShardInformation(status.getName());
561
562         if(shardInformation != null) {
563             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
564
565             mBean.setSyncStatus(isInSync());
566         }
567
568     }
569
570     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
571         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
572                 roleChanged.getOldRole(), roleChanged.getNewRole());
573
574         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
575         if(shardInformation != null) {
576             shardInformation.setRole(roleChanged.getNewRole());
577             checkReady();
578             mBean.setSyncStatus(isInSync());
579         }
580     }
581
582
583     private ShardInformation findShardInformation(String memberId) {
584         for(ShardInformation info : localShards.values()){
585             if(info.getShardId().toString().equals(memberId)){
586                 return info;
587             }
588         }
589
590         return null;
591     }
592
593     private boolean isReadyWithLeaderId() {
594         boolean isReady = true;
595         for (ShardInformation info : localShards.values()) {
596             if(!info.isShardReadyWithLeaderId()){
597                 isReady = false;
598                 break;
599             }
600         }
601         return isReady;
602     }
603
604     private boolean isInSync(){
605         for (ShardInformation info : localShards.values()) {
606             if(!info.isInSync()){
607                 return false;
608             }
609         }
610         return true;
611     }
612
613     private void onActorInitialized(Object message) {
614         final ActorRef sender = getSender();
615
616         if (sender == null) {
617             return; //why is a non-actor sending this message? Just ignore.
618         }
619
620         String actorName = sender.path().name();
621         //find shard name from actor name; actor name is stringified shardId
622         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
623
624         if (shardId.getShardName() == null) {
625             return;
626         }
627
628         markShardAsInitialized(shardId.getShardName());
629     }
630
631     private void markShardAsInitialized(String shardName) {
632         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
633
634         ShardInformation shardInformation = localShards.get(shardName);
635         if (shardInformation != null) {
636             shardInformation.setActorInitialized();
637
638             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
639         }
640     }
641
642     @Override
643     protected void handleRecover(Object message) throws Exception {
644         if (message instanceof RecoveryCompleted) {
645             onRecoveryCompleted();
646         } else if (message instanceof SnapshotOffer) {
647             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
648         }
649     }
650
651     private void onRecoveryCompleted() {
652         LOG.info("Recovery complete : {}", persistenceId());
653
654         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
655         // journal on upgrade from Helium.
656         deleteMessages(lastSequenceNr());
657
658         if(currentSnapshot == null && restoreFromSnapshot != null &&
659                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
660             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
661                     restoreFromSnapshot.getShardManagerSnapshot()))) {
662                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
663
664                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
665
666                 applyShardManagerSnapshot(snapshot);
667             } catch(Exception e) {
668                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
669             }
670         }
671
672         createLocalShards();
673     }
674
675     private void findLocalShard(FindLocalShard message) {
676         final ShardInformation shardInformation = localShards.get(message.getShardName());
677
678         if(shardInformation == null){
679             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
680             return;
681         }
682
683         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
684             @Override
685             public Object get() {
686                 return new LocalShardFound(shardInformation.getActor());
687             }
688         });
689     }
690
691     private void sendResponse(ShardInformation shardInformation, boolean doWait,
692             boolean wantShardReady, final Supplier<Object> messageSupplier) {
693         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
694             if(doWait) {
695                 final ActorRef sender = getSender();
696                 final ActorRef self = self();
697
698                 Runnable replyRunnable = new Runnable() {
699                     @Override
700                     public void run() {
701                         sender.tell(messageSupplier.get(), self);
702                     }
703                 };
704
705                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
706                     new OnShardInitialized(replyRunnable);
707
708                 shardInformation.addOnShardInitialized(onShardInitialized);
709
710                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
711                 if(shardInformation.isShardInitialized()) {
712                     // If the shard is already initialized then we'll wait enough time for the shard to
713                     // elect a leader, ie 2 times the election timeout.
714                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
715                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
716                 }
717
718                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
719                         shardInformation.getShardName());
720
721                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
722                         timeout, getSelf(),
723                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
724                         getContext().dispatcher(), getSelf());
725
726                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
727
728             } else if (!shardInformation.isShardInitialized()) {
729                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
730                         shardInformation.getShardName());
731                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
732             } else {
733                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
734                         shardInformation.getShardName());
735                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
736             }
737
738             return;
739         }
740
741         getSender().tell(messageSupplier.get(), getSelf());
742     }
743
744     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
745         return new NoShardLeaderException(null, shardId.toString());
746     }
747
748     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
749         return new NotInitializedException(String.format(
750                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
751     }
752
753     private void memberRemoved(ClusterEvent.MemberRemoved message) {
754         String memberName = message.member().roles().head();
755
756         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
757                 message.member().address());
758
759         peerAddressResolver.removePeerAddress(memberName);
760
761         for(ShardInformation info : localShards.values()){
762             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
763         }
764     }
765
766     private void memberExited(ClusterEvent.MemberExited message) {
767         String memberName = message.member().roles().head();
768
769         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
770                 message.member().address());
771
772         peerAddressResolver.removePeerAddress(memberName);
773
774         for(ShardInformation info : localShards.values()){
775             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
776         }
777     }
778
779     private void memberUp(ClusterEvent.MemberUp message) {
780         String memberName = message.member().roles().head();
781
782         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
783                 message.member().address());
784
785         addPeerAddress(memberName, message.member().address());
786
787         checkReady();
788     }
789
790     private void addPeerAddress(String memberName, Address address) {
791         peerAddressResolver.addPeerAddress(memberName, address);
792
793         for(ShardInformation info : localShards.values()){
794             String shardName = info.getShardName();
795             String peerId = getShardIdentifier(memberName, shardName).toString();
796             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
797
798             info.peerUp(memberName, peerId, getSelf());
799         }
800     }
801
802     private void memberReachable(ClusterEvent.ReachableMember message) {
803         String memberName = message.member().roles().head();
804         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
805
806         addPeerAddress(memberName, message.member().address());
807
808         markMemberAvailable(memberName);
809     }
810
811     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
812         String memberName = message.member().roles().head();
813         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
814
815         markMemberUnavailable(memberName);
816     }
817
818     private void markMemberUnavailable(final String memberName) {
819         for(ShardInformation info : localShards.values()){
820             String leaderId = info.getLeaderId();
821             if(leaderId != null && leaderId.contains(memberName)) {
822                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
823                 info.setLeaderAvailable(false);
824
825                 primaryShardInfoCache.remove(info.getShardName());
826             }
827
828             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
829         }
830     }
831
832     private void markMemberAvailable(final String memberName) {
833         for(ShardInformation info : localShards.values()){
834             String leaderId = info.getLeaderId();
835             if(leaderId != null && leaderId.contains(memberName)) {
836                 LOG.debug("Marking Leader {} as available.", leaderId);
837                 info.setLeaderAvailable(true);
838             }
839
840             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
841         }
842     }
843
844     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
845         datastoreContextFactory = factory;
846         for (ShardInformation info : localShards.values()) {
847             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
848         }
849     }
850
851     private void onSwitchShardBehavior(SwitchShardBehavior message) {
852         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
853
854         ShardInformation shardInformation = localShards.get(identifier.getShardName());
855
856         if(shardInformation != null && shardInformation.getActor() != null) {
857             shardInformation.getActor().tell(
858                     new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
859         } else {
860             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
861                     message.getShardName(), message.getNewState());
862         }
863     }
864
865     /**
866      * Notifies all the local shards of a change in the schema context
867      *
868      * @param message
869      */
870     private void updateSchemaContext(final Object message) {
871         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
872
873         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
874
875         for (ShardInformation info : localShards.values()) {
876             if (info.getActor() == null) {
877                 LOG.debug("Creating Shard {}", info.getShardId());
878                 info.setActor(newShardActor(schemaContext, info));
879             } else {
880                 info.getActor().tell(message, getSelf());
881             }
882         }
883     }
884
885     @VisibleForTesting
886     protected ClusterWrapper getCluster() {
887         return cluster;
888     }
889
890     @VisibleForTesting
891     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
892         return getContext().actorOf(info.newProps(schemaContext)
893                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
894     }
895
896     private void findPrimary(FindPrimary message) {
897         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
898
899         final String shardName = message.getShardName();
900         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
901
902         // First see if the there is a local replica for the shard
903         final ShardInformation info = localShards.get(shardName);
904         if (info != null && info.isActiveMember()) {
905             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
906                 @Override
907                 public Object get() {
908                     String primaryPath = info.getSerializedLeaderActor();
909                     Object found = canReturnLocalShardState && info.isLeader() ?
910                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
911                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
912
913                             if(LOG.isDebugEnabled()) {
914                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
915                             }
916
917                             return found;
918                 }
919             });
920
921             return;
922         }
923
924         Collection<String> visitedAddresses;
925         if(message instanceof RemoteFindPrimary) {
926             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
927         } else {
928             visitedAddresses = new ArrayList<>();
929         }
930
931         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
932
933         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
934             if(visitedAddresses.contains(address)) {
935                 continue;
936             }
937
938             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
939                     shardName, address);
940
941             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
942                     message.isWaitUntilReady(), visitedAddresses), getContext());
943             return;
944         }
945
946         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
947
948         getSender().tell(new PrimaryNotFoundException(
949                 String.format("No primary shard found for %s.", shardName)), getSelf());
950     }
951
952     /**
953      * Construct the name of the shard actor given the name of the member on
954      * which the shard resides and the name of the shard
955      *
956      * @param memberName
957      * @param shardName
958      * @return
959      */
960     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
961         return peerAddressResolver.getShardIdentifier(memberName, shardName);
962     }
963
964     /**
965      * Create shards that are local to the member on which the ShardManager
966      * runs
967      *
968      */
969     private void createLocalShards() {
970         String memberName = this.cluster.getCurrentMemberName();
971         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
972
973         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
974         if(restoreFromSnapshot != null)
975         {
976             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
977                 shardSnapshots.put(snapshot.getName(), snapshot);
978             }
979         }
980
981         restoreFromSnapshot = null; // null out to GC
982
983         for(String shardName : memberShardNames){
984             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
985
986             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
987
988             Map<String, String> peerAddresses = getPeerAddresses(shardName);
989             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
990                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
991                         shardSnapshots.get(shardName)), peerAddressResolver));
992             mBean.addLocalShard(shardId.toString());
993         }
994     }
995
996     /**
997      * Given the name of the shard find the addresses of all it's peers
998      *
999      * @param shardName
1000      */
1001     private Map<String, String> getPeerAddresses(String shardName) {
1002         Collection<String> members = configuration.getMembersFromShardName(shardName);
1003         Map<String, String> peerAddresses = new HashMap<>();
1004
1005         String currentMemberName = this.cluster.getCurrentMemberName();
1006
1007         for(String memberName : members) {
1008             if(!currentMemberName.equals(memberName)) {
1009                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1010                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1011                 peerAddresses.put(shardId.toString(), address);
1012             }
1013         }
1014         return peerAddresses;
1015     }
1016
1017     @Override
1018     public SupervisorStrategy supervisorStrategy() {
1019
1020         return new OneForOneStrategy(10, Duration.create("1 minute"),
1021                 new Function<Throwable, SupervisorStrategy.Directive>() {
1022             @Override
1023             public SupervisorStrategy.Directive apply(Throwable t) {
1024                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1025                 return SupervisorStrategy.resume();
1026             }
1027         }
1028                 );
1029
1030     }
1031
1032     @Override
1033     public String persistenceId() {
1034         return persistenceId;
1035     }
1036
1037     @VisibleForTesting
1038     ShardManagerInfoMBean getMBean(){
1039         return mBean;
1040     }
1041
1042     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1043         if (shardReplicaOperationsInProgress.contains(shardName)) {
1044             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1045             LOG.debug ("{}: {}", persistenceId(), msg);
1046             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1047             return true;
1048         }
1049
1050         return false;
1051     }
1052
1053     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1054         final String shardName = shardReplicaMsg.getShardName();
1055
1056         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1057
1058         // verify the shard with the specified name is present in the cluster configuration
1059         if (!(this.configuration.isShardConfigured(shardName))) {
1060             String msg = String.format("No module configuration exists for shard %s", shardName);
1061             LOG.debug ("{}: {}", persistenceId(), msg);
1062             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1063             return;
1064         }
1065
1066         // Create the localShard
1067         if (schemaContext == null) {
1068             String msg = String.format(
1069                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1070             LOG.debug ("{}: {}", persistenceId(), msg);
1071             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1072             return;
1073         }
1074
1075         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1076             @Override
1077             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1078                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1079             }
1080
1081             @Override
1082             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1083                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1084             }
1085
1086         });
1087     }
1088
1089     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1090         String msg = String.format("Local shard %s already exists", shardName);
1091         LOG.debug ("{}: {}", persistenceId(), msg);
1092         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
1093     }
1094
1095     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1096         if(isShardReplicaOperationInProgress(shardName, sender)) {
1097             return;
1098         }
1099
1100         shardReplicaOperationsInProgress.add(shardName);
1101
1102         final ShardInformation shardInfo;
1103         final boolean removeShardOnFailure;
1104         ShardInformation existingShardInfo = localShards.get(shardName);
1105         if(existingShardInfo == null) {
1106             removeShardOnFailure = true;
1107             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1108
1109             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
1110                     DisableElectionsRaftPolicy.class.getName()).build();
1111
1112             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1113                     Shard.builder(), peerAddressResolver);
1114             shardInfo.setActiveMember(false);
1115             localShards.put(shardName, shardInfo);
1116             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1117         } else {
1118             removeShardOnFailure = false;
1119             shardInfo = existingShardInfo;
1120         }
1121
1122         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1123
1124         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1125         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1126                 response.getPrimaryPath(), shardInfo.getShardId());
1127
1128         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1129                 duration());
1130         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1131             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1132
1133         futureObj.onComplete(new OnComplete<Object>() {
1134             @Override
1135             public void onComplete(Throwable failure, Object addServerResponse) {
1136                 if (failure != null) {
1137                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1138                             response.getPrimaryPath(), shardName, failure);
1139
1140                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1141                             response.getPrimaryPath(), shardName);
1142                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1143                 } else {
1144                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1145                             response.getPrimaryPath(), removeShardOnFailure), sender);
1146                 }
1147             }
1148         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1149     }
1150
1151     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1152             boolean removeShardOnFailure) {
1153         shardReplicaOperationsInProgress.remove(shardName);
1154
1155         if(removeShardOnFailure) {
1156             ShardInformation shardInfo = localShards.remove(shardName);
1157             if (shardInfo.getActor() != null) {
1158                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1159             }
1160         }
1161
1162         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1163             new RuntimeException(message, failure)), getSelf());
1164     }
1165
1166     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1167             String leaderPath, boolean removeShardOnFailure) {
1168         String shardName = shardInfo.getShardName();
1169         shardReplicaOperationsInProgress.remove(shardName);
1170
1171         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1172
1173         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1174             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1175
1176             // Make the local shard voting capable
1177             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1178             shardInfo.setActiveMember(true);
1179             persistShardList();
1180
1181             mBean.addLocalShard(shardInfo.getShardId().toString());
1182             sender.tell(new akka.actor.Status.Success(null), getSelf());
1183         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1184             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1185         } else {
1186             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1187                     persistenceId(), shardName, replyMsg.getStatus());
1188
1189             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1190
1191             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1192         }
1193     }
1194
1195     private Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1196                                                String leaderPath, ShardIdentifier shardId) {
1197         Exception failure;
1198         switch (serverChangeStatus) {
1199             case TIMEOUT:
1200                 failure = new TimeoutException(String.format(
1201                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1202                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1203                         leaderPath, shardId.getShardName()));
1204                 break;
1205             case NO_LEADER:
1206                 failure = createNoShardLeaderException(shardId);
1207                 break;
1208             case NOT_SUPPORTED:
1209                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1210                         serverChange.getSimpleName(), shardId.getShardName()));
1211                 break;
1212             default :
1213                 failure = new RuntimeException(String.format(
1214                         "%s request to leader %s for shard %s failed with status %s",
1215                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1216         }
1217         return failure;
1218     }
1219
1220     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1221         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1222
1223         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1224                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1225             @Override
1226             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1227                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1228             }
1229
1230             @Override
1231             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1232                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1233             }
1234         });
1235     }
1236
1237     private void persistShardList() {
1238         List<String> shardList = new ArrayList<>(localShards.keySet());
1239         for (ShardInformation shardInfo : localShards.values()) {
1240             if (!shardInfo.isActiveMember()) {
1241                 shardList.remove(shardInfo.getShardName());
1242             }
1243         }
1244         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1245         saveSnapshot(updateShardManagerSnapshot(shardList));
1246     }
1247
1248     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1249         currentSnapshot = new ShardManagerSnapshot(shardList);
1250         return currentSnapshot;
1251     }
1252
1253     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1254         currentSnapshot = snapshot;
1255
1256         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1257
1258         String currentMember = cluster.getCurrentMemberName();
1259         Set<String> configuredShardList =
1260             new HashSet<>(configuration.getMemberShardNames(currentMember));
1261         for (String shard : currentSnapshot.getShardList()) {
1262             if (!configuredShardList.contains(shard)) {
1263                 // add the current member as a replica for the shard
1264                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1265                 configuration.addMemberReplicaForShard(shard, currentMember);
1266             } else {
1267                 configuredShardList.remove(shard);
1268             }
1269         }
1270         for (String shard : configuredShardList) {
1271             // remove the member as a replica for the shard
1272             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1273             configuration.removeMemberReplicaForShard(shard, currentMember);
1274         }
1275     }
1276
1277     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1278         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1279             persistenceId());
1280         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
1281     }
1282
1283     private static class ForwardedAddServerReply {
1284         ShardInformation shardInfo;
1285         AddServerReply addServerReply;
1286         String leaderPath;
1287         boolean removeShardOnFailure;
1288
1289         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1290                 boolean removeShardOnFailure) {
1291             this.shardInfo = shardInfo;
1292             this.addServerReply = addServerReply;
1293             this.leaderPath = leaderPath;
1294             this.removeShardOnFailure = removeShardOnFailure;
1295         }
1296     }
1297
1298     private static class ForwardedAddServerFailure {
1299         String shardName;
1300         String failureMessage;
1301         Throwable failure;
1302         boolean removeShardOnFailure;
1303
1304         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1305                 boolean removeShardOnFailure) {
1306             this.shardName = shardName;
1307             this.failureMessage = failureMessage;
1308             this.failure = failure;
1309             this.removeShardOnFailure = removeShardOnFailure;
1310         }
1311     }
1312
    /**
     * Per-shard bookkeeping held by the ShardManager: the shard's identity, the local shard actor (once set),
     * the most recently recorded RAFT role / leader information, and callbacks waiting for the shard to become
     * initialized or ready.
     *
     * <p>Several mutators ({@link #setRole}, {@link #setLeaderId}, {@link #setLeaderAvailable},
     * {@link #updatePeerAddress}, {@link #setActorInitialized}) re-evaluate the pending
     * {@link OnShardInitialized} callbacks, so the order in which they are invoked matters.
     */
    @VisibleForTesting
    protected static class ShardInformation {
        private final ShardIdentifier shardId;
        private final String shardName;
        // Local shard actor and its cached path; both null until setActor() is called.
        private ActorRef actor;
        private ActorPath actorPath;
        private final Map<String, String> initialPeerAddresses;
        // null until setLocalDataTree() is called.
        private Optional<DataTree> localShardDataTree;
        private boolean leaderAvailable = false;

        // flag that determines if the actor is ready for business
        private boolean actorInitialized = false;

        private boolean followerSyncStatus = false;

        // Callbacks waiting for this shard to become initialized (or, for OnShardReady, ready with a leader).
        private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
        private String role ;
        private String leaderId;
        private short leaderVersion;

        private DatastoreContext datastoreContext;
        // Consumed (set to null) by newProps(), so props can only be created once per instance.
        private Shard.AbstractBuilder<?, ?> builder;
        private final ShardPeerAddressResolver addressResolver;
        private boolean isActiveMember = true;

        private ShardInformation(String shardName, ShardIdentifier shardId,
                Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
                Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
            this.shardName = shardName;
            this.shardId = shardId;
            this.initialPeerAddresses = initialPeerAddresses;
            this.datastoreContext = datastoreContext;
            this.builder = builder;
            this.addressResolver = addressResolver;
        }

        /**
         * Builds the Props for the shard actor from the stored builder, applying this shard's id, initial peer
         * addresses, datastore context and the given schema context.
         *
         * <p>The builder is discarded afterwards, so a second call fails the null check.
         *
         * @param schemaContext the schema context to hand to the shard
         * @return the shard actor's Props
         * @throws NullPointerException if props were already created
         */
        Props newProps(SchemaContext schemaContext) {
            Preconditions.checkNotNull(builder);
            Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
                    schemaContext(schemaContext).props();
            builder = null;
            return props;
        }

        String getShardName() {
            return shardName;
        }

        /**
         * @return the local shard actor, or null if {@link #setActor} has not been called yet
         */
        @Nullable
        ActorRef getActor(){
            return actor;
        }

        /**
         * @return the path cached by {@link #setActor}, or null if no actor has been set
         */
        ActorPath getActorPath() {
            return actorPath;
        }

        /**
         * Records the local shard actor and caches its path. The argument must be non-null since its path is
         * read immediately.
         */
        void setActor(ActorRef actor) {
            this.actor = actor;
            this.actorPath = actor.path();
        }

        ShardIdentifier getShardId() {
            return shardId;
        }

        void setLocalDataTree(Optional<DataTree> localShardDataTree) {
            this.localShardDataTree = localShardDataTree;
        }

        /**
         * @return the value passed to {@link #setLocalDataTree}, or null if it was never called
         */
        Optional<DataTree> getLocalShardDataTree() {
            return localShardDataTree;
        }

        DatastoreContext getDatastoreContext() {
            return datastoreContext;
        }

        /**
         * Stores a new datastore context and, if the shard actor exists, forwards the context to it.
         *
         * @param datastoreContext the new context
         * @param sender the sender to attach to the forwarded message
         */
        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
            this.datastoreContext = datastoreContext;
            if (actor != null) {
                LOG.debug ("Sending new DatastoreContext to {}", shardId);
                actor.tell(this.datastoreContext, sender);
            }
        }

        /**
         * Informs the shard actor (if present) of a peer's resolved address, then re-evaluates pending
         * initialization callbacks, since the leader's address may now be resolvable.
         */
        void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);

            if(actor != null) {
                if(LOG.isDebugEnabled()) {
                    LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
                            peerId, peerAddress, actor.path());
                }

                actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
            }

            notifyOnShardInitializedCallbacks();
        }

        /** Forwards a {@code PeerDown} notification to the shard actor, if one has been set. */
        void peerDown(String memberName, String peerId, ActorRef sender) {
            if(actor != null) {
                actor.tell(new PeerDown(memberName, peerId), sender);
            }
        }

        /** Forwards a {@code PeerUp} notification to the shard actor, if one has been set. */
        void peerUp(String memberName, String peerId, ActorRef sender) {
            if(actor != null) {
                actor.tell(new PeerUp(memberName, peerId), sender);
            }
        }

        /**
         * @return true if a role has been recorded and it is not {@code Candidate}
         */
        boolean isShardReady() {
            return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
        }

        /**
         * @return true if a leader is available, the shard is ready and not an isolated leader, and either this
         *         shard is the leader or the leader's id resolves to an address
         */
        boolean isShardReadyWithLeaderId() {
            return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
                    (isLeader() || addressResolver.resolve(leaderId) != null);
        }

        /**
         * @return true once an actor has been set and {@link #setActorInitialized} has been called
         */
        boolean isShardInitialized() {
            return getActor() != null && actorInitialized;
        }

        /**
         * @return true if the recorded leader id equals this shard's own id string
         */
        boolean isLeader() {
            return Objects.equal(leaderId, shardId.toString());
        }

        /**
         * @return the serialized path of the local actor when this shard is the leader; otherwise whatever the
         *         address resolver returns for the leader id, which may be null when it cannot be resolved
         */
        String getSerializedLeaderActor() {
            if(isLeader()) {
                return Serialization.serializedActorPath(getActor());
            } else {
                return addressResolver.resolve(leaderId);
            }
        }

        /** Marks the shard actor as initialized and re-evaluates pending callbacks. */
        void setActorInitialized() {
            LOG.debug("Shard {} is initialized", shardId);

            this.actorInitialized = true;

            notifyOnShardInitializedCallbacks();
        }

        /**
         * Runs and removes pending callbacks. Plain {@link OnShardInitialized} callbacks always fire here;
         * {@link OnShardReady} callbacks fire only if {@link #isShardReadyWithLeaderId()} holds. Each fired
         * callback is removed first, then its timeout is cancelled, then its reply runnable runs.
         *
         * <p>NOTE(review): this method does not itself check {@code actorInitialized}, so calls from
         * updatePeerAddress/setRole/setLeaderId also fire plain OnShardInitialized callbacks. Confirm this is
         * intended.
         */
        private void notifyOnShardInitializedCallbacks() {
            if(onShardInitializedSet.isEmpty()) {
                return;
            }

            boolean ready = isShardReadyWithLeaderId();

            if(LOG.isDebugEnabled()) {
                LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
                        ready ? "ready" : "initialized", onShardInitializedSet.size());
            }

            // Explicit iterator so entries can be removed safely while iterating.
            Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
            while(iter.hasNext()) {
                OnShardInitialized onShardInitialized = iter.next();
                if(!(onShardInitialized instanceof OnShardReady) || ready) {
                    iter.remove();
                    onShardInitialized.getTimeoutSchedule().cancel();
                    onShardInitialized.getReplyRunnable().run();
                }
            }
        }

        void addOnShardInitialized(OnShardInitialized onShardInitialized) {
            onShardInitializedSet.add(onShardInitialized);
        }

        void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
            onShardInitializedSet.remove(onShardInitialized);
        }

        /** Records the shard's RAFT role name and re-evaluates pending callbacks. */
        void setRole(String newRole) {
            this.role = newRole;

            notifyOnShardInitializedCallbacks();
        }

        void setFollowerSyncStatus(boolean syncStatus){
            this.followerSyncStatus = syncStatus;
        }

        /**
         * @return true for a leader; the recorded follower sync status for a follower; false for any other role
         */
        boolean isInSync(){
            if(RaftState.Follower.name().equals(this.role)){
                return followerSyncStatus;
            } else if(RaftState.Leader.name().equals(this.role)){
                return true;
            }

            return false;
        }

        /**
         * Records the leader id and re-evaluates pending callbacks. A non-null id also marks the leader as
         * available; a null id does not clear that flag.
         *
         * @param leaderId the new leader id, possibly null
         * @return true if the leader id differs from the previously recorded one
         */
        boolean setLeaderId(String leaderId) {
            boolean changed = !Objects.equal(this.leaderId, leaderId);
            this.leaderId = leaderId;
            if(leaderId != null) {
                this.leaderAvailable = true;
            }
            notifyOnShardInitializedCallbacks();

            return changed;
        }

        String getLeaderId() {
            return leaderId;
        }

        /**
         * Sets the leader-available flag. Pending callbacks are re-evaluated only when it becomes true.
         */
        void setLeaderAvailable(boolean leaderAvailable) {
            this.leaderAvailable = leaderAvailable;

            if(leaderAvailable) {
                notifyOnShardInitializedCallbacks();
            }
        }

        short getLeaderVersion() {
            return leaderVersion;
        }

        void setLeaderVersion(short leaderVersion) {
            this.leaderVersion = leaderVersion;
        }

        // Defaults to true; presumably tracks whether this member is an active replica of the shard — confirm
        // against callers.
        boolean isActiveMember() {
            return isActiveMember;
        }

        void setActiveMember(boolean isActiveMember) {
            this.isActiveMember = isActiveMember;
        }
    }
1549
1550     private static class OnShardInitialized {
1551         private final Runnable replyRunnable;
1552         private Cancellable timeoutSchedule;
1553
1554         OnShardInitialized(Runnable replyRunnable) {
1555             this.replyRunnable = replyRunnable;
1556         }
1557
1558         Runnable getReplyRunnable() {
1559             return replyRunnable;
1560         }
1561
1562         Cancellable getTimeoutSchedule() {
1563             return timeoutSchedule;
1564         }
1565
1566         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1567             this.timeoutSchedule = timeoutSchedule;
1568         }
1569     }
1570
1571     private static class OnShardReady extends OnShardInitialized {
1572         OnShardReady(Runnable replyRunnable) {
1573             super(replyRunnable);
1574         }
1575     }
1576
1577     private static class ShardNotInitializedTimeout {
1578         private final ActorRef sender;
1579         private final ShardInformation shardInfo;
1580         private final OnShardInitialized onShardInitialized;
1581
1582         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1583             this.sender = sender;
1584             this.shardInfo = shardInfo;
1585             this.onShardInitialized = onShardInitialized;
1586         }
1587
1588         ActorRef getSender() {
1589             return sender;
1590         }
1591
1592         ShardInformation getShardInfo() {
1593             return shardInfo;
1594         }
1595
1596         OnShardInitialized getOnShardInitialized() {
1597             return onShardInitialized;
1598         }
1599     }
1600
1601     /**
1602      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1603      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1604      */
1605     @Deprecated
1606     static class SchemaContextModules implements Serializable {
1607         private static final long serialVersionUID = -8884620101025936590L;
1608
1609         private final Set<String> modules;
1610
1611         SchemaContextModules(Set<String> modules){
1612             this.modules = modules;
1613         }
1614
1615         public Set<String> getModules() {
1616             return modules;
1617         }
1618     }
1619
1620     public static Builder builder() {
1621         return new Builder();
1622     }
1623
1624     public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
1625         private ClusterWrapper cluster;
1626         private Configuration configuration;
1627         private DatastoreContextFactory datastoreContextFactory;
1628         private CountDownLatch waitTillReadyCountdownLatch;
1629         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1630         private DatastoreSnapshot restoreFromSnapshot;
1631
1632         private volatile boolean sealed;
1633
1634         @SuppressWarnings("unchecked")
1635         private T self() {
1636             return (T) this;
1637         }
1638
1639         protected void checkSealed() {
1640             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1641         }
1642
1643         public T cluster(ClusterWrapper cluster) {
1644             checkSealed();
1645             this.cluster = cluster;
1646             return self();
1647         }
1648
1649         public T configuration(Configuration configuration) {
1650             checkSealed();
1651             this.configuration = configuration;
1652             return self();
1653         }
1654
1655         public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1656             checkSealed();
1657             this.datastoreContextFactory = datastoreContextFactory;
1658             return self();
1659         }
1660
1661         public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1662             checkSealed();
1663             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1664             return self();
1665         }
1666
1667         public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1668             checkSealed();
1669             this.primaryShardInfoCache = primaryShardInfoCache;
1670             return self();
1671         }
1672
1673         public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1674             checkSealed();
1675             this.restoreFromSnapshot = restoreFromSnapshot;
1676             return self();
1677         }
1678
1679         protected void verify() {
1680             sealed = true;
1681             Preconditions.checkNotNull(cluster, "cluster should not be null");
1682             Preconditions.checkNotNull(configuration, "configuration should not be null");
1683             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1684             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1685             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1686         }
1687
1688         public Props props() {
1689             verify();
1690             return Props.create(ShardManager.class, this);
1691         }
1692     }
1693
1694     public static class Builder extends AbstractBuilder<Builder> {
1695     }
1696
1697     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1698         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1699                 getShardInitializationTimeout().duration().$times(2));
1700
1701
1702         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1703         futureObj.onComplete(new OnComplete<Object>() {
1704             @Override
1705             public void onComplete(Throwable failure, Object response) {
1706                 if (failure != null) {
1707                     handler.onFailure(failure);
1708                 } else {
1709                     if(response instanceof RemotePrimaryShardFound) {
1710                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1711                     } else if(response instanceof LocalPrimaryShardFound) {
1712                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1713                     } else {
1714                         handler.onUnknownResponse(response);
1715                     }
1716                 }
1717             }
1718         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1719     }
1720
1721     /**
1722      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1723      * a remote or local find primary message is processed
1724      */
1725     private static interface FindPrimaryResponseHandler {
1726         /**
1727          * Invoked when a Failure message is received as a response
1728          *
1729          * @param failure
1730          */
1731         void onFailure(Throwable failure);
1732
1733         /**
1734          * Invoked when a RemotePrimaryShardFound response is received
1735          *
1736          * @param response
1737          */
1738         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1739
1740         /**
1741          * Invoked when a LocalPrimaryShardFound response is received
1742          * @param response
1743          */
1744         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1745
1746         /**
1747          * Invoked when an unknown response is received. This is another type of failure.
1748          *
1749          * @param response
1750          */
1751         void onUnknownResponse(Object response);
1752     }
1753
1754     /**
1755      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1756      * replica and sends a wrapped Failure response to some targetActor
1757      */
1758     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1759         private final ActorRef targetActor;
1760         private final String shardName;
1761         private final String persistenceId;
1762         private final ActorRef shardManagerActor;
1763
1764         /**
1765          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1766          * @param shardName The name of the shard for which the primary replica had to be found
1767          * @param persistenceId The persistenceId for the ShardManager
1768          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1769          */
1770         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1771             this.targetActor = Preconditions.checkNotNull(targetActor);
1772             this.shardName = Preconditions.checkNotNull(shardName);
1773             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1774             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1775         }
1776
1777         public ActorRef getTargetActor() {
1778             return targetActor;
1779         }
1780
1781         public String getShardName() {
1782             return shardName;
1783         }
1784
1785         @Override
1786         public void onFailure(Throwable failure) {
1787             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1788             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1789                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1790         }
1791
1792         @Override
1793         public void onUnknownResponse(Object response) {
1794             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1795                     shardName, response);
1796             LOG.debug ("{}: {}", persistenceId, msg);
1797             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1798                     new RuntimeException(msg)), shardManagerActor);
1799         }
1800     }
1801
1802
1803     /**
1804      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1805      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1806      * as a successful response to find primary.
1807      */
1808     private static class PrimaryShardFoundForContext {
1809         private final String shardName;
1810         private final Object contextMessage;
1811         private final RemotePrimaryShardFound remotePrimaryShardFound;
1812         private final LocalPrimaryShardFound localPrimaryShardFound;
1813
1814         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
1815                 @Nonnull Object primaryFoundMessage) {
1816             this.shardName = Preconditions.checkNotNull(shardName);
1817             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1818             Preconditions.checkNotNull(primaryFoundMessage);
1819             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
1820                     (RemotePrimaryShardFound) primaryFoundMessage : null;
1821             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
1822                     (LocalPrimaryShardFound) primaryFoundMessage : null;
1823         }
1824
1825         @Nonnull
1826         String getPrimaryPath(){
1827             if(remotePrimaryShardFound != null) {
1828                 return remotePrimaryShardFound.getPrimaryPath();
1829             }
1830             return localPrimaryShardFound.getPrimaryPath();
1831         }
1832
1833         @Nonnull
1834         Object getContextMessage() {
1835             return contextMessage;
1836         }
1837
1838         @Nullable
1839         RemotePrimaryShardFound getRemotePrimaryShardFound() {
1840             return remotePrimaryShardFound;
1841         }
1842
1843         @Nonnull
1844         String getShardName() {
1845             return shardName;
1846         }
1847     }
1848
1849     /**
1850      * The WrappedShardResponse class wraps a response from a Shard.
1851      */
1852     private static class WrappedShardResponse {
1853         private final ShardIdentifier shardId;
1854         private final Object response;
1855         private final String leaderPath;
1856
1857         private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1858             this.shardId = shardId;
1859             this.response = response;
1860             this.leaderPath = leaderPath;
1861         }
1862
1863         ShardIdentifier getShardId() {
1864             return shardId;
1865         }
1866
1867         Object getResponse() {
1868             return response;
1869         }
1870
1871         String getLeaderPath() {
1872             return leaderPath;
1873         }
1874     }
1875 }
1876
1877
1878