Move SwitchShardBehaviorMessage to shardmanager package
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.Futures;
22 import akka.dispatch.OnComplete;
23 import akka.japi.Function;
24 import akka.pattern.Patterns;
25 import akka.persistence.RecoveryCompleted;
26 import akka.persistence.SaveSnapshotFailure;
27 import akka.persistence.SaveSnapshotSuccess;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.serialization.Serialization;
31 import akka.util.Timeout;
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.base.Objects;
34 import com.google.common.base.Optional;
35 import com.google.common.base.Preconditions;
36 import com.google.common.base.Strings;
37 import com.google.common.base.Supplier;
38 import com.google.common.collect.Sets;
39 import java.io.ByteArrayInputStream;
40 import java.io.ObjectInputStream;
41 import java.util.ArrayList;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.HashMap;
45 import java.util.HashSet;
46 import java.util.Iterator;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.Set;
50 import java.util.concurrent.CountDownLatch;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import javax.annotation.Nonnull;
54 import javax.annotation.Nullable;
55 import org.apache.commons.lang3.SerializationUtils;
56 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
57 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
58 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
59 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
60 import org.opendaylight.controller.cluster.datastore.Shard;
61 import org.opendaylight.controller.cluster.datastore.config.Configuration;
62 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
63 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
65 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
66 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
67 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
68 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
69 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
70 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
71 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
72 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
73 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
74 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
77 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
78 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
79 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
80 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
81 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
82 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
83 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
84 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
85 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
86 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
87 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
88 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
89 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
90 import org.opendaylight.controller.cluster.raft.RaftState;
91 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
92 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
93 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
94 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
95 import org.opendaylight.controller.cluster.raft.messages.AddServer;
96 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
97 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
98 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
99 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
100 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
101 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
102 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
103 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
104 import org.slf4j.Logger;
105 import org.slf4j.LoggerFactory;
106 import scala.concurrent.ExecutionContext;
107 import scala.concurrent.Future;
108 import scala.concurrent.duration.Duration;
109 import scala.concurrent.duration.FiniteDuration;
110
/**
 * The ShardManager has the following jobs:
 * <ul>
 * <li> Create all the local shard replicas that belong on this cluster member
 * <li> Find the address of the local shard
 * <li> Find the primary replica for any given shard
 * <li> Monitor the cluster members and store their addresses
 * </ul>
 */
