Move ShardManagerSnapshot to new package
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.Futures;
22 import akka.dispatch.OnComplete;
23 import akka.japi.Function;
24 import akka.pattern.Patterns;
25 import akka.persistence.RecoveryCompleted;
26 import akka.persistence.SaveSnapshotFailure;
27 import akka.persistence.SaveSnapshotSuccess;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.serialization.Serialization;
31 import akka.util.Timeout;
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.base.Objects;
34 import com.google.common.base.Optional;
35 import com.google.common.base.Preconditions;
36 import com.google.common.base.Strings;
37 import com.google.common.base.Supplier;
38 import com.google.common.collect.Sets;
39 import java.io.ByteArrayInputStream;
40 import java.io.ObjectInputStream;
41 import java.util.ArrayList;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.HashMap;
45 import java.util.HashSet;
46 import java.util.Iterator;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.Set;
50 import java.util.concurrent.CountDownLatch;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import javax.annotation.Nonnull;
54 import javax.annotation.Nullable;
55 import org.apache.commons.lang3.SerializationUtils;
56 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
57 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
58 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
59 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
60 import org.opendaylight.controller.cluster.datastore.Shard;
61 import org.opendaylight.controller.cluster.datastore.config.Configuration;
62 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
63 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
65 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
66 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
67 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
68 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
69 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
70 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
71 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
72 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
73 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
74 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
77 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
78 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
79 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
80 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
81 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
82 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
83 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
84 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
85 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
86 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
87 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
88 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
89 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
90 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
91 import org.opendaylight.controller.cluster.raft.RaftState;
92 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
93 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
94 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
95 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
96 import org.opendaylight.controller.cluster.raft.messages.AddServer;
97 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
98 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
99 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
100 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
101 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
102 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
103 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
104 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
105 import org.slf4j.Logger;
106 import org.slf4j.LoggerFactory;
107 import scala.concurrent.ExecutionContext;
108 import scala.concurrent.Future;
109 import scala.concurrent.duration.Duration;
110 import scala.concurrent.duration.FiniteDuration;
111
112 /**
113  * The ShardManager has the following jobs,
114  * <ul>
115  * <li> Create all the local shard replicas that belong on this cluster member
116  * <li> Find the address of the local shard
117  * <li> Find the primary replica for any given shard
118  * <li> Monitor the cluster members and store their addresses
119  * </ul>
120  */
121 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
122
123     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
124
125     // Stores a mapping between a shard name and its corresponding information
126     // Shard names look like inventory, topology etc and are as specified in
127     // configuration
128     private final Map<String, ShardInformation> localShards = new HashMap<>();
129
130     // The type of a ShardManager reflects the type of the datastore itself
131     // A data store could be of type config/operational
132     private final String type;
133
134     private final ClusterWrapper cluster;
135
136     private final Configuration configuration;
137
138     private final String shardDispatcherPath;
139
140     private final ShardManagerInfo mBean;
141
142     private DatastoreContextFactory datastoreContextFactory;
143
144     private final CountDownLatch waitTillReadyCountdownLatch;
145
146     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
147
148     private final ShardPeerAddressResolver peerAddressResolver;
149
150     private SchemaContext schemaContext;
151
152     private DatastoreSnapshot restoreFromSnapshot;
153
154     private ShardManagerSnapshot currentSnapshot;
155
156     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
157
158     private final String persistenceId;
159
160     /**
161      */
162     protected ShardManager(AbstractBuilder<?> builder) {
163
164         this.cluster = builder.cluster;
165         this.configuration = builder.configuration;
166         this.datastoreContextFactory = builder.datastoreContextFactory;
167         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
168         this.shardDispatcherPath =
169                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
170         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
171         this.primaryShardInfoCache = builder.primaryShardInfoCache;
172         this.restoreFromSnapshot = builder.restoreFromSnapshot;
173
174         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
175         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
176
177         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
178
179         // Subscribe this actor to cluster member events
180         cluster.subscribeToMemberEvents(getSelf());
181
182         List<String> localShardActorNames = new ArrayList<>();
183         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
184                 "shard-manager-" + this.type,
185                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
186                 localShardActorNames);
187         mBean.setShardManager(this);
188     }
189
190     @Override
191     public void postStop() {
192         LOG.info("Stopping ShardManager {}", persistenceId());
193
194         mBean.unregisterMBean();
195     }
196
197     @Override
198     public void handleCommand(Object message) throws Exception {
199         if (message  instanceof FindPrimary) {
200             findPrimary((FindPrimary)message);
201         } else if(message instanceof FindLocalShard){
202             findLocalShard((FindLocalShard) message);
203         } else if (message instanceof UpdateSchemaContext) {
204             updateSchemaContext(message);
205         } else if(message instanceof ActorInitialized) {
206             onActorInitialized(message);
207         } else if (message instanceof ClusterEvent.MemberUp){
208             memberUp((ClusterEvent.MemberUp) message);
209         } else if (message instanceof ClusterEvent.MemberExited){
210             memberExited((ClusterEvent.MemberExited) message);
211         } else if(message instanceof ClusterEvent.MemberRemoved) {
212             memberRemoved((ClusterEvent.MemberRemoved) message);
213         } else if(message instanceof ClusterEvent.UnreachableMember) {
214             memberUnreachable((ClusterEvent.UnreachableMember)message);
215         } else if(message instanceof ClusterEvent.ReachableMember) {
216             memberReachable((ClusterEvent.ReachableMember) message);
217         } else if(message instanceof DatastoreContextFactory) {
218             onDatastoreContextFactory((DatastoreContextFactory)message);
219         } else if(message instanceof RoleChangeNotification) {
220             onRoleChangeNotification((RoleChangeNotification) message);
221         } else if(message instanceof FollowerInitialSyncUpStatus){
222             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
223         } else if(message instanceof ShardNotInitializedTimeout) {
224             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
225         } else if(message instanceof ShardLeaderStateChanged) {
226             onLeaderStateChanged((ShardLeaderStateChanged) message);
227         } else if(message instanceof SwitchShardBehavior){
228             onSwitchShardBehavior((SwitchShardBehavior) message);
229         } else if(message instanceof CreateShard) {
230             onCreateShard((CreateShard)message);
231         } else if(message instanceof AddShardReplica){
232             onAddShardReplica((AddShardReplica)message);
233         } else if(message instanceof ForwardedAddServerReply) {
234             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
235             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
236                     msg.removeShardOnFailure);
237         } else if(message instanceof ForwardedAddServerFailure) {
238             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
239             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
240         } else if(message instanceof PrimaryShardFoundForContext) {
241             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
242             onPrimaryShardFoundContext(primaryShardFoundContext);
243         } else if(message instanceof RemoveShardReplica) {
244             onRemoveShardReplica((RemoveShardReplica) message);
245         } else if(message instanceof WrappedShardResponse){
246             onWrappedShardResponse((WrappedShardResponse) message);
247         } else if(message instanceof GetSnapshot) {
248             onGetSnapshot();
249         } else if(message instanceof ServerRemoved){
250             onShardReplicaRemoved((ServerRemoved) message);
251         } else if(message instanceof SaveSnapshotSuccess) {
252             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
253         } else if(message instanceof SaveSnapshotFailure) {
254             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
255                     persistenceId(), ((SaveSnapshotFailure) message).cause());
256         } else if(message instanceof Shutdown) {
257             onShutDown();
258         } else {
259             unknownMessage(message);
260         }
261     }
262
263     private void onShutDown() {
264         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
265         for (ShardInformation info : localShards.values()) {
266             if (info.getActor() != null) {
267                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
268
269                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
270                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
271             }
272         }
273
274         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
275
276         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
277         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
278
279         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
280             @Override
281             public void onComplete(Throwable failure, Iterable<Boolean> results) {
282                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
283
284                 self().tell(PoisonPill.getInstance(), self());
285
286                 if(failure != null) {
287                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
288                 } else {
289                     int nfailed = 0;
290                     for(Boolean r: results) {
291                         if(!r) {
292                             nfailed++;
293                         }
294                     }
295
296                     if(nfailed > 0) {
297                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
298                     }
299                 }
300             }
301         }, dispatcher);
302     }
303
304     private void onWrappedShardResponse(WrappedShardResponse message) {
305         if (message.getResponse() instanceof RemoveServerReply) {
306             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
307                     message.getLeaderPath());
308         }
309     }
310
311     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
312             String leaderPath) {
313         shardReplicaOperationsInProgress.remove(shardId);
314
315         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
316
317         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
318             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
319                     shardId.getShardName());
320             originalSender.tell(new akka.actor.Status.Success(null), getSelf());
321         } else {
322             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
323                     persistenceId(), shardId, replyMsg.getStatus());
324
325             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
326                     leaderPath, shardId);
327             originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
328         }
329     }
330
331     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
332         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
333             addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
334                     getSender());
335         } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
336             removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
337                     primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
338         }
339     }
340
341     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
342             final ActorRef sender) {
343         if(isShardReplicaOperationInProgress(shardName, sender)) {
344             return;
345         }
346
347         shardReplicaOperationsInProgress.add(shardName);
348
349         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
350
351         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
352
353         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
354         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
355                 primaryPath, shardId);
356
357         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
358                 duration());
359         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
360                 new RemoveServer(shardId.toString()), removeServerTimeout);
361
362         futureObj.onComplete(new OnComplete<Object>() {
363             @Override
364             public void onComplete(Throwable failure, Object response) {
365                 if (failure != null) {
366                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
367                             primaryPath, shardName);
368
369                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
370
371                     // FAILURE
372                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
373                 } else {
374                     // SUCCESS
375                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
376                 }
377             }
378         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
379     }
380
381     private void onShardReplicaRemoved(ServerRemoved message) {
382         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
383         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
384         if(shardInformation == null) {
385             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
386             return;
387         } else if(shardInformation.getActor() != null) {
388             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
389             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
390         }
391         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
392         persistShardList();
393     }
394
395     private void onGetSnapshot() {
396         LOG.debug("{}: onGetSnapshot", persistenceId());
397
398         List<String> notInitialized = null;
399         for(ShardInformation shardInfo: localShards.values()) {
400             if(!shardInfo.isShardInitialized()) {
401                 if(notInitialized == null) {
402                     notInitialized = new ArrayList<>();
403                 }
404
405                 notInitialized.add(shardInfo.getShardName());
406             }
407         }
408
409         if(notInitialized != null) {
410             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
411                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
412             return;
413         }
414
415         byte[] shardManagerSnapshot = null;
416         if(currentSnapshot != null) {
417             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
418         }
419
420         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
421                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
422                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
423
424         for(ShardInformation shardInfo: localShards.values()) {
425             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
426         }
427     }
428
429     private void onCreateShard(CreateShard createShard) {
430         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
431
432         Object reply;
433         try {
434             String shardName = createShard.getModuleShardConfig().getShardName();
435             if(localShards.containsKey(shardName)) {
436                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
437                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
438             } else {
439                 doCreateShard(createShard);
440                 reply = new akka.actor.Status.Success(null);
441             }
442         } catch (Exception e) {
443             LOG.error("{}: onCreateShard failed", persistenceId(), e);
444             reply = new akka.actor.Status.Failure(e);
445         }
446
447         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
448             getSender().tell(reply, getSelf());
449         }
450     }
451
452     private void doCreateShard(CreateShard createShard) {
453         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
454         String shardName = moduleShardConfig.getShardName();
455
456         configuration.addModuleShardConfiguration(moduleShardConfig);
457
458         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
459         if(shardDatastoreContext == null) {
460             shardDatastoreContext = newShardDatastoreContext(shardName);
461         } else {
462             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
463                     peerAddressResolver).build();
464         }
465
466         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
467
468         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
469                 currentSnapshot.getShardList().contains(shardName);
470
471         Map<String, String> peerAddresses;
472         boolean isActiveMember;
473         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
474                 contains(cluster.getCurrentMemberName())) {
475             peerAddresses = getPeerAddresses(shardName);
476             isActiveMember = true;
477         } else {
478             // The local member is not in the static shard member configuration and the shard did not
479             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
480             // the shard with no peers and with elections disabled so it stays as follower. A
481             // subsequent AddServer request will be needed to make it an active member.
482             isActiveMember = false;
483             peerAddresses = Collections.emptyMap();
484             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
485                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
486         }
487
488         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
489                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
490                 isActiveMember);
491
492         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
493                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
494         info.setActiveMember(isActiveMember);
495         localShards.put(info.getShardName(), info);
496
497         mBean.addLocalShard(shardId.toString());
498
499         if(schemaContext != null) {
500             info.setActor(newShardActor(schemaContext, info));
501         }
502     }
503
504     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
505         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
506                 shardPeerAddressResolver(peerAddressResolver);
507     }
508
509     private DatastoreContext newShardDatastoreContext(String shardName) {
510         return newShardDatastoreContextBuilder(shardName).build();
511     }
512
513     private void checkReady(){
514         if (isReadyWithLeaderId()) {
515             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
516                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
517
518             waitTillReadyCountdownLatch.countDown();
519         }
520     }
521
522     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
523         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
524
525         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
526         if(shardInformation != null) {
527             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
528             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
529             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
530                 primaryShardInfoCache.remove(shardInformation.getShardName());
531             }
532
533             checkReady();
534         } else {
535             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
536         }
537     }
538
539     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
540         ShardInformation shardInfo = message.getShardInfo();
541
542         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
543                 shardInfo.getShardName());
544
545         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
546
547         if(!shardInfo.isShardInitialized()) {
548             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
549             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
550         } else {
551             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
552             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
553         }
554     }
555
556     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
557         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
558                 status.getName(), status.isInitialSyncDone());
559
560         ShardInformation shardInformation = findShardInformation(status.getName());
561
562         if(shardInformation != null) {
563             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
564
565             mBean.setSyncStatus(isInSync());
566         }
567
568     }
569
570     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
571         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
572                 roleChanged.getOldRole(), roleChanged.getNewRole());
573
574         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
575         if(shardInformation != null) {
576             shardInformation.setRole(roleChanged.getNewRole());
577             checkReady();
578             mBean.setSyncStatus(isInSync());
579         }
580     }
581
582
583     private ShardInformation findShardInformation(String memberId) {
584         for(ShardInformation info : localShards.values()){
585             if(info.getShardId().toString().equals(memberId)){
586                 return info;
587             }
588         }
589
590         return null;
591     }
592
593     private boolean isReadyWithLeaderId() {
594         boolean isReady = true;
595         for (ShardInformation info : localShards.values()) {
596             if(!info.isShardReadyWithLeaderId()){
597                 isReady = false;
598                 break;
599             }
600         }
601         return isReady;
602     }
603
604     private boolean isInSync(){
605         for (ShardInformation info : localShards.values()) {
606             if(!info.isInSync()){
607                 return false;
608             }
609         }
610         return true;
611     }
612
613     private void onActorInitialized(Object message) {
614         final ActorRef sender = getSender();
615
616         if (sender == null) {
617             return; //why is a non-actor sending this message? Just ignore.
618         }
619
620         String actorName = sender.path().name();
621         //find shard name from actor name; actor name is stringified shardId
622         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
623
624         if (shardId.getShardName() == null) {
625             return;
626         }
627
628         markShardAsInitialized(shardId.getShardName());
629     }
630
631     private void markShardAsInitialized(String shardName) {
632         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
633
634         ShardInformation shardInformation = localShards.get(shardName);
635         if (shardInformation != null) {
636             shardInformation.setActorInitialized();
637
638             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
639         }
640     }
641
642     @Override
643     protected void handleRecover(Object message) throws Exception {
644         if (message instanceof RecoveryCompleted) {
645             onRecoveryCompleted();
646         } else if (message instanceof SnapshotOffer) {
647             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
648         }
649     }
650
651     private void onRecoveryCompleted() {
652         LOG.info("Recovery complete : {}", persistenceId());
653
654         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
655         // journal on upgrade from Helium.
656         deleteMessages(lastSequenceNr());
657
658         if(currentSnapshot == null && restoreFromSnapshot != null &&
659                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
660             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
661                     restoreFromSnapshot.getShardManagerSnapshot()))) {
662                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
663
664                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
665
666                 applyShardManagerSnapshot(snapshot);
667             } catch(Exception e) {
668                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
669             }
670         }
671
672         createLocalShards();
673     }
674
675     private void findLocalShard(FindLocalShard message) {
676         final ShardInformation shardInformation = localShards.get(message.getShardName());
677
678         if(shardInformation == null){
679             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
680             return;
681         }
682
683         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
684             @Override
685             public Object get() {
686                 return new LocalShardFound(shardInformation.getActor());
687             }
688         });
689     }
690
691     private void sendResponse(ShardInformation shardInformation, boolean doWait,
692             boolean wantShardReady, final Supplier<Object> messageSupplier) {
693         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
694             if(doWait) {
695                 final ActorRef sender = getSender();
696                 final ActorRef self = self();
697
698                 Runnable replyRunnable = new Runnable() {
699                     @Override
700                     public void run() {
701                         sender.tell(messageSupplier.get(), self);
702                     }
703                 };
704
705                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
706                     new OnShardInitialized(replyRunnable);
707
708                 shardInformation.addOnShardInitialized(onShardInitialized);
709
710                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
711                 if(shardInformation.isShardInitialized()) {
712                     // If the shard is already initialized then we'll wait enough time for the shard to
713                     // elect a leader, ie 2 times the election timeout.
714                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
715                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
716                 }
717
718                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
719                         shardInformation.getShardName());
720
721                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
722                         timeout, getSelf(),
723                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
724                         getContext().dispatcher(), getSelf());
725
726                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
727
728             } else if (!shardInformation.isShardInitialized()) {
729                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
730                         shardInformation.getShardName());
731                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
732             } else {
733                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
734                         shardInformation.getShardName());
735                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
736             }
737
738             return;
739         }
740
741         getSender().tell(messageSupplier.get(), getSelf());
742     }
743
744     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
745         return new NoShardLeaderException(null, shardId.toString());
746     }
747
748     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
749         return new NotInitializedException(String.format(
750                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
751     }
752
753     private void memberRemoved(ClusterEvent.MemberRemoved message) {
754         String memberName = message.member().roles().iterator().next();
755
756         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
757                 message.member().address());
758
759         peerAddressResolver.removePeerAddress(memberName);
760
761         for(ShardInformation info : localShards.values()){
762             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
763         }
764     }
765
766     private void memberExited(ClusterEvent.MemberExited message) {
767         String memberName = message.member().roles().iterator().next();
768
769         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
770                 message.member().address());
771
772         peerAddressResolver.removePeerAddress(memberName);
773
774         for(ShardInformation info : localShards.values()){
775             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
776         }
777     }
778
779     private void memberUp(ClusterEvent.MemberUp message) {
780         String memberName = message.member().roles().iterator().next();
781
782         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
783                 message.member().address());
784
785         addPeerAddress(memberName, message.member().address());
786
787         checkReady();
788     }
789
790     private void addPeerAddress(String memberName, Address address) {
791         peerAddressResolver.addPeerAddress(memberName, address);
792
793         for(ShardInformation info : localShards.values()){
794             String shardName = info.getShardName();
795             String peerId = getShardIdentifier(memberName, shardName).toString();
796             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
797
798             info.peerUp(memberName, peerId, getSelf());
799         }
800     }
801
802     private void memberReachable(ClusterEvent.ReachableMember message) {
803         String memberName = message.member().roles().iterator().next();
804         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
805
806         addPeerAddress(memberName, message.member().address());
807
808         markMemberAvailable(memberName);
809     }
810
811     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
812         String memberName = message.member().roles().iterator().next();
813         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
814
815         markMemberUnavailable(memberName);
816     }
817
818     private void markMemberUnavailable(final String memberName) {
819         for(ShardInformation info : localShards.values()){
820             String leaderId = info.getLeaderId();
821             if(leaderId != null && leaderId.contains(memberName)) {
822                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
823                 info.setLeaderAvailable(false);
824
825                 primaryShardInfoCache.remove(info.getShardName());
826             }
827
828             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
829         }
830     }
831
832     private void markMemberAvailable(final String memberName) {
833         for(ShardInformation info : localShards.values()){
834             String leaderId = info.getLeaderId();
835             if(leaderId != null && leaderId.contains(memberName)) {
836                 LOG.debug("Marking Leader {} as available.", leaderId);
837                 info.setLeaderAvailable(true);
838             }
839
840             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
841         }
842     }
843
844     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
845         datastoreContextFactory = factory;
846         for (ShardInformation info : localShards.values()) {
847             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
848         }
849     }
850
851     private void onSwitchShardBehavior(SwitchShardBehavior message) {
852         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
853
854         ShardInformation shardInformation = localShards.get(identifier.getShardName());
855
856         if(shardInformation != null && shardInformation.getActor() != null) {
857             shardInformation.getActor().tell(
858                     new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
859         } else {
860             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
861                     message.getShardName(), message.getNewState());
862         }
863     }
864
865     /**
866      * Notifies all the local shards of a change in the schema context
867      *
868      * @param message
869      */
870     private void updateSchemaContext(final Object message) {
871         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
872
873         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
874
875         for (ShardInformation info : localShards.values()) {
876             if (info.getActor() == null) {
877                 LOG.debug("Creating Shard {}", info.getShardId());
878                 info.setActor(newShardActor(schemaContext, info));
879             } else {
880                 info.getActor().tell(message, getSelf());
881             }
882         }
883     }
884
885     @VisibleForTesting
886     protected ClusterWrapper getCluster() {
887         return cluster;
888     }
889
890     @VisibleForTesting
891     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
892         return getContext().actorOf(info.newProps(schemaContext)
893                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
894     }
895
896     private void findPrimary(FindPrimary message) {
897         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
898
899         final String shardName = message.getShardName();
900         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
901
902         // First see if the there is a local replica for the shard
903         final ShardInformation info = localShards.get(shardName);
904         if (info != null && info.isActiveMember()) {
905             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
906                 @Override
907                 public Object get() {
908                     String primaryPath = info.getSerializedLeaderActor();
909                     Object found = canReturnLocalShardState && info.isLeader() ?
910                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
911                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
912
913                             if(LOG.isDebugEnabled()) {
914                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
915                             }
916
917                             return found;
918                 }
919             });
920
921             return;
922         }
923
924         final Collection<String> visitedAddresses;
925         if(message instanceof RemoteFindPrimary) {
926             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
927         } else {
928             visitedAddresses = new ArrayList<>(1);
929         }
930
931         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
932
933         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
934             if(visitedAddresses.contains(address)) {
935                 continue;
936             }
937
938             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
939                     persistenceId(), shardName, address, visitedAddresses);
940
941             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
942                     message.isWaitUntilReady(), visitedAddresses), getContext());
943             return;
944         }
945
946         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
947
948         getSender().tell(new PrimaryNotFoundException(
949                 String.format("No primary shard found for %s.", shardName)), getSelf());
950     }
951
952     /**
953      * Construct the name of the shard actor given the name of the member on
954      * which the shard resides and the name of the shard
955      *
956      * @param memberName
957      * @param shardName
958      * @return
959      */
960     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
961         return peerAddressResolver.getShardIdentifier(memberName, shardName);
962     }
963
964     /**
965      * Create shards that are local to the member on which the ShardManager
966      * runs
967      *
968      */
969     private void createLocalShards() {
970         String memberName = this.cluster.getCurrentMemberName();
971         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
972
973         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
974         if(restoreFromSnapshot != null)
975         {
976             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
977                 shardSnapshots.put(snapshot.getName(), snapshot);
978             }
979         }
980
981         restoreFromSnapshot = null; // null out to GC
982
983         for(String shardName : memberShardNames){
984             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
985
986             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
987
988             Map<String, String> peerAddresses = getPeerAddresses(shardName);
989             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
990                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
991                         shardSnapshots.get(shardName)), peerAddressResolver));
992             mBean.addLocalShard(shardId.toString());
993         }
994     }
995
996     /**
997      * Given the name of the shard find the addresses of all it's peers
998      *
999      * @param shardName
1000      */
1001     private Map<String, String> getPeerAddresses(String shardName) {
1002         Collection<String> members = configuration.getMembersFromShardName(shardName);
1003         Map<String, String> peerAddresses = new HashMap<>();
1004
1005         String currentMemberName = this.cluster.getCurrentMemberName();
1006
1007         for(String memberName : members) {
1008             if(!currentMemberName.equals(memberName)) {
1009                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1010                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1011                 peerAddresses.put(shardId.toString(), address);
1012             }
1013         }
1014         return peerAddresses;
1015     }
1016
1017     @Override
1018     public SupervisorStrategy supervisorStrategy() {
1019
1020         return new OneForOneStrategy(10, Duration.create("1 minute"),
1021                 new Function<Throwable, SupervisorStrategy.Directive>() {
1022             @Override
1023             public SupervisorStrategy.Directive apply(Throwable t) {
1024                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1025                 return SupervisorStrategy.resume();
1026             }
1027         }
1028                 );
1029
1030     }
1031
1032     @Override
1033     public String persistenceId() {
1034         return persistenceId;
1035     }
1036
1037     @VisibleForTesting
1038     ShardManagerInfoMBean getMBean(){
1039         return mBean;
1040     }
1041
1042     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1043         if (shardReplicaOperationsInProgress.contains(shardName)) {
1044             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1045             LOG.debug ("{}: {}", persistenceId(), msg);
1046             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1047             return true;
1048         }
1049
1050         return false;
1051     }
1052
1053     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1054         final String shardName = shardReplicaMsg.getShardName();
1055
1056         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1057
1058         // verify the shard with the specified name is present in the cluster configuration
1059         if (!(this.configuration.isShardConfigured(shardName))) {
1060             String msg = String.format("No module configuration exists for shard %s", shardName);
1061             LOG.debug ("{}: {}", persistenceId(), msg);
1062             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1063             return;
1064         }
1065
1066         // Create the localShard
1067         if (schemaContext == null) {
1068             String msg = String.format(
1069                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1070             LOG.debug ("{}: {}", persistenceId(), msg);
1071             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1072             return;
1073         }
1074
1075         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1076             @Override
1077             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1078                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1079             }
1080
1081             @Override
1082             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1083                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1084             }
1085
1086         });
1087     }
1088
1089     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1090         String msg = String.format("Local shard %s already exists", shardName);
1091         LOG.debug ("{}: {}", persistenceId(), msg);
1092         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
1093     }
1094
1095     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1096         if(isShardReplicaOperationInProgress(shardName, sender)) {
1097             return;
1098         }
1099
1100         shardReplicaOperationsInProgress.add(shardName);
1101
1102         final ShardInformation shardInfo;
1103         final boolean removeShardOnFailure;
1104         ShardInformation existingShardInfo = localShards.get(shardName);
1105         if(existingShardInfo == null) {
1106             removeShardOnFailure = true;
1107             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1108
1109             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
1110                     DisableElectionsRaftPolicy.class.getName()).build();
1111
1112             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1113                     Shard.builder(), peerAddressResolver);
1114             shardInfo.setActiveMember(false);
1115             localShards.put(shardName, shardInfo);
1116             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1117         } else {
1118             removeShardOnFailure = false;
1119             shardInfo = existingShardInfo;
1120         }
1121
1122         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1123
1124         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1125         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1126                 response.getPrimaryPath(), shardInfo.getShardId());
1127
1128         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1129                 duration());
1130         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1131             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1132
1133         futureObj.onComplete(new OnComplete<Object>() {
1134             @Override
1135             public void onComplete(Throwable failure, Object addServerResponse) {
1136                 if (failure != null) {
1137                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1138                             response.getPrimaryPath(), shardName, failure);
1139
1140                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1141                             response.getPrimaryPath(), shardName);
1142                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1143                 } else {
1144                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1145                             response.getPrimaryPath(), removeShardOnFailure), sender);
1146                 }
1147             }
1148         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1149     }
1150
1151     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1152             boolean removeShardOnFailure) {
1153         shardReplicaOperationsInProgress.remove(shardName);
1154
1155         if(removeShardOnFailure) {
1156             ShardInformation shardInfo = localShards.remove(shardName);
1157             if (shardInfo.getActor() != null) {
1158                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1159             }
1160         }
1161
1162         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1163             new RuntimeException(message, failure)), getSelf());
1164     }
1165
1166     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1167             String leaderPath, boolean removeShardOnFailure) {
1168         String shardName = shardInfo.getShardName();
1169         shardReplicaOperationsInProgress.remove(shardName);
1170
1171         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1172
1173         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1174             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1175
1176             // Make the local shard voting capable
1177             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1178             shardInfo.setActiveMember(true);
1179             persistShardList();
1180
1181             mBean.addLocalShard(shardInfo.getShardId().toString());
1182             sender.tell(new akka.actor.Status.Success(null), getSelf());
1183         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1184             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1185         } else {
1186             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1187                     persistenceId(), shardName, replyMsg.getStatus());
1188
1189             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1190
1191             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1192         }
1193     }
1194
1195     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1196                                                String leaderPath, ShardIdentifier shardId) {
1197         Exception failure;
1198         switch (serverChangeStatus) {
1199             case TIMEOUT:
1200                 failure = new TimeoutException(String.format(
1201                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1202                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1203                         leaderPath, shardId.getShardName()));
1204                 break;
1205             case NO_LEADER:
1206                 failure = createNoShardLeaderException(shardId);
1207                 break;
1208             case NOT_SUPPORTED:
1209                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1210                         serverChange.getSimpleName(), shardId.getShardName()));
1211                 break;
1212             default :
1213                 failure = new RuntimeException(String.format(
1214                         "%s request to leader %s for shard %s failed with status %s",
1215                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1216         }
1217         return failure;
1218     }
1219
1220     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1221         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1222
1223         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1224                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1225             @Override
1226             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1227                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1228             }
1229
1230             @Override
1231             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1232                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1233             }
1234         });
1235     }
1236
1237     private void persistShardList() {
1238         List<String> shardList = new ArrayList<>(localShards.keySet());
1239         for (ShardInformation shardInfo : localShards.values()) {
1240             if (!shardInfo.isActiveMember()) {
1241                 shardList.remove(shardInfo.getShardName());
1242             }
1243         }
1244         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1245         saveSnapshot(updateShardManagerSnapshot(shardList));
1246     }
1247
1248     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1249         currentSnapshot = new ShardManagerSnapshot(shardList);
1250         return currentSnapshot;
1251     }
1252
1253     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1254         currentSnapshot = snapshot;
1255
1256         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1257
1258         String currentMember = cluster.getCurrentMemberName();
1259         Set<String> configuredShardList =
1260             new HashSet<>(configuration.getMemberShardNames(currentMember));
1261         for (String shard : currentSnapshot.getShardList()) {
1262             if (!configuredShardList.contains(shard)) {
1263                 // add the current member as a replica for the shard
1264                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1265                 configuration.addMemberReplicaForShard(shard, currentMember);
1266             } else {
1267                 configuredShardList.remove(shard);
1268             }
1269         }
1270         for (String shard : configuredShardList) {
1271             // remove the member as a replica for the shard
1272             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1273             configuration.removeMemberReplicaForShard(shard, currentMember);
1274         }
1275     }
1276
1277     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1278         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1279             persistenceId());
1280         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1281             0, 0));
1282     }
1283
1284     private static class ForwardedAddServerReply {
1285         ShardInformation shardInfo;
1286         AddServerReply addServerReply;
1287         String leaderPath;
1288         boolean removeShardOnFailure;
1289
1290         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1291                 boolean removeShardOnFailure) {
1292             this.shardInfo = shardInfo;
1293             this.addServerReply = addServerReply;
1294             this.leaderPath = leaderPath;
1295             this.removeShardOnFailure = removeShardOnFailure;
1296         }
1297     }
1298
1299     private static class ForwardedAddServerFailure {
1300         String shardName;
1301         String failureMessage;
1302         Throwable failure;
1303         boolean removeShardOnFailure;
1304
1305         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1306                 boolean removeShardOnFailure) {
1307             this.shardName = shardName;
1308             this.failureMessage = failureMessage;
1309             this.failure = failure;
1310             this.removeShardOnFailure = removeShardOnFailure;
1311         }
1312     }
1313
1314     @VisibleForTesting
1315     protected static class ShardInformation {
1316         private final ShardIdentifier shardId;
1317         private final String shardName;
1318         private ActorRef actor;
1319         private final Map<String, String> initialPeerAddresses;
1320         private Optional<DataTree> localShardDataTree;
1321         private boolean leaderAvailable = false;
1322
1323         // flag that determines if the actor is ready for business
1324         private boolean actorInitialized = false;
1325
1326         private boolean followerSyncStatus = false;
1327
1328         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1329         private String role ;
1330         private String leaderId;
1331         private short leaderVersion;
1332
1333         private DatastoreContext datastoreContext;
1334         private Shard.AbstractBuilder<?, ?> builder;
1335         private final ShardPeerAddressResolver addressResolver;
1336         private boolean isActiveMember = true;
1337
1338         private ShardInformation(String shardName, ShardIdentifier shardId,
1339                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1340                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1341             this.shardName = shardName;
1342             this.shardId = shardId;
1343             this.initialPeerAddresses = initialPeerAddresses;
1344             this.datastoreContext = datastoreContext;
1345             this.builder = builder;
1346             this.addressResolver = addressResolver;
1347         }
1348
1349         Props newProps(SchemaContext schemaContext) {
1350             Preconditions.checkNotNull(builder);
1351             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1352                     schemaContext(schemaContext).props();
1353             builder = null;
1354             return props;
1355         }
1356
1357         String getShardName() {
1358             return shardName;
1359         }
1360
1361         @Nullable
1362         ActorRef getActor(){
1363             return actor;
1364         }
1365
1366         void setActor(ActorRef actor) {
1367             this.actor = actor;
1368         }
1369
1370         ShardIdentifier getShardId() {
1371             return shardId;
1372         }
1373
1374         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1375             this.localShardDataTree = localShardDataTree;
1376         }
1377
1378         Optional<DataTree> getLocalShardDataTree() {
1379             return localShardDataTree;
1380         }
1381
1382         DatastoreContext getDatastoreContext() {
1383             return datastoreContext;
1384         }
1385
1386         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1387             this.datastoreContext = datastoreContext;
1388             if (actor != null) {
1389                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1390                 actor.tell(this.datastoreContext, sender);
1391             }
1392         }
1393
1394         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1395             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1396
1397             if(actor != null) {
1398                 if(LOG.isDebugEnabled()) {
1399                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1400                             peerId, peerAddress, actor.path());
1401                 }
1402
1403                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1404             }
1405
1406             notifyOnShardInitializedCallbacks();
1407         }
1408
1409         void peerDown(String memberName, String peerId, ActorRef sender) {
1410             if(actor != null) {
1411                 actor.tell(new PeerDown(memberName, peerId), sender);
1412             }
1413         }
1414
1415         void peerUp(String memberName, String peerId, ActorRef sender) {
1416             if(actor != null) {
1417                 actor.tell(new PeerUp(memberName, peerId), sender);
1418             }
1419         }
1420
1421         boolean isShardReady() {
1422             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1423         }
1424
1425         boolean isShardReadyWithLeaderId() {
1426             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1427                     (isLeader() || addressResolver.resolve(leaderId) != null);
1428         }
1429
1430         boolean isShardInitialized() {
1431             return getActor() != null && actorInitialized;
1432         }
1433
1434         boolean isLeader() {
1435             return Objects.equal(leaderId, shardId.toString());
1436         }
1437
1438         String getSerializedLeaderActor() {
1439             if(isLeader()) {
1440                 return Serialization.serializedActorPath(getActor());
1441             } else {
1442                 return addressResolver.resolve(leaderId);
1443             }
1444         }
1445
1446         void setActorInitialized() {
1447             LOG.debug("Shard {} is initialized", shardId);
1448
1449             this.actorInitialized = true;
1450
1451             notifyOnShardInitializedCallbacks();
1452         }
1453
1454         private void notifyOnShardInitializedCallbacks() {
1455             if(onShardInitializedSet.isEmpty()) {
1456                 return;
1457             }
1458
1459             boolean ready = isShardReadyWithLeaderId();
1460
1461             if(LOG.isDebugEnabled()) {
1462                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1463                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1464             }
1465
1466             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1467             while(iter.hasNext()) {
1468                 OnShardInitialized onShardInitialized = iter.next();
1469                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1470                     iter.remove();
1471                     onShardInitialized.getTimeoutSchedule().cancel();
1472                     onShardInitialized.getReplyRunnable().run();
1473                 }
1474             }
1475         }
1476
1477         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1478             onShardInitializedSet.add(onShardInitialized);
1479         }
1480
1481         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1482             onShardInitializedSet.remove(onShardInitialized);
1483         }
1484
1485         void setRole(String newRole) {
1486             this.role = newRole;
1487
1488             notifyOnShardInitializedCallbacks();
1489         }
1490
1491         void setFollowerSyncStatus(boolean syncStatus){
1492             this.followerSyncStatus = syncStatus;
1493         }
1494
1495         boolean isInSync(){
1496             if(RaftState.Follower.name().equals(this.role)){
1497                 return followerSyncStatus;
1498             } else if(RaftState.Leader.name().equals(this.role)){
1499                 return true;
1500             }
1501
1502             return false;
1503         }
1504
1505         boolean setLeaderId(String leaderId) {
1506             boolean changed = !Objects.equal(this.leaderId, leaderId);
1507             this.leaderId = leaderId;
1508             if(leaderId != null) {
1509                 this.leaderAvailable = true;
1510             }
1511             notifyOnShardInitializedCallbacks();
1512
1513             return changed;
1514         }
1515
1516         String getLeaderId() {
1517             return leaderId;
1518         }
1519
1520         void setLeaderAvailable(boolean leaderAvailable) {
1521             this.leaderAvailable = leaderAvailable;
1522
1523             if(leaderAvailable) {
1524                 notifyOnShardInitializedCallbacks();
1525             }
1526         }
1527
1528         short getLeaderVersion() {
1529             return leaderVersion;
1530         }
1531
1532         void setLeaderVersion(short leaderVersion) {
1533             this.leaderVersion = leaderVersion;
1534         }
1535
1536         boolean isActiveMember() {
1537             return isActiveMember;
1538         }
1539
1540         void setActiveMember(boolean isActiveMember) {
1541             this.isActiveMember = isActiveMember;
1542         }
1543     }
1544
1545     private static class OnShardInitialized {
1546         private final Runnable replyRunnable;
1547         private Cancellable timeoutSchedule;
1548
1549         OnShardInitialized(Runnable replyRunnable) {
1550             this.replyRunnable = replyRunnable;
1551         }
1552
1553         Runnable getReplyRunnable() {
1554             return replyRunnable;
1555         }
1556
1557         Cancellable getTimeoutSchedule() {
1558             return timeoutSchedule;
1559         }
1560
1561         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1562             this.timeoutSchedule = timeoutSchedule;
1563         }
1564     }
1565
1566     private static class OnShardReady extends OnShardInitialized {
1567         OnShardReady(Runnable replyRunnable) {
1568             super(replyRunnable);
1569         }
1570     }
1571
1572     private static class ShardNotInitializedTimeout {
1573         private final ActorRef sender;
1574         private final ShardInformation shardInfo;
1575         private final OnShardInitialized onShardInitialized;
1576
1577         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1578             this.sender = sender;
1579             this.shardInfo = shardInfo;
1580             this.onShardInitialized = onShardInitialized;
1581         }
1582
1583         ActorRef getSender() {
1584             return sender;
1585         }
1586
1587         ShardInformation getShardInfo() {
1588             return shardInfo;
1589         }
1590
1591         OnShardInitialized getOnShardInitialized() {
1592             return onShardInitialized;
1593         }
1594     }
1595
    /**
     * Creates a new {@link Builder} instance.
     *
     * @return a fresh builder with no properties set
     */
    public static Builder builder() {
        return new Builder();
    }