120 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
121
122     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
123
124     // Stores a mapping between a shard name and it's corresponding information
125     // Shard names look like inventory, topology etc and are as specified in
126     // configuration
127     private final Map<String, ShardInformation> localShards = new HashMap<>();
128
129     // The type of a ShardManager reflects the type of the datastore itself
130     // A data store could be of type config/operational
131     private final String type;
132
133     private final ClusterWrapper cluster;
134
135     private final Configuration configuration;
136
137     private final String shardDispatcherPath;
138
139     private final ShardManagerInfo mBean;
140
141     private DatastoreContextFactory datastoreContextFactory;
142
143     private final CountDownLatch waitTillReadyCountdownLatch;
144
145     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
146
147     private final ShardPeerAddressResolver peerAddressResolver;
148
149     private SchemaContext schemaContext;
150
151     private DatastoreSnapshot restoreFromSnapshot;
152
153     private ShardManagerSnapshot currentSnapshot;
154
155     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
156
157     private final String persistenceId;
158
159     /**
160      */
161     protected ShardManager(AbstractBuilder<?> builder) {
162
163         this.cluster = builder.cluster;
164         this.configuration = builder.configuration;
165         this.datastoreContextFactory = builder.datastoreContextFactory;
166         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
167         this.shardDispatcherPath =
168                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
169         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
170         this.primaryShardInfoCache = builder.primaryShardInfoCache;
171         this.restoreFromSnapshot = builder.restoreFromSnapshot;
172
173         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
174         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
175
176         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
177
178         // Subscribe this actor to cluster member events
179         cluster.subscribeToMemberEvents(getSelf());
180
181         List<String> localShardActorNames = new ArrayList<>();
182         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
183                 "shard-manager-" + this.type,
184                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
185                 localShardActorNames);
186         mBean.setShardManager(this);
187     }
188
189     @Override
190     public void postStop() {
191         LOG.info("Stopping ShardManager {}", persistenceId());
192
193         mBean.unregisterMBean();
194     }
195
196     @Override
197     public void handleCommand(Object message) throws Exception {
198         if (message  instanceof FindPrimary) {
199             findPrimary((FindPrimary)message);
200         } else if(message instanceof FindLocalShard){
201             findLocalShard((FindLocalShard) message);
202         } else if (message instanceof UpdateSchemaContext) {
203             updateSchemaContext(message);
204         } else if(message instanceof ActorInitialized) {
205             onActorInitialized(message);
206         } else if (message instanceof ClusterEvent.MemberUp){
207             memberUp((ClusterEvent.MemberUp) message);
208         } else if (message instanceof ClusterEvent.MemberExited){
209             memberExited((ClusterEvent.MemberExited) message);
210         } else if(message instanceof ClusterEvent.MemberRemoved) {
211             memberRemoved((ClusterEvent.MemberRemoved) message);
212         } else if(message instanceof ClusterEvent.UnreachableMember) {
213             memberUnreachable((ClusterEvent.UnreachableMember)message);
214         } else if(message instanceof ClusterEvent.ReachableMember) {
215             memberReachable((ClusterEvent.ReachableMember) message);
216         } else if(message instanceof DatastoreContextFactory) {
217             onDatastoreContextFactory((DatastoreContextFactory)message);
218         } else if(message instanceof RoleChangeNotification) {
219             onRoleChangeNotification((RoleChangeNotification) message);
220         } else if(message instanceof FollowerInitialSyncUpStatus){
221             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
222         } else if(message instanceof ShardNotInitializedTimeout) {
223             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
224         } else if(message instanceof ShardLeaderStateChanged) {
225             onLeaderStateChanged((ShardLeaderStateChanged) message);
226         } else if(message instanceof SwitchShardBehavior){
227             onSwitchShardBehavior((SwitchShardBehavior) message);
228         } else if(message instanceof CreateShard) {
229             onCreateShard((CreateShard)message);
230         } else if(message instanceof AddShardReplica){
231             onAddShardReplica((AddShardReplica)message);
232         } else if(message instanceof ForwardedAddServerReply) {
233             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
234             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
235                     msg.removeShardOnFailure);
236         } else if(message instanceof ForwardedAddServerFailure) {
237             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
238             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
239         } else if(message instanceof PrimaryShardFoundForContext) {
240             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
241             onPrimaryShardFoundContext(primaryShardFoundContext);
242         } else if(message instanceof RemoveShardReplica) {
243             onRemoveShardReplica((RemoveShardReplica) message);
244         } else if(message instanceof WrappedShardResponse){
245             onWrappedShardResponse((WrappedShardResponse) message);
246         } else if(message instanceof GetSnapshot) {
247             onGetSnapshot();
248         } else if(message instanceof ServerRemoved){
249             onShardReplicaRemoved((ServerRemoved) message);
250         } else if(message instanceof SaveSnapshotSuccess) {
251             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
252         } else if(message instanceof SaveSnapshotFailure) {
253             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
254                     persistenceId(), ((SaveSnapshotFailure) message).cause());
255         } else if(message instanceof Shutdown) {
256             onShutDown();
257         } else {
258             unknownMessage(message);
259         }
260     }
261
262     private void onShutDown() {
263         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
264         for (ShardInformation info : localShards.values()) {
265             if (info.getActor() != null) {
266                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
267
268                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
269                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
270             }
271         }
272
273         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
274
275         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
276         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
277
278         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
279             @Override
280             public void onComplete(Throwable failure, Iterable<Boolean> results) {
281                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
282
283                 self().tell(PoisonPill.getInstance(), self());
284
285                 if(failure != null) {
286                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
287                 } else {
288                     int nfailed = 0;
289                     for(Boolean r: results) {
290                         if(!r) {
291                             nfailed++;
292                         }
293                     }
294
295                     if(nfailed > 0) {
296                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
297                     }
298                 }
299             }
300         }, dispatcher);
301     }
302
303     private void onWrappedShardResponse(WrappedShardResponse message) {
304         if (message.getResponse() instanceof RemoveServerReply) {
305             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
306                     message.getLeaderPath());
307         }
308     }
309
310     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
311             String leaderPath) {
312         shardReplicaOperationsInProgress.remove(shardId);
313
314         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
315
316         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
317             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
318                     shardId.getShardName());
319             originalSender.tell(new akka.actor.Status.Success(null), getSelf());
320         } else {
321             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
322                     persistenceId(), shardId, replyMsg.getStatus());
323
324             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
325                     leaderPath, shardId);
326             originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
327         }
328     }
329
330     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
331         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
332             addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
333                     getSender());
334         } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
335             removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
336                     primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
337         }
338     }
339
340     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
341             final ActorRef sender) {
342         if(isShardReplicaOperationInProgress(shardName, sender)) {
343             return;
344         }
345
346         shardReplicaOperationsInProgress.add(shardName);
347
348         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
349
350         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
351
352         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
353         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
354                 primaryPath, shardId);
355
356         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
357                 duration());
358         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
359                 new RemoveServer(shardId.toString()), removeServerTimeout);
360
361         futureObj.onComplete(new OnComplete<Object>() {
362             @Override
363             public void onComplete(Throwable failure, Object response) {
364                 if (failure != null) {
365                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
366                             primaryPath, shardName);
367
368                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
369
370                     // FAILURE
371                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
372                 } else {
373                     // SUCCESS
374                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
375                 }
376             }
377         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
378     }
379
380     private void onShardReplicaRemoved(ServerRemoved message) {
381         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
382         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
383         if(shardInformation == null) {
384             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
385             return;
386         } else if(shardInformation.getActor() != null) {
387             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
388             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
389         }
390         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
391         persistShardList();
392     }
393
394     private void onGetSnapshot() {
395         LOG.debug("{}: onGetSnapshot", persistenceId());
396
397         List<String> notInitialized = null;
398         for(ShardInformation shardInfo: localShards.values()) {
399             if(!shardInfo.isShardInitialized()) {
400                 if(notInitialized == null) {
401                     notInitialized = new ArrayList<>();
402                 }
403
404                 notInitialized.add(shardInfo.getShardName());
405             }
406         }
407
408         if(notInitialized != null) {
409             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
410                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
411             return;
412         }
413
414         byte[] shardManagerSnapshot = null;
415         if(currentSnapshot != null) {
416             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
417         }
418
419         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
420                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
421                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
422
423         for(ShardInformation shardInfo: localShards.values()) {
424             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
425         }
426     }
427
428     private void onCreateShard(CreateShard createShard) {
429         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
430
431         Object reply;
432         try {
433             String shardName = createShard.getModuleShardConfig().getShardName();
434             if(localShards.containsKey(shardName)) {
435                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
436                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
437             } else {
438                 doCreateShard(createShard);
439                 reply = new akka.actor.Status.Success(null);
440             }
441         } catch (Exception e) {
442             LOG.error("{}: onCreateShard failed", persistenceId(), e);
443             reply = new akka.actor.Status.Failure(e);
444         }
445
446         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
447             getSender().tell(reply, getSelf());
448         }
449     }
450
451     private void doCreateShard(CreateShard createShard) {
452         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
453         String shardName = moduleShardConfig.getShardName();
454
455         configuration.addModuleShardConfiguration(moduleShardConfig);
456
457         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
458         if(shardDatastoreContext == null) {
459             shardDatastoreContext = newShardDatastoreContext(shardName);
460         } else {
461             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
462                     peerAddressResolver).build();
463         }
464
465         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
466
467         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
468                 currentSnapshot.getShardList().contains(shardName);
469
470         Map<String, String> peerAddresses;
471         boolean isActiveMember;
472         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
473                 contains(cluster.getCurrentMemberName())) {
474             peerAddresses = getPeerAddresses(shardName);
475             isActiveMember = true;
476         } else {
477             // The local member is not in the static shard member configuration and the shard did not
478             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
479             // the shard with no peers and with elections disabled so it stays as follower. A
480             // subsequent AddServer request will be needed to make it an active member.
481             isActiveMember = false;
482             peerAddresses = Collections.emptyMap();
483             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
484                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
485         }
486
487         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
488                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
489                 isActiveMember);
490
491         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
492                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
493         info.setActiveMember(isActiveMember);
494         localShards.put(info.getShardName(), info);
495
496         mBean.addLocalShard(shardId.toString());
497
498         if(schemaContext != null) {
499             info.setActor(newShardActor(schemaContext, info));
500         }
501     }
502
503     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
504         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
505                 shardPeerAddressResolver(peerAddressResolver);
506     }
507
508     private DatastoreContext newShardDatastoreContext(String shardName) {
509         return newShardDatastoreContextBuilder(shardName).build();
510     }
511
512     private void checkReady(){
513         if (isReadyWithLeaderId()) {
514             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
515                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
516
517             waitTillReadyCountdownLatch.countDown();
518         }
519     }
520
521     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
522         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
523
524         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
525         if(shardInformation != null) {
526             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
527             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
528             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
529                 primaryShardInfoCache.remove(shardInformation.getShardName());
530             }
531
532             checkReady();
533         } else {
534             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
535         }
536     }
537
538     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
539         ShardInformation shardInfo = message.getShardInfo();
540
541         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
542                 shardInfo.getShardName());
543
544         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
545
546         if(!shardInfo.isShardInitialized()) {
547             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
548             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
549         } else {
550             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
551             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
552         }
553     }
554
555     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
556         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
557                 status.getName(), status.isInitialSyncDone());
558
559         ShardInformation shardInformation = findShardInformation(status.getName());
560
561         if(shardInformation != null) {
562             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
563
564             mBean.setSyncStatus(isInSync());
565         }
566
567     }
568
569     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
570         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
571                 roleChanged.getOldRole(), roleChanged.getNewRole());
572
573         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
574         if(shardInformation != null) {
575             shardInformation.setRole(roleChanged.getNewRole());
576             checkReady();
577             mBean.setSyncStatus(isInSync());
578         }
579     }
580
581
582     private ShardInformation findShardInformation(String memberId) {
583         for(ShardInformation info : localShards.values()){
584             if(info.getShardId().toString().equals(memberId)){
585                 return info;
586             }
587         }
588
589         return null;
590     }
591
592     private boolean isReadyWithLeaderId() {
593         boolean isReady = true;
594         for (ShardInformation info : localShards.values()) {
595             if(!info.isShardReadyWithLeaderId()){
596                 isReady = false;
597                 break;
598             }
599         }
600         return isReady;
601     }
602
603     private boolean isInSync(){
604         for (ShardInformation info : localShards.values()) {
605             if(!info.isInSync()){
606                 return false;
607             }
608         }
609         return true;
610     }
611
612     private void onActorInitialized(Object message) {
613         final ActorRef sender = getSender();
614
615         if (sender == null) {
616             return; //why is a non-actor sending this message? Just ignore.
617         }
618
619         String actorName = sender.path().name();
620         //find shard name from actor name; actor name is stringified shardId
621         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
622
623         if (shardId.getShardName() == null) {
624             return;
625         }
626
627         markShardAsInitialized(shardId.getShardName());
628     }
629
630     private void markShardAsInitialized(String shardName) {
631         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
632
633         ShardInformation shardInformation = localShards.get(shardName);
634         if (shardInformation != null) {
635             shardInformation.setActorInitialized();
636
637             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
638         }
639     }
640
641     @Override
642     protected void handleRecover(Object message) throws Exception {
643         if (message instanceof RecoveryCompleted) {
644             onRecoveryCompleted();
645         } else if (message instanceof SnapshotOffer) {
646             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
647         }
648     }
649
650     private void onRecoveryCompleted() {
651         LOG.info("Recovery complete : {}", persistenceId());
652
653         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
654         // journal on upgrade from Helium.
655         deleteMessages(lastSequenceNr());
656
657         if(currentSnapshot == null && restoreFromSnapshot != null &&
658                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
659             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
660                     restoreFromSnapshot.getShardManagerSnapshot()))) {
661                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
662
663                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
664
665                 applyShardManagerSnapshot(snapshot);
666             } catch(Exception e) {
667                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
668             }
669         }
670
671         createLocalShards();
672     }
673
674     private void findLocalShard(FindLocalShard message) {
675         final ShardInformation shardInformation = localShards.get(message.getShardName());
676
677         if(shardInformation == null){
678             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
679             return;
680         }
681
682         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
683             @Override
684             public Object get() {
685                 return new LocalShardFound(shardInformation.getActor());
686             }
687         });
688     }
689
    /**
     * Replies to the current sender with the message produced by {@code messageSupplier}, once the shard is in
     * the required state.
     * <p>
     * If the shard is initialized (and, when {@code wantShardReady} is set, ready with a leader id), the reply is
     * sent immediately. Otherwise:
     * <ul>
     * <li>if {@code doWait} is set, the reply is deferred. A callback is registered on the shard and a timer
     * sends a {@link ShardNotInitializedTimeout} to self. The timer uses the shard initialization timeout, or
     * twice the raft election timeout if the shard is already initialized but has no ready leader;</li>
     * <li>if not waiting, the sender receives a NotInitializedException or NoShardLeaderException, as
     * appropriate.</li>
     * </ul>
     *
     * @param shardInformation the shard the request is about
     * @param doWait whether to defer the reply until the shard reaches the required state
     * @param wantShardReady whether the shard must also be ready with a leader id, not merely initialized
     * @param messageSupplier produces the success reply; invoked only when the reply is actually sent
     */
    private void sendResponse(ShardInformation shardInformation, boolean doWait,
            boolean wantShardReady, final Supplier<Object> messageSupplier) {
        if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
            if(doWait) {
                // Capture sender/self now: the reply runnable runs later, when getSender() no longer refers
                // to this request's sender.
                final ActorRef sender = getSender();
                final ActorRef self = self();

                Runnable replyRunnable = new Runnable() {
                    @Override
                    public void run() {
                        sender.tell(messageSupplier.get(), self);
                    }
                };

                // OnShardReady is used when a leader is required; plain OnShardInitialized otherwise.
                OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
                    new OnShardInitialized(replyRunnable);

                shardInformation.addOnShardInitialized(onShardInitialized);

                FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
                if(shardInformation.isShardInitialized()) {
                    // If the shard is already initialized then we'll wait enough time for the shard to
                    // elect a leader, ie 2 times the election timeout.
                    timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
                }

                LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
                        shardInformation.getShardName());

                // The timeout message is delivered to self; the callback keeps the handle so the timer can be
                // cancelled.
                Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                        timeout, getSelf(),
                        new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                        getContext().dispatcher(), getSelf());

                onShardInitialized.setTimeoutSchedule(timeoutSchedule);

            } else if (!shardInformation.isShardInitialized()) {
                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
                        shardInformation.getShardName());
                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
            } else {
                // Initialized but no ready leader (only reachable when wantShardReady is set).
                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
                        shardInformation.getShardName());
                getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
            }

            return;
        }

        getSender().tell(messageSupplier.get(), getSelf());
    }