1599
1600     public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
1601         private ClusterWrapper cluster;
1602         private Configuration configuration;
1603         private DatastoreContextFactory datastoreContextFactory;
1604         private CountDownLatch waitTillReadyCountdownLatch;
1605         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1606         private DatastoreSnapshot restoreFromSnapshot;
1607         private volatile boolean sealed;
1608
1609         @SuppressWarnings("unchecked")
1610         private T self() {
1611             return (T) this;
1612         }
1613
1614         protected void checkSealed() {
1615             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1616         }
1617
1618         public T cluster(ClusterWrapper cluster) {
1619             checkSealed();
1620             this.cluster = cluster;
1621             return self();
1622         }
1623
1624         public T configuration(Configuration configuration) {
1625             checkSealed();
1626             this.configuration = configuration;
1627             return self();
1628         }
1629
1630         public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1631             checkSealed();
1632             this.datastoreContextFactory = datastoreContextFactory;
1633             return self();
1634         }
1635
1636         public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1637             checkSealed();
1638             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1639             return self();
1640         }
1641
1642         public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1643             checkSealed();
1644             this.primaryShardInfoCache = primaryShardInfoCache;
1645             return self();
1646         }
1647
1648         public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1649             checkSealed();
1650             this.restoreFromSnapshot = restoreFromSnapshot;
1651             return self();
1652         }
1653
1654         protected void verify() {
1655             sealed = true;
1656             Preconditions.checkNotNull(cluster, "cluster should not be null");
1657             Preconditions.checkNotNull(configuration, "configuration should not be null");
1658             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1659             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1660             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1661         }
1662
1663         public Props props() {
1664             verify();
1665             return Props.create(ShardManager.class, this);
1666         }
1667     }
1668
    /**
     * Concrete builder type; it adds nothing to {@link AbstractBuilder} beyond binding the self type.
     */
    public static class Builder extends AbstractBuilder<Builder> {
    }