742
743     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
744         return new NoShardLeaderException(null, shardId.toString());
745     }
746
747     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
748         return new NotInitializedException(String.format(
749                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
750     }
751
752     private void memberRemoved(ClusterEvent.MemberRemoved message) {
753         String memberName = message.member().roles().iterator().next();
754
755         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
756                 message.member().address());
757
758         peerAddressResolver.removePeerAddress(memberName);
759
760         for(ShardInformation info : localShards.values()){
761             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
762         }
763     }
764
765     private void memberExited(ClusterEvent.MemberExited message) {
766         String memberName = message.member().roles().iterator().next();
767
768         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
769                 message.member().address());
770
771         peerAddressResolver.removePeerAddress(memberName);
772
773         for(ShardInformation info : localShards.values()){
774             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
775         }
776     }
777
778     private void memberUp(ClusterEvent.MemberUp message) {
779         String memberName = message.member().roles().iterator().next();
780
781         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
782                 message.member().address());
783
784         addPeerAddress(memberName, message.member().address());
785
786         checkReady();
787     }
788
789     private void addPeerAddress(String memberName, Address address) {
790         peerAddressResolver.addPeerAddress(memberName, address);
791
792         for(ShardInformation info : localShards.values()){
793             String shardName = info.getShardName();
794             String peerId = getShardIdentifier(memberName, shardName).toString();
795             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
796
797             info.peerUp(memberName, peerId, getSelf());
798         }
799     }
800
801     private void memberReachable(ClusterEvent.ReachableMember message) {
802         String memberName = message.member().roles().iterator().next();
803         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
804
805         addPeerAddress(memberName, message.member().address());
806
807         markMemberAvailable(memberName);
808     }
809
810     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
811         String memberName = message.member().roles().iterator().next();
812         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
813
814         markMemberUnavailable(memberName);
815     }
816
817     private void markMemberUnavailable(final String memberName) {
818         for(ShardInformation info : localShards.values()){
819             String leaderId = info.getLeaderId();
820             if(leaderId != null && leaderId.contains(memberName)) {
821                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
822                 info.setLeaderAvailable(false);
823
824                 primaryShardInfoCache.remove(info.getShardName());
825             }
826
827             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
828         }
829     }
830
831     private void markMemberAvailable(final String memberName) {
832         for(ShardInformation info : localShards.values()){
833             String leaderId = info.getLeaderId();
834             if(leaderId != null && leaderId.contains(memberName)) {
835                 LOG.debug("Marking Leader {} as available.", leaderId);
836                 info.setLeaderAvailable(true);
837             }
838
839             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
840         }
841     }
842
843     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
844         datastoreContextFactory = factory;
845         for (ShardInformation info : localShards.values()) {
846             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
847         }
848     }
849
850     private void onSwitchShardBehavior(SwitchShardBehavior message) {
851         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
852
853         ShardInformation shardInformation = localShards.get(identifier.getShardName());
854
855         if(shardInformation != null && shardInformation.getActor() != null) {
856             shardInformation.getActor().tell(
857                     new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
858         } else {
859             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
860                     message.getShardName(), message.getNewState());
861         }
862     }
863
864     /**
865      * Notifies all the local shards of a change in the schema context
866      *
867      * @param message
868      */
869     private void updateSchemaContext(final Object message) {
870         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
871
872         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
873
874         for (ShardInformation info : localShards.values()) {
875             if (info.getActor() == null) {
876                 LOG.debug("Creating Shard {}", info.getShardId());
877                 info.setActor(newShardActor(schemaContext, info));
878             } else {
879                 info.getActor().tell(message, getSelf());
880             }
881         }
882     }
883
884     @VisibleForTesting
885     protected ClusterWrapper getCluster() {
886         return cluster;
887     }
888
889     @VisibleForTesting
890     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
891         return getContext().actorOf(info.newProps(schemaContext)
892                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
893     }
894
    /**
     * Locates the primary (leader) of a shard and replies to the sender.
     * <ul>
     * <li>If there is an active local replica, the reply is sent via {@code sendResponse}, waiting for a ready
     * leader if requested. It is a {@link LocalPrimaryShardFound} when the local replica is the leader and the
     * request did not come from another ShardManager; otherwise it is a {@link RemotePrimaryShardFound}.</li>
     * <li>Otherwise, the request is forwarded as a {@link RemoteFindPrimary} to the first peer ShardManager not
     * yet visited.</li>
     * <li>If every peer has already been visited, the sender receives a {@link PrimaryNotFoundException}.</li>
     * </ul>
     *
     * @param message the request; a {@link RemoteFindPrimary} if forwarded from another ShardManager
     */
    private void findPrimary(FindPrimary message) {
        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);

        final String shardName = message.getShardName();
        // Local shard state (the DataTree) may only be handed out to local requesters, never to remote ones.
        final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);

        // First see if there is a local replica for the shard
        final ShardInformation info = localShards.get(shardName);
        if (info != null && info.isActiveMember()) {
            sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                @Override
                public Object get() {
                    String primaryPath = info.getSerializedLeaderActor();
                    Object found = canReturnLocalShardState && info.isLeader() ?
                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());

                            if(LOG.isDebugEnabled()) {
                                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
                            }

                            return found;
                }
            });

            return;
        }

        // Track which ShardManagers have already seen this request so forwarding does not loop. Note that for a
        // RemoteFindPrimary the message's own collection is reused and mutated.
        final Collection<String> visitedAddresses;
        if(message instanceof RemoteFindPrimary) {
            visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
        } else {
            visitedAddresses = new ArrayList<>(1);
        }

        visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());

        // Forward to the first peer ShardManager that has not been visited yet (only one hop per step).
        for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
            if(visitedAddresses.contains(address)) {
                continue;
            }

            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
                    persistenceId(), shardName, address, visitedAddresses);

            // forward() preserves the original sender, so the remote ShardManager replies directly to it.
            getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
                    message.isWaitUntilReady(), visitedAddresses), getContext());
            return;
        }

        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);

        getSender().tell(new PrimaryNotFoundException(
                String.format("No primary shard found for %s.", shardName)), getSelf());
    }
950
951     /**
952      * Construct the name of the shard actor given the name of the member on
953      * which the shard resides and the name of the shard
954      *
955      * @param memberName
956      * @param shardName
957      * @return
958      */
959     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
960         return peerAddressResolver.getShardIdentifier(memberName, shardName);
961     }
962
963     /**
964      * Create shards that are local to the member on which the ShardManager
965      * runs
966      *
967      */
968     private void createLocalShards() {
969         String memberName = this.cluster.getCurrentMemberName();
970         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
971
972         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
973         if(restoreFromSnapshot != null)
974         {
975             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
976                 shardSnapshots.put(snapshot.getName(), snapshot);
977             }
978         }
979
980         restoreFromSnapshot = null; // null out to GC
981
982         for(String shardName : memberShardNames){
983             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
984
985             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
986
987             Map<String, String> peerAddresses = getPeerAddresses(shardName);
988             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
989                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
990                         shardSnapshots.get(shardName)), peerAddressResolver));
991             mBean.addLocalShard(shardId.toString());
992         }
993     }
994
995     /**
996      * Given the name of the shard find the addresses of all it's peers
997      *
998      * @param shardName
999      */
1000     private Map<String, String> getPeerAddresses(String shardName) {
1001         Collection<String> members = configuration.getMembersFromShardName(shardName);
1002         Map<String, String> peerAddresses = new HashMap<>();
1003
1004         String currentMemberName = this.cluster.getCurrentMemberName();
1005
1006         for(String memberName : members) {
1007             if(!currentMemberName.equals(memberName)) {
1008                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1009                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1010                 peerAddresses.put(shardId.toString(), address);
1011             }
1012         }
1013         return peerAddresses;
1014     }
1015
1016     @Override
1017     public SupervisorStrategy supervisorStrategy() {
1018
1019         return new OneForOneStrategy(10, Duration.create("1 minute"),
1020                 new Function<Throwable, SupervisorStrategy.Directive>() {
1021             @Override
1022             public SupervisorStrategy.Directive apply(Throwable t) {
1023                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1024                 return SupervisorStrategy.resume();
1025             }
1026         }
1027                 );
1028
1029     }
1030
1031     @Override
1032     public String persistenceId() {
1033         return persistenceId;
1034     }
1035
1036     @VisibleForTesting
1037     ShardManagerInfoMBean getMBean(){
1038         return mBean;
1039     }
1040
1041     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1042         if (shardReplicaOperationsInProgress.contains(shardName)) {
1043             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1044             LOG.debug ("{}: {}", persistenceId(), msg);
1045             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1046             return true;
1047         }
1048
1049         return false;
1050     }
1051
1052     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1053         final String shardName = shardReplicaMsg.getShardName();
1054
1055         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1056
1057         // verify the shard with the specified name is present in the cluster configuration
1058         if (!(this.configuration.isShardConfigured(shardName))) {
1059             String msg = String.format("No module configuration exists for shard %s", shardName);
1060             LOG.debug ("{}: {}", persistenceId(), msg);
1061             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1062             return;
1063         }
1064
1065         // Create the localShard
1066         if (schemaContext == null) {
1067             String msg = String.format(
1068                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1069             LOG.debug ("{}: {}", persistenceId(), msg);
1070             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1071             return;
1072         }
1073
1074         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1075             @Override
1076             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1077                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1078             }
1079
1080             @Override
1081             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1082                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1083             }
1084
1085         });
1086     }
1087
1088     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1089         String msg = String.format("Local shard %s already exists", shardName);
1090         LOG.debug ("{}: {}", persistenceId(), msg);
1091         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
1092     }
1093
    /**
     * Starts adding a local replica of {@code shardName} by asking the remote shard leader to add it (AddServer).
     * <p>
     * If no local shard entry exists yet, one is created with a datastore context using
     * {@code DisableElectionsRaftPolicy}, marked as a non-active member, and its actor is started. The reply, or
     * the failure, from the leader is sent back to self as a ForwardedAddServerReply / ForwardedAddServerFailure,
     * with {@code sender} as the message sender, and handled there.
     *
     * @param shardName the shard to replicate locally
     * @param response the find-primary result identifying the remote leader
     * @param sender the actor that requested the replica and awaits the outcome
     */
    private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
        if(isShardReplicaOperationInProgress(shardName, sender)) {
            return;
        }

        // NOTE(review): this marker is cleared in the reply/failure handlers. If anything below throws before
        // the ask is issued (e.g. newShardActor), the marker would stay set - confirm whether that can happen.
        shardReplicaOperationsInProgress.add(shardName);

        final ShardInformation shardInfo;
        final boolean removeShardOnFailure;
        ShardInformation existingShardInfo = localShards.get(shardName);
        if(existingShardInfo == null) {
            // Only a shard entry created here is discarded again if the add fails.
            removeShardOnFailure = true;
            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);

            // presumably keeps the not-yet-added replica from starting elections - confirm against
            // DisableElectionsRaftPolicy
            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
                    DisableElectionsRaftPolicy.class.getName()).build();

            shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                    Shard.builder(), peerAddressResolver);
            shardInfo.setActiveMember(false);
            localShards.put(shardName, shardInfo);
            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
        } else {
            removeShardOnFailure = false;
            shardInfo = existingShardInfo;
        }

        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());

        //inform ShardLeader to add this shard as a replica by sending an AddServer message
        LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
                response.getPrimaryPath(), shardInfo.getShardId());

        Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
                duration());
        // NOTE(review): the third AddServer argument looks like a voting flag - confirm against AddServer.
        Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
            new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);

        // The callback runs on the Client dispatcher, not the actor's thread, so it only forwards the outcome to
        // self instead of touching actor state directly.
        futureObj.onComplete(new OnComplete<Object>() {
            @Override
            public void onComplete(Throwable failure, Object addServerResponse) {
                if (failure != null) {
                    LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
                            response.getPrimaryPath(), shardName, failure);

                    String msg = String.format("AddServer request to leader %s for shard %s failed",
                            response.getPrimaryPath(), shardName);
                    self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                } else {
                    self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
                            response.getPrimaryPath(), removeShardOnFailure), sender);
                }
            }
        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
    }