1671
1672     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1673         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1674                 getShardInitializationTimeout().duration().$times(2));
1675
1676
1677         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1678         futureObj.onComplete(new OnComplete<Object>() {
1679             @Override
1680             public void onComplete(Throwable failure, Object response) {
1681                 if (failure != null) {
1682                     handler.onFailure(failure);
1683                 } else {
1684                     if(response instanceof RemotePrimaryShardFound) {
1685                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1686                     } else if(response instanceof LocalPrimaryShardFound) {
1687                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1688                     } else {
1689                         handler.onUnknownResponse(response);
1690                     }
1691                 }
1692             }
1693         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1694     }
1695
1696     /**
1697      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1698      * a remote or local find primary message is processed
1699      */
1700     private static interface FindPrimaryResponseHandler {
1701         /**
1702          * Invoked when a Failure message is received as a response
1703          *
1704          * @param failure
1705          */
1706         void onFailure(Throwable failure);
1707
1708         /**
1709          * Invoked when a RemotePrimaryShardFound response is received
1710          *
1711          * @param response
1712          */
1713         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1714
1715         /**
1716          * Invoked when a LocalPrimaryShardFound response is received
1717          * @param response
1718          */
1719         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1720
1721         /**
1722          * Invoked when an unknown response is received. This is another type of failure.
1723          *
1724          * @param response
1725          */
1726         void onUnknownResponse(Object response);
1727     }
1728
1729     /**
1730      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1731      * replica and sends a wrapped Failure response to some targetActor
1732      */
1733     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1734         private final ActorRef targetActor;
1735         private final String shardName;
1736         private final String persistenceId;
1737         private final ActorRef shardManagerActor;
1738
1739         /**
1740          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1741          * @param shardName The name of the shard for which the primary replica had to be found
1742          * @param persistenceId The persistenceId for the ShardManager
1743          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1744          */
1745         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1746             this.targetActor = Preconditions.checkNotNull(targetActor);
1747             this.shardName = Preconditions.checkNotNull(shardName);
1748             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1749             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1750         }
1751
1752         public ActorRef getTargetActor() {
1753             return targetActor;
1754         }
1755
1756         public String getShardName() {
1757             return shardName;
1758         }
1759
1760         @Override
1761         public void onFailure(Throwable failure) {
1762             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1763             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1764                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1765         }
1766
1767         @Override
1768         public void onUnknownResponse(Object response) {
1769             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1770                     shardName, response);
1771             LOG.debug ("{}: {}", persistenceId, msg);
1772             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1773                     new RuntimeException(msg)), shardManagerActor);
1774         }
1775     }
1776
1777
1778     /**
1779      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1780      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1781      * as a successful response to find primary.
1782      */
1783     private static class PrimaryShardFoundForContext {
1784         private final String shardName;
1785         private final Object contextMessage;
1786         private final RemotePrimaryShardFound remotePrimaryShardFound;
1787         private final LocalPrimaryShardFound localPrimaryShardFound;
1788
1789         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
1790                 @Nonnull Object primaryFoundMessage) {
1791             this.shardName = Preconditions.checkNotNull(shardName);
1792             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1793             Preconditions.checkNotNull(primaryFoundMessage);
1794             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
1795                     (RemotePrimaryShardFound) primaryFoundMessage : null;
1796             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
1797                     (LocalPrimaryShardFound) primaryFoundMessage : null;
1798         }
1799
1800         @Nonnull
1801         String getPrimaryPath(){
1802             if(remotePrimaryShardFound != null) {
1803                 return remotePrimaryShardFound.getPrimaryPath();
1804             }
1805             return localPrimaryShardFound.getPrimaryPath();
1806         }
1807
1808         @Nonnull
1809         Object getContextMessage() {
1810             return contextMessage;
1811         }
1812
1813         @Nullable
1814         RemotePrimaryShardFound getRemotePrimaryShardFound() {
1815             return remotePrimaryShardFound;
1816         }
1817
1818         @Nonnull
1819         String getShardName() {
1820             return shardName;
1821         }
1822     }
1823
1824     /**
1825      * The WrappedShardResponse class wraps a response from a Shard.
1826      */
1827     private static class WrappedShardResponse {
1828         private final ShardIdentifier shardId;
1829         private final Object response;
1830         private final String leaderPath;
1831
1832         private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1833             this.shardId = shardId;
1834             this.response = response;
1835             this.leaderPath = leaderPath;
1836         }
1837
1838         ShardIdentifier getShardId() {
1839             return shardId;
1840         }
1841
1842         Object getResponse() {
1843             return response;
1844         }
1845
1846         String getLeaderPath() {
1847             return leaderPath;
1848         }
1849     }
1850 }
1851
1852
1853