1149
1150     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1151             boolean removeShardOnFailure) {
1152         shardReplicaOperationsInProgress.remove(shardName);
1153
1154         if(removeShardOnFailure) {
1155             ShardInformation shardInfo = localShards.remove(shardName);
1156             if (shardInfo.getActor() != null) {
1157                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1158             }
1159         }
1160
1161         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1162             new RuntimeException(message, failure)), getSelf());
1163     }
1164
1165     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1166             String leaderPath, boolean removeShardOnFailure) {
1167         String shardName = shardInfo.getShardName();
1168         shardReplicaOperationsInProgress.remove(shardName);
1169
1170         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1171
1172         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1173             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1174
1175             // Make the local shard voting capable
1176             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1177             shardInfo.setActiveMember(true);
1178             persistShardList();
1179
1180             mBean.addLocalShard(shardInfo.getShardId().toString());
1181             sender.tell(new akka.actor.Status.Success(null), getSelf());
1182         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1183             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1184         } else {
1185             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1186                     persistenceId(), shardName, replyMsg.getStatus());
1187
1188             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1189
1190             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1191         }
1192     }
1193
1194     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1195                                                String leaderPath, ShardIdentifier shardId) {
1196         Exception failure;
1197         switch (serverChangeStatus) {
1198             case TIMEOUT:
1199                 failure = new TimeoutException(String.format(
1200                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1201                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1202                         leaderPath, shardId.getShardName()));
1203                 break;
1204             case NO_LEADER:
1205                 failure = createNoShardLeaderException(shardId);
1206                 break;
1207             case NOT_SUPPORTED:
1208                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1209                         serverChange.getSimpleName(), shardId.getShardName()));
1210                 break;
1211             default :
1212                 failure = new RuntimeException(String.format(
1213                         "%s request to leader %s for shard %s failed with status %s",
1214                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1215         }
1216         return failure;
1217     }
1218
1219     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1220         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1221
1222         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1223                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1224             @Override
1225             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1226                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1227             }
1228
1229             @Override
1230             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1231                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1232             }
1233         });
1234     }
1235
1236     private void persistShardList() {
1237         List<String> shardList = new ArrayList<>(localShards.keySet());
1238         for (ShardInformation shardInfo : localShards.values()) {
1239             if (!shardInfo.isActiveMember()) {
1240                 shardList.remove(shardInfo.getShardName());
1241             }
1242         }
1243         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1244         saveSnapshot(updateShardManagerSnapshot(shardList));
1245     }
1246
1247     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1248         currentSnapshot = new ShardManagerSnapshot(shardList);
1249         return currentSnapshot;
1250     }
1251
1252     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1253         currentSnapshot = snapshot;
1254
1255         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1256
1257         String currentMember = cluster.getCurrentMemberName();
1258         Set<String> configuredShardList =
1259             new HashSet<>(configuration.getMemberShardNames(currentMember));
1260         for (String shard : currentSnapshot.getShardList()) {
1261             if (!configuredShardList.contains(shard)) {
1262                 // add the current member as a replica for the shard
1263                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1264                 configuration.addMemberReplicaForShard(shard, currentMember);
1265             } else {
1266                 configuredShardList.remove(shard);
1267             }
1268         }
1269         for (String shard : configuredShardList) {
1270             // remove the member as a replica for the shard
1271             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1272             configuration.removeMemberReplicaForShard(shard, currentMember);
1273         }
1274     }
1275
1276     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1277         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1278             persistenceId());
1279         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1280             0, 0));
1281     }
1282
1283     private static class ForwardedAddServerReply {
1284         ShardInformation shardInfo;
1285         AddServerReply addServerReply;
1286         String leaderPath;
1287         boolean removeShardOnFailure;
1288
1289         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1290                 boolean removeShardOnFailure) {
1291             this.shardInfo = shardInfo;
1292             this.addServerReply = addServerReply;
1293             this.leaderPath = leaderPath;
1294             this.removeShardOnFailure = removeShardOnFailure;
1295         }
1296     }
1297
1298     private static class ForwardedAddServerFailure {
1299         String shardName;
1300         String failureMessage;
1301         Throwable failure;
1302         boolean removeShardOnFailure;
1303
1304         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1305                 boolean removeShardOnFailure) {
1306             this.shardName = shardName;
1307             this.failureMessage = failureMessage;
1308             this.failure = failure;
1309             this.removeShardOnFailure = removeShardOnFailure;
1310         }
1311     }
1312
1313     @VisibleForTesting
1314     protected static class ShardInformation {
1315         private final ShardIdentifier shardId;
1316         private final String shardName;
1317         private ActorRef actor;
1318         private final Map<String, String> initialPeerAddresses;
1319         private Optional<DataTree> localShardDataTree;
1320         private boolean leaderAvailable = false;
1321
1322         // flag that determines if the actor is ready for business
1323         private boolean actorInitialized = false;
1324
1325         private boolean followerSyncStatus = false;
1326
1327         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1328         private String role ;
1329         private String leaderId;
1330         private short leaderVersion;
1331
1332         private DatastoreContext datastoreContext;
1333         private Shard.AbstractBuilder<?, ?> builder;
1334         private final ShardPeerAddressResolver addressResolver;
1335         private boolean isActiveMember = true;
1336
1337         private ShardInformation(String shardName, ShardIdentifier shardId,
1338                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1339                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1340             this.shardName = shardName;
1341             this.shardId = shardId;
1342             this.initialPeerAddresses = initialPeerAddresses;
1343             this.datastoreContext = datastoreContext;
1344             this.builder = builder;
1345             this.addressResolver = addressResolver;
1346         }
1347
1348         Props newProps(SchemaContext schemaContext) {
1349             Preconditions.checkNotNull(builder);
1350             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1351                     schemaContext(schemaContext).props();
1352             builder = null;
1353             return props;
1354         }
1355
1356         String getShardName() {
1357             return shardName;
1358         }
1359
1360         @Nullable
1361         ActorRef getActor(){
1362             return actor;
1363         }
1364
1365         void setActor(ActorRef actor) {
1366             this.actor = actor;
1367         }
1368
1369         ShardIdentifier getShardId() {
1370             return shardId;
1371         }
1372
1373         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1374             this.localShardDataTree = localShardDataTree;
1375         }
1376
1377         Optional<DataTree> getLocalShardDataTree() {
1378             return localShardDataTree;
1379         }
1380
1381         DatastoreContext getDatastoreContext() {
1382             return datastoreContext;
1383         }
1384
1385         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1386             this.datastoreContext = datastoreContext;
1387             if (actor != null) {
1388                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1389                 actor.tell(this.datastoreContext, sender);
1390             }
1391         }
1392
1393         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1394             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1395
1396             if(actor != null) {
1397                 if(LOG.isDebugEnabled()) {
1398                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1399                             peerId, peerAddress, actor.path());
1400                 }
1401
1402                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1403             }
1404
1405             notifyOnShardInitializedCallbacks();
1406         }
1407
1408         void peerDown(String memberName, String peerId, ActorRef sender) {
1409             if(actor != null) {
1410                 actor.tell(new PeerDown(memberName, peerId), sender);
1411             }
1412         }
1413
1414         void peerUp(String memberName, String peerId, ActorRef sender) {
1415             if(actor != null) {
1416                 actor.tell(new PeerUp(memberName, peerId), sender);
1417             }
1418         }
1419
1420         boolean isShardReady() {
1421             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1422         }
1423
1424         boolean isShardReadyWithLeaderId() {
1425             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1426                     (isLeader() || addressResolver.resolve(leaderId) != null);
1427         }
1428
1429         boolean isShardInitialized() {
1430             return getActor() != null && actorInitialized;
1431         }
1432
1433         boolean isLeader() {
1434             return Objects.equal(leaderId, shardId.toString());
1435         }
1436
1437         String getSerializedLeaderActor() {
1438             if(isLeader()) {
1439                 return Serialization.serializedActorPath(getActor());
1440             } else {
1441                 return addressResolver.resolve(leaderId);
1442             }
1443         }
1444
1445         void setActorInitialized() {
1446             LOG.debug("Shard {} is initialized", shardId);
1447
1448             this.actorInitialized = true;
1449
1450             notifyOnShardInitializedCallbacks();
1451         }
1452
1453         private void notifyOnShardInitializedCallbacks() {
1454             if(onShardInitializedSet.isEmpty()) {
1455                 return;
1456             }
1457
1458             boolean ready = isShardReadyWithLeaderId();
1459
1460             if(LOG.isDebugEnabled()) {
1461                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1462                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1463             }
1464
1465             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1466             while(iter.hasNext()) {
1467                 OnShardInitialized onShardInitialized = iter.next();
1468                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1469                     iter.remove();
1470                     onShardInitialized.getTimeoutSchedule().cancel();
1471                     onShardInitialized.getReplyRunnable().run();
1472                 }
1473             }
1474         }
1475
1476         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1477             onShardInitializedSet.add(onShardInitialized);
1478         }
1479
1480         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1481             onShardInitializedSet.remove(onShardInitialized);
1482         }
1483
1484         void setRole(String newRole) {
1485             this.role = newRole;
1486
1487             notifyOnShardInitializedCallbacks();
1488         }
1489
1490         void setFollowerSyncStatus(boolean syncStatus){
1491             this.followerSyncStatus = syncStatus;
1492         }
1493
1494         boolean isInSync(){
1495             if(RaftState.Follower.name().equals(this.role)){
1496                 return followerSyncStatus;
1497             } else if(RaftState.Leader.name().equals(this.role)){
1498                 return true;
1499             }
1500
1501             return false;
1502         }
1503
1504         boolean setLeaderId(String leaderId) {
1505             boolean changed = !Objects.equal(this.leaderId, leaderId);
1506             this.leaderId = leaderId;
1507             if(leaderId != null) {
1508                 this.leaderAvailable = true;
1509             }
1510             notifyOnShardInitializedCallbacks();
1511
1512             return changed;
1513         }
1514
1515         String getLeaderId() {
1516             return leaderId;
1517         }
1518
1519         void setLeaderAvailable(boolean leaderAvailable) {
1520             this.leaderAvailable = leaderAvailable;
1521
1522             if(leaderAvailable) {
1523                 notifyOnShardInitializedCallbacks();
1524             }
1525         }
1526
1527         short getLeaderVersion() {
1528             return leaderVersion;
1529         }
1530
1531         void setLeaderVersion(short leaderVersion) {
1532             this.leaderVersion = leaderVersion;
1533         }
1534
1535         boolean isActiveMember() {
1536             return isActiveMember;
1537         }
1538
1539         void setActiveMember(boolean isActiveMember) {
1540             this.isActiveMember = isActiveMember;
1541         }
1542     }
1543
1544     private static class OnShardInitialized {
1545         private final Runnable replyRunnable;
1546         private Cancellable timeoutSchedule;
1547
1548         OnShardInitialized(Runnable replyRunnable) {
1549             this.replyRunnable = replyRunnable;
1550         }
1551
1552         Runnable getReplyRunnable() {
1553             return replyRunnable;
1554         }
1555
1556         Cancellable getTimeoutSchedule() {
1557             return timeoutSchedule;
1558         }
1559
1560         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1561             this.timeoutSchedule = timeoutSchedule;
1562         }
1563     }
1564
1565     private static class OnShardReady extends OnShardInitialized {
1566         OnShardReady(Runnable replyRunnable) {
1567             super(replyRunnable);
1568         }
1569     }
1570
1571     private static class ShardNotInitializedTimeout {
1572         private final ActorRef sender;
1573         private final ShardInformation shardInfo;
1574         private final OnShardInitialized onShardInitialized;
1575
1576         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1577             this.sender = sender;
1578             this.shardInfo = shardInfo;
1579             this.onShardInitialized = onShardInitialized;
1580         }
1581
1582         ActorRef getSender() {
1583             return sender;
1584         }
1585
1586         ShardInformation getShardInfo() {
1587             return shardInfo;
1588         }
1589
1590         OnShardInitialized getOnShardInitialized() {
1591             return onShardInitialized;
1592         }
1593     }
1594
1595     public static Builder builder() {
1596         return new Builder();
1597     }
1598
1599     public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
1600         private ClusterWrapper cluster;
1601         private Configuration configuration;
1602         private DatastoreContextFactory datastoreContextFactory;
1603         private CountDownLatch waitTillReadyCountdownLatch;
1604         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1605         private DatastoreSnapshot restoreFromSnapshot;
1606         private volatile boolean sealed;
1607
1608         @SuppressWarnings("unchecked")
1609         private T self() {
1610             return (T) this;
1611         }
1612
1613         protected void checkSealed() {
1614             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1615         }
1616
1617         public T cluster(ClusterWrapper cluster) {
1618             checkSealed();
1619             this.cluster = cluster;
1620             return self();
1621         }
1622
1623         public T configuration(Configuration configuration) {
1624             checkSealed();
1625             this.configuration = configuration;
1626             return self();
1627         }
1628
1629         public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1630             checkSealed();
1631             this.datastoreContextFactory = datastoreContextFactory;
1632             return self();
1633         }
1634
1635         public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1636             checkSealed();
1637             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1638             return self();
1639         }
1640
1641         public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1642             checkSealed();
1643             this.primaryShardInfoCache = primaryShardInfoCache;
1644             return self();
1645         }
1646
1647         public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1648             checkSealed();
1649             this.restoreFromSnapshot = restoreFromSnapshot;
1650             return self();
1651         }
1652
1653         protected void verify() {
1654             sealed = true;
1655             Preconditions.checkNotNull(cluster, "cluster should not be null");
1656             Preconditions.checkNotNull(configuration, "configuration should not be null");
1657             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1658             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1659             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1660         }
1661
1662         public Props props() {
1663             verify();
1664             return Props.create(ShardManager.class, this);
1665         }
1666     }
1667
1668     public static class Builder extends AbstractBuilder<Builder> {
1669     }
1670
1671     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1672         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1673                 getShardInitializationTimeout().duration().$times(2));
1674
1675
1676         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1677         futureObj.onComplete(new OnComplete<Object>() {
1678             @Override
1679             public void onComplete(Throwable failure, Object response) {
1680                 if (failure != null) {
1681                     handler.onFailure(failure);
1682                 } else {
1683                     if(response instanceof RemotePrimaryShardFound) {
1684                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1685                     } else if(response instanceof LocalPrimaryShardFound) {
1686                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1687                     } else {
1688                         handler.onUnknownResponse(response);
1689                     }
1690                 }
1691             }
1692         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1693     }
1694
1695     /**
1696      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1697      * a remote or local find primary message is processed
1698      */
1699     private static interface FindPrimaryResponseHandler {
1700         /**
1701          * Invoked when a Failure message is received as a response
1702          *
1703          * @param failure
1704          */
1705         void onFailure(Throwable failure);
1706
1707         /**
1708          * Invoked when a RemotePrimaryShardFound response is received
1709          *
1710          * @param response
1711          */
1712         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1713
1714         /**
1715          * Invoked when a LocalPrimaryShardFound response is received
1716          * @param response
1717          */
1718         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1719
1720         /**
1721          * Invoked when an unknown response is received. This is another type of failure.
1722          *
1723          * @param response
1724          */
1725         void onUnknownResponse(Object response);
1726     }
1727
1728     /**
1729      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1730      * replica and sends a wrapped Failure response to some targetActor
1731      */
1732     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1733         private final ActorRef targetActor;
1734         private final String shardName;
1735         private final String persistenceId;
1736         private final ActorRef shardManagerActor;
1737
1738         /**
1739          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1740          * @param shardName The name of the shard for which the primary replica had to be found
1741          * @param persistenceId The persistenceId for the ShardManager
1742          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1743          */
1744         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1745             this.targetActor = Preconditions.checkNotNull(targetActor);
1746             this.shardName = Preconditions.checkNotNull(shardName);
1747             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1748             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1749         }
1750
1751         public ActorRef getTargetActor() {
1752             return targetActor;
1753         }
1754
1755         public String getShardName() {
1756             return shardName;
1757         }
1758
1759         @Override
1760         public void onFailure(Throwable failure) {
1761             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1762             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1763                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1764         }
1765
1766         @Override
1767         public void onUnknownResponse(Object response) {
1768             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1769                     shardName, response);
1770             LOG.debug ("{}: {}", persistenceId, msg);
1771             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1772                     new RuntimeException(msg)), shardManagerActor);
1773         }
1774     }
1775
1776
1777     /**
1778      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1779      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1780      * as a successful response to find primary.
1781      */
1782     private static class PrimaryShardFoundForContext {
1783         private final String shardName;
1784         private final Object contextMessage;
1785         private final RemotePrimaryShardFound remotePrimaryShardFound;
1786         private final LocalPrimaryShardFound localPrimaryShardFound;
1787
1788         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
1789                 @Nonnull Object primaryFoundMessage) {
1790             this.shardName = Preconditions.checkNotNull(shardName);
1791             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1792             Preconditions.checkNotNull(primaryFoundMessage);
1793             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
1794                     (RemotePrimaryShardFound) primaryFoundMessage : null;
1795             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
1796                     (LocalPrimaryShardFound) primaryFoundMessage : null;
1797         }
1798
1799         @Nonnull
1800         String getPrimaryPath(){
1801             if(remotePrimaryShardFound != null) {
1802                 return remotePrimaryShardFound.getPrimaryPath();
1803             }
1804             return localPrimaryShardFound.getPrimaryPath();
1805         }
1806
1807         @Nonnull
1808         Object getContextMessage() {
1809             return contextMessage;
1810         }
1811
1812         @Nullable
1813         RemotePrimaryShardFound getRemotePrimaryShardFound() {
1814             return remotePrimaryShardFound;
1815         }
1816
1817         @Nonnull
1818         String getShardName() {
1819             return shardName;
1820         }
1821     }
1822
1823     /**
1824      * The WrappedShardResponse class wraps a response from a Shard.
1825      */
1826     private static class WrappedShardResponse {
1827         private final ShardIdentifier shardId;
1828         private final Object response;
1829         private final String leaderPath;
1830
1831         private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1832             this.shardId = shardId;
1833             this.response = response;
1834             this.leaderPath = leaderPath;
1835         }
1836
1837         ShardIdentifier getShardId() {
1838             return shardId;
1839         }
1840
1841         Object getResponse() {
1842             return response;
1843         }
1844
1845         String getLeaderPath() {
1846             return leaderPath;
1847         }
1848     }
1849 }
1850
1851
1852