33b3810447532741a30e2e0f7084888646346a6a
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.Futures;
22 import akka.dispatch.OnComplete;
23 import akka.japi.Function;
24 import akka.pattern.Patterns;
25 import akka.persistence.RecoveryCompleted;
26 import akka.persistence.SaveSnapshotFailure;
27 import akka.persistence.SaveSnapshotSuccess;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.serialization.Serialization;
31 import akka.util.Timeout;
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.base.Objects;
34 import com.google.common.base.Optional;
35 import com.google.common.base.Preconditions;
36 import com.google.common.base.Strings;
37 import com.google.common.base.Supplier;
38 import com.google.common.collect.Sets;
39 import java.io.ByteArrayInputStream;
40 import java.io.ObjectInputStream;
41 import java.io.Serializable;
42 import java.util.ArrayList;
43 import java.util.Collection;
44 import java.util.Collections;
45 import java.util.HashMap;
46 import java.util.HashSet;
47 import java.util.Iterator;
48 import java.util.List;
49 import java.util.Map;
50 import java.util.Set;
51 import java.util.concurrent.CountDownLatch;
52 import java.util.concurrent.TimeUnit;
53 import java.util.concurrent.TimeoutException;
54 import javax.annotation.Nonnull;
55 import javax.annotation.Nullable;
56 import org.apache.commons.lang3.SerializationUtils;
57 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
58 import org.opendaylight.controller.cluster.datastore.config.Configuration;
59 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
60 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
63 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
64 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
65 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
66 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
67 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
68 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
69 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
70 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
71 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
72 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
73 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
76 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
77 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
78 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
79 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
80 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
81 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
82 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
83 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
84 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
85 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
86 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
87 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
88 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
89 import org.opendaylight.controller.cluster.raft.RaftState;
90 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
91 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
92 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
93 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
94 import org.opendaylight.controller.cluster.raft.messages.AddServer;
95 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
96 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
97 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
98 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
99 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
100 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
101 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
102 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
103 import org.slf4j.Logger;
104 import org.slf4j.LoggerFactory;
105 import scala.concurrent.ExecutionContext;
106 import scala.concurrent.Future;
107 import scala.concurrent.duration.Duration;
108 import scala.concurrent.duration.FiniteDuration;
109
/**
 * The ShardManager has the following jobs:
 * <ul>
 * <li> Create all the local shard replicas that belong on this cluster member
 * <li> Find the address of the local shard
 * <li> Find the primary replica for any given shard
 * <li> Monitor the cluster members and store their addresses
 * </ul>
 */
119 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
120
    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);

    // Stores a mapping between a shard name and its corresponding information.
    // Shard names look like inventory, topology etc and are as specified in
    // configuration
    private final Map<String, ShardInformation> localShards = new HashMap<>();

    // The type of a ShardManager reflects the type of the datastore itself
    // A data store could be of type config/operational
    private final String type;

    // Cluster access: current member name and member-event subscription.
    private final ClusterWrapper cluster;

    // Module/shard configuration; doCreateShard adds new module shard configurations to it.
    private final Configuration configuration;

    // Dispatcher path resolved for shard actors (Dispatchers.DispatcherType.Shard).
    private final String shardDispatcherPath;

    // JMX bean exposing this manager's local shards and sync status.
    private final ShardManagerInfo mBean;

    // Source of base and per-shard DatastoreContexts. Not final — presumably replaced when a
    // DatastoreContextFactory message is handled (onDatastoreContextFactory); confirm.
    private DatastoreContextFactory datastoreContextFactory;

    // Counted down by checkReady() once all local shards are ready with a known leader.
    private final CountDownLatch waitTillReadyCountdownLatch;

    // Cache of primary shard info; entries are evicted when a shard's leader id changes.
    private final PrimaryShardInfoFutureCache primaryShardInfoCache;

    // Resolves peer addresses for this datastore type; installed into every shard DatastoreContext.
    private final ShardPeerAddressResolver peerAddressResolver;

    // Null until a schema context is received; shard actors are only created once it is set.
    private SchemaContext schemaContext;

    // Optional snapshot to restore from when no persisted ShardManagerSnapshot was recovered.
    private DatastoreSnapshot restoreFromSnapshot;

    // Most recently applied/recovered ShardManagerSnapshot (may be null).
    private ShardManagerSnapshot currentSnapshot;

    // Names (not ShardIdentifiers) of shards that have an add/remove replica operation in flight.
    private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();

    // Persistence id of this actor: configured value or "shard-manager-" + type.
    private final String persistenceId;
157
158     /**
159      */
160     protected ShardManager(AbstractBuilder<?> builder) {
161
162         this.cluster = builder.cluster;
163         this.configuration = builder.configuration;
164         this.datastoreContextFactory = builder.datastoreContextFactory;
165         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
166         this.shardDispatcherPath =
167                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
168         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
169         this.primaryShardInfoCache = builder.primaryShardInfoCache;
170         this.restoreFromSnapshot = builder.restoreFromSnapshot;
171
172         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
173         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
174
175         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
176
177         // Subscribe this actor to cluster member events
178         cluster.subscribeToMemberEvents(getSelf());
179
180         List<String> localShardActorNames = new ArrayList<>();
181         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
182                 "shard-manager-" + this.type,
183                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
184                 localShardActorNames);
185         mBean.setShardManager(this);
186     }
187
188     @Override
189     public void postStop() {
190         LOG.info("Stopping ShardManager {}", persistenceId());
191
192         mBean.unregisterMBean();
193     }
194
    /**
     * Dispatches each command message received by this persistent actor to its handler.
     *
     * <p>Messages are matched by runtime type in the order below (the first matching branch wins);
     * anything not matched is passed to {@code unknownMessage}.
     *
     * @param message the incoming message
     * @throws Exception if a handler or {@code unknownMessage} throws
     */
    @Override
    public void handleCommand(Object message) throws Exception {
        if (message  instanceof FindPrimary) {
            findPrimary((FindPrimary)message);
        } else if(message instanceof FindLocalShard){
            findLocalShard((FindLocalShard) message);
        } else if (message instanceof UpdateSchemaContext) {
            updateSchemaContext(message);
        } else if(message instanceof ActorInitialized) {
            onActorInitialized(message);
        } else if (message instanceof ClusterEvent.MemberUp){
            // Cluster membership / reachability events from the subscription made in the constructor.
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberExited){
            memberExited((ClusterEvent.MemberExited) message);
        } else if(message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if(message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember)message);
        } else if(message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if(message instanceof DatastoreContextFactory) {
            onDatastoreContextFactory((DatastoreContextFactory)message);
        } else if(message instanceof RoleChangeNotification) {
            onRoleChangeNotification((RoleChangeNotification) message);
        } else if(message instanceof FollowerInitialSyncUpStatus){
            onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
        } else if(message instanceof ShardNotInitializedTimeout) {
            onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
        } else if(message instanceof ShardLeaderStateChanged) {
            onLeaderStateChanged((ShardLeaderStateChanged) message);
        } else if(message instanceof SwitchShardBehavior){
            onSwitchShardBehavior((SwitchShardBehavior) message);
        } else if(message instanceof CreateShard) {
            onCreateShard((CreateShard)message);
        } else if(message instanceof AddShardReplica){
            onAddShardReplica((AddShardReplica)message);
        } else if(message instanceof ForwardedAddServerReply) {
            // The sender of this forwarded message is handed on as the party to reply to.
            ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
                    msg.removeShardOnFailure);
        } else if(message instanceof ForwardedAddServerFailure) {
            ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
            onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
        } else if(message instanceof PrimaryShardFoundForContext) {
            PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
            onPrimaryShardFoundContext(primaryShardFoundContext);
        } else if(message instanceof RemoveShardReplica) {
            onRemoveShardReplica((RemoveShardReplica) message);
        } else if(message instanceof WrappedShardResponse){
            // Outcome of an asynchronous request, re-delivered to self so it is handled on the actor thread.
            onWrappedShardResponse((WrappedShardResponse) message);
        } else if(message instanceof GetSnapshot) {
            onGetSnapshot();
        } else if(message instanceof ServerRemoved){
            onShardReplicaRemoved((ServerRemoved) message);
        } else if(message instanceof SaveSnapshotSuccess) {
            onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
        } else if(message instanceof SaveSnapshotFailure) {
            // A failed snapshot save is only logged here; no retry is attempted.
            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
                    persistenceId(), ((SaveSnapshotFailure) message).cause());
        } else if(message instanceof Shutdown) {
            // Stops all local shards before stopping this actor (see onShutDown).
            onShutDown();
        } else {
            unknownMessage(message);
        }
    }
260
261     private void onShutDown() {
262         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
263         for (ShardInformation info : localShards.values()) {
264             if (info.getActor() != null) {
265                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
266
267                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
268                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
269             }
270         }
271
272         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
273
274         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
275         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
276
277         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
278             @Override
279             public void onComplete(Throwable failure, Iterable<Boolean> results) {
280                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
281
282                 self().tell(PoisonPill.getInstance(), self());
283
284                 if(failure != null) {
285                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
286                 } else {
287                     int nfailed = 0;
288                     for(Boolean r: results) {
289                         if(!r) {
290                             nfailed++;
291                         }
292                     }
293
294                     if(nfailed > 0) {
295                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
296                     }
297                 }
298             }
299         }, dispatcher);
300     }
301
302     private void onWrappedShardResponse(WrappedShardResponse message) {
303         if (message.getResponse() instanceof RemoveServerReply) {
304             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
305                     message.getLeaderPath());
306         }
307     }
308
309     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
310             String leaderPath) {
311         shardReplicaOperationsInProgress.remove(shardId);
312
313         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
314
315         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
316             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
317                     shardId.getShardName());
318             originalSender.tell(new akka.actor.Status.Success(null), getSelf());
319         } else {
320             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
321                     persistenceId(), shardId, replyMsg.getStatus());
322
323             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
324                     leaderPath, shardId);
325             originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
326         }
327     }
328
329     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
330         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
331             addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
332                     getSender());
333         } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
334             removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
335                     primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
336         }
337     }
338
339     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
340             final ActorRef sender) {
341         if(isShardReplicaOperationInProgress(shardName, sender)) {
342             return;
343         }
344
345         shardReplicaOperationsInProgress.add(shardName);
346
347         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
348
349         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
350
351         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
352         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
353                 primaryPath, shardId);
354
355         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
356                 duration());
357         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
358                 new RemoveServer(shardId.toString()), removeServerTimeout);
359
360         futureObj.onComplete(new OnComplete<Object>() {
361             @Override
362             public void onComplete(Throwable failure, Object response) {
363                 if (failure != null) {
364                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
365                             primaryPath, shardName);
366
367                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
368
369                     // FAILURE
370                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
371                 } else {
372                     // SUCCESS
373                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
374                 }
375             }
376         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
377     }
378
379     private void onShardReplicaRemoved(ServerRemoved message) {
380         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
381         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
382         if(shardInformation == null) {
383             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
384             return;
385         } else if(shardInformation.getActor() != null) {
386             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
387             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
388         }
389         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
390         persistShardList();
391     }
392
393     private void onGetSnapshot() {
394         LOG.debug("{}: onGetSnapshot", persistenceId());
395
396         List<String> notInitialized = null;
397         for(ShardInformation shardInfo: localShards.values()) {
398             if(!shardInfo.isShardInitialized()) {
399                 if(notInitialized == null) {
400                     notInitialized = new ArrayList<>();
401                 }
402
403                 notInitialized.add(shardInfo.getShardName());
404             }
405         }
406
407         if(notInitialized != null) {
408             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
409                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
410             return;
411         }
412
413         byte[] shardManagerSnapshot = null;
414         if(currentSnapshot != null) {
415             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
416         }
417
418         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
419                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
420                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
421
422         for(ShardInformation shardInfo: localShards.values()) {
423             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
424         }
425     }
426
427     private void onCreateShard(CreateShard createShard) {
428         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
429
430         Object reply;
431         try {
432             String shardName = createShard.getModuleShardConfig().getShardName();
433             if(localShards.containsKey(shardName)) {
434                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
435                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
436             } else {
437                 doCreateShard(createShard);
438                 reply = new akka.actor.Status.Success(null);
439             }
440         } catch (Exception e) {
441             LOG.error("{}: onCreateShard failed", persistenceId(), e);
442             reply = new akka.actor.Status.Failure(e);
443         }
444
445         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
446             getSender().tell(reply, getSelf());
447         }
448     }
449
450     private void doCreateShard(CreateShard createShard) {
451         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
452         String shardName = moduleShardConfig.getShardName();
453
454         configuration.addModuleShardConfiguration(moduleShardConfig);
455
456         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
457         if(shardDatastoreContext == null) {
458             shardDatastoreContext = newShardDatastoreContext(shardName);
459         } else {
460             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
461                     peerAddressResolver).build();
462         }
463
464         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
465
466         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
467                 currentSnapshot.getShardList().contains(shardName);
468
469         Map<String, String> peerAddresses;
470         boolean isActiveMember;
471         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
472                 contains(cluster.getCurrentMemberName())) {
473             peerAddresses = getPeerAddresses(shardName);
474             isActiveMember = true;
475         } else {
476             // The local member is not in the static shard member configuration and the shard did not
477             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
478             // the shard with no peers and with elections disabled so it stays as follower. A
479             // subsequent AddServer request will be needed to make it an active member.
480             isActiveMember = false;
481             peerAddresses = Collections.emptyMap();
482             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
483                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
484         }
485
486         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
487                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
488                 isActiveMember);
489
490         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
491                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
492         info.setActiveMember(isActiveMember);
493         localShards.put(info.getShardName(), info);
494
495         mBean.addLocalShard(shardId.toString());
496
497         if(schemaContext != null) {
498             info.setActor(newShardActor(schemaContext, info));
499         }
500     }
501
502     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
503         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
504                 shardPeerAddressResolver(peerAddressResolver);
505     }
506
507     private DatastoreContext newShardDatastoreContext(String shardName) {
508         return newShardDatastoreContextBuilder(shardName).build();
509     }
510
511     private void checkReady(){
512         if (isReadyWithLeaderId()) {
513             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
514                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
515
516             waitTillReadyCountdownLatch.countDown();
517         }
518     }
519
520     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
521         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
522
523         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
524         if(shardInformation != null) {
525             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
526             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
527             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
528                 primaryShardInfoCache.remove(shardInformation.getShardName());
529             }
530
531             checkReady();
532         } else {
533             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
534         }
535     }
536
537     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
538         ShardInformation shardInfo = message.getShardInfo();
539
540         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
541                 shardInfo.getShardName());
542
543         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
544
545         if(!shardInfo.isShardInitialized()) {
546             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
547             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
548         } else {
549             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
550             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
551         }
552     }
553
554     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
555         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
556                 status.getName(), status.isInitialSyncDone());
557
558         ShardInformation shardInformation = findShardInformation(status.getName());
559
560         if(shardInformation != null) {
561             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
562
563             mBean.setSyncStatus(isInSync());
564         }
565
566     }
567
568     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
569         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
570                 roleChanged.getOldRole(), roleChanged.getNewRole());
571
572         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
573         if(shardInformation != null) {
574             shardInformation.setRole(roleChanged.getNewRole());
575             checkReady();
576             mBean.setSyncStatus(isInSync());
577         }
578     }
579
580
581     private ShardInformation findShardInformation(String memberId) {
582         for(ShardInformation info : localShards.values()){
583             if(info.getShardId().toString().equals(memberId)){
584                 return info;
585             }
586         }
587
588         return null;
589     }
590
591     private boolean isReadyWithLeaderId() {
592         boolean isReady = true;
593         for (ShardInformation info : localShards.values()) {
594             if(!info.isShardReadyWithLeaderId()){
595                 isReady = false;
596                 break;
597             }
598         }
599         return isReady;
600     }
601
602     private boolean isInSync(){
603         for (ShardInformation info : localShards.values()) {
604             if(!info.isInSync()){
605                 return false;
606             }
607         }
608         return true;
609     }
610
611     private void onActorInitialized(Object message) {
612         final ActorRef sender = getSender();
613
614         if (sender == null) {
615             return; //why is a non-actor sending this message? Just ignore.
616         }
617
618         String actorName = sender.path().name();
619         //find shard name from actor name; actor name is stringified shardId
620         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
621
622         if (shardId.getShardName() == null) {
623             return;
624         }
625
626         markShardAsInitialized(shardId.getShardName());
627     }
628
629     private void markShardAsInitialized(String shardName) {
630         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
631
632         ShardInformation shardInformation = localShards.get(shardName);
633         if (shardInformation != null) {
634             shardInformation.setActorInitialized();
635
636             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
637         }
638     }
639
640     @Override
641     protected void handleRecover(Object message) throws Exception {
642         if (message instanceof RecoveryCompleted) {
643             onRecoveryCompleted();
644         } else if (message instanceof SnapshotOffer) {
645             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
646         }
647     }
648
649     private void onRecoveryCompleted() {
650         LOG.info("Recovery complete : {}", persistenceId());
651
652         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
653         // journal on upgrade from Helium.
654         deleteMessages(lastSequenceNr());
655
656         if(currentSnapshot == null && restoreFromSnapshot != null &&
657                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
658             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
659                     restoreFromSnapshot.getShardManagerSnapshot()))) {
660                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
661
662                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
663
664                 applyShardManagerSnapshot(snapshot);
665             } catch(Exception e) {
666                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
667             }
668         }
669
670         createLocalShards();
671     }
672
673     private void findLocalShard(FindLocalShard message) {
674         final ShardInformation shardInformation = localShards.get(message.getShardName());
675
676         if(shardInformation == null){
677             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
678             return;
679         }
680
681         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
682             @Override
683             public Object get() {
684                 return new LocalShardFound(shardInformation.getActor());
685             }
686         });
687     }
688
    /**
     * Replies to the current sender with the message produced by {@code messageSupplier}, unless the shard is not
     * yet usable, in which case the reply is either deferred or replaced by an error.
     *
     * <p>The shard is considered not usable when it is not initialized, or when {@code wantShardReady} is set and
     * the shard is not yet ready with a leader id. In that case:
     * <ul>
     * <li>if {@code doWait} is true, the reply is registered on the shard as an {@code OnShardInitialized}
     * (or {@code OnShardReady}) callback and a {@code ShardNotInitializedTimeout} message is scheduled to this
     * actor;</li>
     * <li>otherwise a NotInitializedException (shard not initialized) or a NoShardLeaderException (initialized
     * but no leader yet) is sent back immediately.</li>
     * </ul>
     *
     * @param shardInformation the local shard the request refers to
     * @param doWait whether to wait for the shard rather than failing immediately
     * @param wantShardReady whether the shard must also be ready with a leader id, not merely initialized
     * @param messageSupplier builds the success reply; invoked only when the reply is actually sent
     */
    private void sendResponse(ShardInformation shardInformation, boolean doWait,
            boolean wantShardReady, final Supplier<Object> messageSupplier) {
        if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
            if(doWait) {
                // Capture the requester and self now: the Runnable may run while handling a later message,
                // when getSender() would no longer refer to this requester.
                final ActorRef sender = getSender();
                final ActorRef self = self();

                Runnable replyRunnable = new Runnable() {
                    @Override
                    public void run() {
                        sender.tell(messageSupplier.get(), self);
                    }
                };

                OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
                    new OnShardInitialized(replyRunnable);

                shardInformation.addOnShardInitialized(onShardInitialized);

                FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
                if(shardInformation.isShardInitialized()) {
                    // If the shard is already initialized then we'll wait enough time for the shard to
                    // elect a leader, ie 2 times the election timeout.
                    timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
                }

                LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
                        shardInformation.getShardName());

                // Deliver a ShardNotInitializedTimeout to this actor when the wait expires; the Cancellable is
                // stored on the callback so it can be cancelled if the shard becomes usable first.
                Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                        timeout, getSelf(),
                        new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                        getContext().dispatcher(), getSelf());

                onShardInitialized.setTimeoutSchedule(timeoutSchedule);

            } else if (!shardInformation.isShardInitialized()) {
                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
                        shardInformation.getShardName());
                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
            } else {
                // Initialized, but wantShardReady was requested and there is no leader yet.
                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
                        shardInformation.getShardName());
                getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
            }

            return;
        }

        getSender().tell(messageSupplier.get(), getSelf());
    }
741
742     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
743         return new NoShardLeaderException(null, shardId.toString());
744     }
745
746     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
747         return new NotInitializedException(String.format(
748                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
749     }
750
751     private void memberRemoved(ClusterEvent.MemberRemoved message) {
752         String memberName = message.member().roles().iterator().next();
753
754         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
755                 message.member().address());
756
757         peerAddressResolver.removePeerAddress(memberName);
758
759         for(ShardInformation info : localShards.values()){
760             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
761         }
762     }
763
764     private void memberExited(ClusterEvent.MemberExited message) {
765         String memberName = message.member().roles().iterator().next();
766
767         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
768                 message.member().address());
769
770         peerAddressResolver.removePeerAddress(memberName);
771
772         for(ShardInformation info : localShards.values()){
773             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
774         }
775     }
776
777     private void memberUp(ClusterEvent.MemberUp message) {
778         String memberName = message.member().roles().iterator().next();
779
780         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
781                 message.member().address());
782
783         addPeerAddress(memberName, message.member().address());
784
785         checkReady();
786     }
787
788     private void addPeerAddress(String memberName, Address address) {
789         peerAddressResolver.addPeerAddress(memberName, address);
790
791         for(ShardInformation info : localShards.values()){
792             String shardName = info.getShardName();
793             String peerId = getShardIdentifier(memberName, shardName).toString();
794             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
795
796             info.peerUp(memberName, peerId, getSelf());
797         }
798     }
799
800     private void memberReachable(ClusterEvent.ReachableMember message) {
801         String memberName = message.member().roles().iterator().next();
802         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
803
804         addPeerAddress(memberName, message.member().address());
805
806         markMemberAvailable(memberName);
807     }
808
809     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
810         String memberName = message.member().roles().iterator().next();
811         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
812
813         markMemberUnavailable(memberName);
814     }
815
816     private void markMemberUnavailable(final String memberName) {
817         for(ShardInformation info : localShards.values()){
818             String leaderId = info.getLeaderId();
819             if(leaderId != null && leaderId.contains(memberName)) {
820                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
821                 info.setLeaderAvailable(false);
822
823                 primaryShardInfoCache.remove(info.getShardName());
824             }
825
826             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
827         }
828     }
829
830     private void markMemberAvailable(final String memberName) {
831         for(ShardInformation info : localShards.values()){
832             String leaderId = info.getLeaderId();
833             if(leaderId != null && leaderId.contains(memberName)) {
834                 LOG.debug("Marking Leader {} as available.", leaderId);
835                 info.setLeaderAvailable(true);
836             }
837
838             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
839         }
840     }
841
842     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
843         datastoreContextFactory = factory;
844         for (ShardInformation info : localShards.values()) {
845             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
846         }
847     }
848
849     private void onSwitchShardBehavior(SwitchShardBehavior message) {
850         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
851
852         ShardInformation shardInformation = localShards.get(identifier.getShardName());
853
854         if(shardInformation != null && shardInformation.getActor() != null) {
855             shardInformation.getActor().tell(
856                     new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
857         } else {
858             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
859                     message.getShardName(), message.getNewState());
860         }
861     }
862
863     /**
864      * Notifies all the local shards of a change in the schema context
865      *
866      * @param message
867      */
868     private void updateSchemaContext(final Object message) {
869         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
870
871         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
872
873         for (ShardInformation info : localShards.values()) {
874             if (info.getActor() == null) {
875                 LOG.debug("Creating Shard {}", info.getShardId());
876                 info.setActor(newShardActor(schemaContext, info));
877             } else {
878                 info.getActor().tell(message, getSelf());
879             }
880         }
881     }
882
883     @VisibleForTesting
884     protected ClusterWrapper getCluster() {
885         return cluster;
886     }
887
888     @VisibleForTesting
889     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
890         return getContext().actorOf(info.newProps(schemaContext)
891                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
892     }
893
    /**
     * Locates the primary (leader) of a shard and replies to the sender.
     *
     * <p>If an active local replica exists, the reply is sent via {@code sendResponse}, which waits for the shard
     * to be ready with a leader when requested. The reply is a LocalPrimaryShardFound (including the local data
     * tree) when this node is the leader and the request did not come from a remote ShardManager. Otherwise it is
     * a RemotePrimaryShardFound.
     *
     * <p>If there is no active local replica, the request is forwarded, as a RemoteFindPrimary, to the first peer
     * ShardManager whose address has not been visited yet. This node's own address is added to the visited set
     * first, so requests do not loop. If every peer has already been visited, a PrimaryNotFoundException is
     * returned.
     *
     * @param message the FindPrimary (or forwarded RemoteFindPrimary) request
     */
    private void findPrimary(FindPrimary message) {
        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);

        final String shardName = message.getShardName();
        // Local shard state (the data tree) is only handed out to requests that did not come from a remote node.
        final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);

        // First see if there is a local replica for the shard
        final ShardInformation info = localShards.get(shardName);
        if (info != null && info.isActiveMember()) {
            sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                @Override
                public Object get() {
                    String primaryPath = info.getSerializedLeaderActor();
                    // NOTE(review): assumes the local data tree Optional is present whenever this shard is leader.
                    Object found = canReturnLocalShardState && info.isLeader() ?
                            new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
                                new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());

                            if(LOG.isDebugEnabled()) {
                                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
                            }

                            return found;
                }
            });

            return;
        }

        // NOTE(review): for a RemoteFindPrimary this mutates the collection carried by the incoming message;
        // confirm that collection is mutable and not shared elsewhere.
        Collection<String> visitedAddresses;
        if(message instanceof RemoteFindPrimary) {
            visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
        } else {
            visitedAddresses = new ArrayList<>();
        }

        // Record this ShardManager as visited so the request is never forwarded back here.
        visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());

        // Forward to the first peer ShardManager not yet visited; forward() preserves the original sender.
        for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
            if(visitedAddresses.contains(address)) {
                continue;
            }

            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
                    persistenceId(), shardName, address, visitedAddresses);

            getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
                    message.isWaitUntilReady(), visitedAddresses), getContext());
            return;
        }

        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);

        getSender().tell(new PrimaryNotFoundException(
                String.format("No primary shard found for %s.", shardName)), getSelf());
    }
949
950     /**
951      * Construct the name of the shard actor given the name of the member on
952      * which the shard resides and the name of the shard
953      *
954      * @param memberName
955      * @param shardName
956      * @return
957      */
958     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
959         return peerAddressResolver.getShardIdentifier(memberName, shardName);
960     }
961
962     /**
963      * Create shards that are local to the member on which the ShardManager
964      * runs
965      *
966      */
967     private void createLocalShards() {
968         String memberName = this.cluster.getCurrentMemberName();
969         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
970
971         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
972         if(restoreFromSnapshot != null)
973         {
974             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
975                 shardSnapshots.put(snapshot.getName(), snapshot);
976             }
977         }
978
979         restoreFromSnapshot = null; // null out to GC
980
981         for(String shardName : memberShardNames){
982             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
983
984             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
985
986             Map<String, String> peerAddresses = getPeerAddresses(shardName);
987             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
988                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
989                         shardSnapshots.get(shardName)), peerAddressResolver));
990             mBean.addLocalShard(shardId.toString());
991         }
992     }
993
994     /**
995      * Given the name of the shard find the addresses of all it's peers
996      *
997      * @param shardName
998      */
999     private Map<String, String> getPeerAddresses(String shardName) {
1000         Collection<String> members = configuration.getMembersFromShardName(shardName);
1001         Map<String, String> peerAddresses = new HashMap<>();
1002
1003         String currentMemberName = this.cluster.getCurrentMemberName();
1004
1005         for(String memberName : members) {
1006             if(!currentMemberName.equals(memberName)) {
1007                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1008                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1009                 peerAddresses.put(shardId.toString(), address);
1010             }
1011         }
1012         return peerAddresses;
1013     }
1014
1015     @Override
1016     public SupervisorStrategy supervisorStrategy() {
1017
1018         return new OneForOneStrategy(10, Duration.create("1 minute"),
1019                 new Function<Throwable, SupervisorStrategy.Directive>() {
1020             @Override
1021             public SupervisorStrategy.Directive apply(Throwable t) {
1022                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1023                 return SupervisorStrategy.resume();
1024             }
1025         }
1026                 );
1027
1028     }
1029
1030     @Override
1031     public String persistenceId() {
1032         return persistenceId;
1033     }
1034
1035     @VisibleForTesting
1036     ShardManagerInfoMBean getMBean(){
1037         return mBean;
1038     }
1039
1040     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1041         if (shardReplicaOperationsInProgress.contains(shardName)) {
1042             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1043             LOG.debug ("{}: {}", persistenceId(), msg);
1044             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1045             return true;
1046         }
1047
1048         return false;
1049     }
1050
1051     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1052         final String shardName = shardReplicaMsg.getShardName();
1053
1054         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1055
1056         // verify the shard with the specified name is present in the cluster configuration
1057         if (!(this.configuration.isShardConfigured(shardName))) {
1058             String msg = String.format("No module configuration exists for shard %s", shardName);
1059             LOG.debug ("{}: {}", persistenceId(), msg);
1060             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1061             return;
1062         }
1063
1064         // Create the localShard
1065         if (schemaContext == null) {
1066             String msg = String.format(
1067                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1068             LOG.debug ("{}: {}", persistenceId(), msg);
1069             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1070             return;
1071         }
1072
1073         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1074             @Override
1075             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1076                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1077             }
1078
1079             @Override
1080             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1081                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1082             }
1083
1084         });
1085     }
1086
1087     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1088         String msg = String.format("Local shard %s already exists", shardName);
1089         LOG.debug ("{}: {}", persistenceId(), msg);
1090         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
1091     }
1092
    /**
     * Starts adding a local replica of {@code shardName} by asking the shard leader (at
     * {@code response.getPrimaryPath()}) to accept it via an AddServer message.
     *
     * <p>If no local shard exists yet, one is created as a non-active member, using a DatastoreContext with the
     * DisableElectionsRaftPolicy. It is registered in {@code localShards` before the leader answers, and it is
     * flagged for removal if the request fails. The outcome is sent back to this actor as a
     * ForwardedAddServerFailure or ForwardedAddServerReply, with {@code sender} as the message sender.
     *
     * @param shardName the shard to add a local replica for
     * @param response the lookup result identifying the remote shard leader
     * @param sender the original requester, eventually notified of the outcome
     */
    private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
        if(isShardReplicaOperationInProgress(shardName, sender)) {
            return;
        }

        // Guard against concurrent replica operations on this shard; cleared again in onAddServerReply /
        // onAddServerFailure (presumably reached via the Forwarded* messages sent below).
        shardReplicaOperationsInProgress.add(shardName);

        final ShardInformation shardInfo;
        final boolean removeShardOnFailure;
        ShardInformation existingShardInfo = localShards.get(shardName);
        if(existingShardInfo == null) {
            // No local shard yet: create one for this attempt, and discard it again if the attempt fails.
            removeShardOnFailure = true;
            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);

            // NOTE(review): DisableElectionsRaftPolicy presumably keeps the new replica from starting elections
            // until it is accepted; onAddServerReply restores the normal context on success.
            DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
                    DisableElectionsRaftPolicy.class.getName()).build();

            shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
                    Shard.builder(), peerAddressResolver);
            shardInfo.setActiveMember(false);
            localShards.put(shardName, shardInfo);
            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
        } else {
            removeShardOnFailure = false;
            shardInfo = existingShardInfo;
        }

        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());

        //inform ShardLeader to add this shard as a replica by sending an AddServer message
        LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
                response.getPrimaryPath(), shardInfo.getShardId());

        // The ask times out after the shard's leader election timeout.
        Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
                duration());
        // NOTE(review): the trailing `true` is presumably AddServer's voting-member flag; confirm against AddServer.
        Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
            new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);

        // The callback runs on the client dispatcher, not inside this actor's message handling, so it only
        // sends a message back to self; the result is processed when that message is received.
        futureObj.onComplete(new OnComplete<Object>() {
            @Override
            public void onComplete(Throwable failure, Object addServerResponse) {
                if (failure != null) {
                    LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
                            response.getPrimaryPath(), shardName, failure);

                    String msg = String.format("AddServer request to leader %s for shard %s failed",
                            response.getPrimaryPath(), shardName);
                    self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                } else {
                    self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
                            response.getPrimaryPath(), removeShardOnFailure), sender);
                }
            }
        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
    }
1148
1149     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1150             boolean removeShardOnFailure) {
1151         shardReplicaOperationsInProgress.remove(shardName);
1152
1153         if(removeShardOnFailure) {
1154             ShardInformation shardInfo = localShards.remove(shardName);
1155             if (shardInfo.getActor() != null) {
1156                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1157             }
1158         }
1159
1160         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1161             new RuntimeException(message, failure)), getSelf());
1162     }
1163
1164     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1165             String leaderPath, boolean removeShardOnFailure) {
1166         String shardName = shardInfo.getShardName();
1167         shardReplicaOperationsInProgress.remove(shardName);
1168
1169         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1170
1171         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1172             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1173
1174             // Make the local shard voting capable
1175             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1176             shardInfo.setActiveMember(true);
1177             persistShardList();
1178
1179             mBean.addLocalShard(shardInfo.getShardId().toString());
1180             sender.tell(new akka.actor.Status.Success(null), getSelf());
1181         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1182             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1183         } else {
1184             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1185                     persistenceId(), shardName, replyMsg.getStatus());
1186
1187             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1188
1189             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1190         }
1191     }
1192
1193     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1194                                                String leaderPath, ShardIdentifier shardId) {
1195         Exception failure;
1196         switch (serverChangeStatus) {
1197             case TIMEOUT:
1198                 failure = new TimeoutException(String.format(
1199                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1200                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1201                         leaderPath, shardId.getShardName()));
1202                 break;
1203             case NO_LEADER:
1204                 failure = createNoShardLeaderException(shardId);
1205                 break;
1206             case NOT_SUPPORTED:
1207                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1208                         serverChange.getSimpleName(), shardId.getShardName()));
1209                 break;
1210             default :
1211                 failure = new RuntimeException(String.format(
1212                         "%s request to leader %s for shard %s failed with status %s",
1213                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1214         }
1215         return failure;
1216     }
1217
1218     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1219         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1220
1221         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1222                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1223             @Override
1224             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1225                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1226             }
1227
1228             @Override
1229             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1230                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1231             }
1232         });
1233     }
1234
1235     private void persistShardList() {
1236         List<String> shardList = new ArrayList<>(localShards.keySet());
1237         for (ShardInformation shardInfo : localShards.values()) {
1238             if (!shardInfo.isActiveMember()) {
1239                 shardList.remove(shardInfo.getShardName());
1240             }
1241         }
1242         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1243         saveSnapshot(updateShardManagerSnapshot(shardList));
1244     }
1245
1246     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1247         currentSnapshot = new ShardManagerSnapshot(shardList);
1248         return currentSnapshot;
1249     }
1250
1251     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1252         currentSnapshot = snapshot;
1253
1254         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1255
1256         String currentMember = cluster.getCurrentMemberName();
1257         Set<String> configuredShardList =
1258             new HashSet<>(configuration.getMemberShardNames(currentMember));
1259         for (String shard : currentSnapshot.getShardList()) {
1260             if (!configuredShardList.contains(shard)) {
1261                 // add the current member as a replica for the shard
1262                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1263                 configuration.addMemberReplicaForShard(shard, currentMember);
1264             } else {
1265                 configuredShardList.remove(shard);
1266             }
1267         }
1268         for (String shard : configuredShardList) {
1269             // remove the member as a replica for the shard
1270             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1271             configuration.removeMemberReplicaForShard(shard, currentMember);
1272         }
1273     }
1274
1275     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1276         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1277             persistenceId());
1278         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1279             0, 0));
1280     }
1281
1282     private static class ForwardedAddServerReply {
1283         ShardInformation shardInfo;
1284         AddServerReply addServerReply;
1285         String leaderPath;
1286         boolean removeShardOnFailure;
1287
1288         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1289                 boolean removeShardOnFailure) {
1290             this.shardInfo = shardInfo;
1291             this.addServerReply = addServerReply;
1292             this.leaderPath = leaderPath;
1293             this.removeShardOnFailure = removeShardOnFailure;
1294         }
1295     }
1296
1297     private static class ForwardedAddServerFailure {
1298         String shardName;
1299         String failureMessage;
1300         Throwable failure;
1301         boolean removeShardOnFailure;
1302
1303         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1304                 boolean removeShardOnFailure) {
1305             this.shardName = shardName;
1306             this.failureMessage = failureMessage;
1307             this.failure = failure;
1308             this.removeShardOnFailure = removeShardOnFailure;
1309         }
1310     }
1311
1312     @VisibleForTesting
1313     protected static class ShardInformation {
1314         private final ShardIdentifier shardId;
1315         private final String shardName;
1316         private ActorRef actor;
1317         private final Map<String, String> initialPeerAddresses;
1318         private Optional<DataTree> localShardDataTree;
1319         private boolean leaderAvailable = false;
1320
1321         // flag that determines if the actor is ready for business
1322         private boolean actorInitialized = false;
1323
1324         private boolean followerSyncStatus = false;
1325
1326         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1327         private String role ;
1328         private String leaderId;
1329         private short leaderVersion;
1330
1331         private DatastoreContext datastoreContext;
1332         private Shard.AbstractBuilder<?, ?> builder;
1333         private final ShardPeerAddressResolver addressResolver;
1334         private boolean isActiveMember = true;
1335
1336         private ShardInformation(String shardName, ShardIdentifier shardId,
1337                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1338                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1339             this.shardName = shardName;
1340             this.shardId = shardId;
1341             this.initialPeerAddresses = initialPeerAddresses;
1342             this.datastoreContext = datastoreContext;
1343             this.builder = builder;
1344             this.addressResolver = addressResolver;
1345         }
1346
1347         Props newProps(SchemaContext schemaContext) {
1348             Preconditions.checkNotNull(builder);
1349             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1350                     schemaContext(schemaContext).props();
1351             builder = null;
1352             return props;
1353         }
1354
1355         String getShardName() {
1356             return shardName;
1357         }
1358
1359         @Nullable
1360         ActorRef getActor(){
1361             return actor;
1362         }
1363
1364         void setActor(ActorRef actor) {
1365             this.actor = actor;
1366         }
1367
1368         ShardIdentifier getShardId() {
1369             return shardId;
1370         }
1371
1372         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1373             this.localShardDataTree = localShardDataTree;
1374         }
1375
1376         Optional<DataTree> getLocalShardDataTree() {
1377             return localShardDataTree;
1378         }
1379
1380         DatastoreContext getDatastoreContext() {
1381             return datastoreContext;
1382         }
1383
1384         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1385             this.datastoreContext = datastoreContext;
1386             if (actor != null) {
1387                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1388                 actor.tell(this.datastoreContext, sender);
1389             }
1390         }
1391
1392         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1393             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1394
1395             if(actor != null) {
1396                 if(LOG.isDebugEnabled()) {
1397                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1398                             peerId, peerAddress, actor.path());
1399                 }
1400
1401                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1402             }
1403
1404             notifyOnShardInitializedCallbacks();
1405         }
1406
1407         void peerDown(String memberName, String peerId, ActorRef sender) {
1408             if(actor != null) {
1409                 actor.tell(new PeerDown(memberName, peerId), sender);
1410             }
1411         }
1412
1413         void peerUp(String memberName, String peerId, ActorRef sender) {
1414             if(actor != null) {
1415                 actor.tell(new PeerUp(memberName, peerId), sender);
1416             }
1417         }
1418
1419         boolean isShardReady() {
1420             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1421         }
1422
1423         boolean isShardReadyWithLeaderId() {
1424             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1425                     (isLeader() || addressResolver.resolve(leaderId) != null);
1426         }
1427
1428         boolean isShardInitialized() {
1429             return getActor() != null && actorInitialized;
1430         }
1431
1432         boolean isLeader() {
1433             return Objects.equal(leaderId, shardId.toString());
1434         }
1435
1436         String getSerializedLeaderActor() {
1437             if(isLeader()) {
1438                 return Serialization.serializedActorPath(getActor());
1439             } else {
1440                 return addressResolver.resolve(leaderId);
1441             }
1442         }
1443
1444         void setActorInitialized() {
1445             LOG.debug("Shard {} is initialized", shardId);
1446
1447             this.actorInitialized = true;
1448
1449             notifyOnShardInitializedCallbacks();
1450         }
1451
1452         private void notifyOnShardInitializedCallbacks() {
1453             if(onShardInitializedSet.isEmpty()) {
1454                 return;
1455             }
1456
1457             boolean ready = isShardReadyWithLeaderId();
1458
1459             if(LOG.isDebugEnabled()) {
1460                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1461                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1462             }
1463
1464             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1465             while(iter.hasNext()) {
1466                 OnShardInitialized onShardInitialized = iter.next();
1467                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1468                     iter.remove();
1469                     onShardInitialized.getTimeoutSchedule().cancel();
1470                     onShardInitialized.getReplyRunnable().run();
1471                 }
1472             }
1473         }
1474
1475         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1476             onShardInitializedSet.add(onShardInitialized);
1477         }
1478
1479         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1480             onShardInitializedSet.remove(onShardInitialized);
1481         }
1482
1483         void setRole(String newRole) {
1484             this.role = newRole;
1485
1486             notifyOnShardInitializedCallbacks();
1487         }
1488
1489         void setFollowerSyncStatus(boolean syncStatus){
1490             this.followerSyncStatus = syncStatus;
1491         }
1492
1493         boolean isInSync(){
1494             if(RaftState.Follower.name().equals(this.role)){
1495                 return followerSyncStatus;
1496             } else if(RaftState.Leader.name().equals(this.role)){
1497                 return true;
1498             }
1499
1500             return false;
1501         }
1502
1503         boolean setLeaderId(String leaderId) {
1504             boolean changed = !Objects.equal(this.leaderId, leaderId);
1505             this.leaderId = leaderId;
1506             if(leaderId != null) {
1507                 this.leaderAvailable = true;
1508             }
1509             notifyOnShardInitializedCallbacks();
1510
1511             return changed;
1512         }
1513
1514         String getLeaderId() {
1515             return leaderId;
1516         }
1517
1518         void setLeaderAvailable(boolean leaderAvailable) {
1519             this.leaderAvailable = leaderAvailable;
1520
1521             if(leaderAvailable) {
1522                 notifyOnShardInitializedCallbacks();
1523             }
1524         }
1525
1526         short getLeaderVersion() {
1527             return leaderVersion;
1528         }
1529
1530         void setLeaderVersion(short leaderVersion) {
1531             this.leaderVersion = leaderVersion;
1532         }
1533
1534         boolean isActiveMember() {
1535             return isActiveMember;
1536         }
1537
1538         void setActiveMember(boolean isActiveMember) {
1539             this.isActiveMember = isActiveMember;
1540         }
1541     }
1542
1543     private static class OnShardInitialized {
1544         private final Runnable replyRunnable;
1545         private Cancellable timeoutSchedule;
1546
1547         OnShardInitialized(Runnable replyRunnable) {
1548             this.replyRunnable = replyRunnable;
1549         }
1550
1551         Runnable getReplyRunnable() {
1552             return replyRunnable;
1553         }
1554
1555         Cancellable getTimeoutSchedule() {
1556             return timeoutSchedule;
1557         }
1558
1559         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1560             this.timeoutSchedule = timeoutSchedule;
1561         }
1562     }
1563
1564     private static class OnShardReady extends OnShardInitialized {
1565         OnShardReady(Runnable replyRunnable) {
1566             super(replyRunnable);
1567         }
1568     }
1569
1570     private static class ShardNotInitializedTimeout {
1571         private final ActorRef sender;
1572         private final ShardInformation shardInfo;
1573         private final OnShardInitialized onShardInitialized;
1574
1575         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1576             this.sender = sender;
1577             this.shardInfo = shardInfo;
1578             this.onShardInitialized = onShardInitialized;
1579         }
1580
1581         ActorRef getSender() {
1582             return sender;
1583         }
1584
1585         ShardInformation getShardInfo() {
1586             return shardInfo;
1587         }
1588
1589         OnShardInitialized getOnShardInitialized() {
1590             return onShardInitialized;
1591         }
1592     }
1593
1594     /**
1595      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1596      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1597      */
1598     @Deprecated
1599     static class SchemaContextModules implements Serializable {
1600         private static final long serialVersionUID = -8884620101025936590L;
1601
1602         private final Set<String> modules;
1603
1604         SchemaContextModules(Set<String> modules){
1605             this.modules = modules;
1606         }
1607
1608         public Set<String> getModules() {
1609             return modules;
1610         }
1611     }
1612
1613     public static Builder builder() {
1614         return new Builder();
1615     }
1616
1617     public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
1618         private ClusterWrapper cluster;
1619         private Configuration configuration;
1620         private DatastoreContextFactory datastoreContextFactory;
1621         private CountDownLatch waitTillReadyCountdownLatch;
1622         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1623         private DatastoreSnapshot restoreFromSnapshot;
1624         private volatile boolean sealed;
1625
1626         @SuppressWarnings("unchecked")
1627         private T self() {
1628             return (T) this;
1629         }
1630
1631         protected void checkSealed() {
1632             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1633         }
1634
1635         public T cluster(ClusterWrapper cluster) {
1636             checkSealed();
1637             this.cluster = cluster;
1638             return self();
1639         }
1640
1641         public T configuration(Configuration configuration) {
1642             checkSealed();
1643             this.configuration = configuration;
1644             return self();
1645         }
1646
1647         public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1648             checkSealed();
1649             this.datastoreContextFactory = datastoreContextFactory;
1650             return self();
1651         }
1652
1653         public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1654             checkSealed();
1655             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1656             return self();
1657         }
1658
1659         public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1660             checkSealed();
1661             this.primaryShardInfoCache = primaryShardInfoCache;
1662             return self();
1663         }
1664
1665         public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1666             checkSealed();
1667             this.restoreFromSnapshot = restoreFromSnapshot;
1668             return self();
1669         }
1670
1671         protected void verify() {
1672             sealed = true;
1673             Preconditions.checkNotNull(cluster, "cluster should not be null");
1674             Preconditions.checkNotNull(configuration, "configuration should not be null");
1675             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1676             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1677             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1678         }
1679
1680         public Props props() {
1681             verify();
1682             return Props.create(ShardManager.class, this);
1683         }
1684     }
1685
1686     public static class Builder extends AbstractBuilder<Builder> {
1687     }
1688
1689     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1690         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1691                 getShardInitializationTimeout().duration().$times(2));
1692
1693
1694         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1695         futureObj.onComplete(new OnComplete<Object>() {
1696             @Override
1697             public void onComplete(Throwable failure, Object response) {
1698                 if (failure != null) {
1699                     handler.onFailure(failure);
1700                 } else {
1701                     if(response instanceof RemotePrimaryShardFound) {
1702                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1703                     } else if(response instanceof LocalPrimaryShardFound) {
1704                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1705                     } else {
1706                         handler.onUnknownResponse(response);
1707                     }
1708                 }
1709             }
1710         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1711     }
1712
1713     /**
1714      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1715      * a remote or local find primary message is processed
1716      */
1717     private static interface FindPrimaryResponseHandler {
1718         /**
1719          * Invoked when a Failure message is received as a response
1720          *
1721          * @param failure
1722          */
1723         void onFailure(Throwable failure);
1724
1725         /**
1726          * Invoked when a RemotePrimaryShardFound response is received
1727          *
1728          * @param response
1729          */
1730         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1731
1732         /**
1733          * Invoked when a LocalPrimaryShardFound response is received
1734          * @param response
1735          */
1736         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1737
1738         /**
1739          * Invoked when an unknown response is received. This is another type of failure.
1740          *
1741          * @param response
1742          */
1743         void onUnknownResponse(Object response);
1744     }
1745
1746     /**
1747      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1748      * replica and sends a wrapped Failure response to some targetActor
1749      */
1750     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1751         private final ActorRef targetActor;
1752         private final String shardName;
1753         private final String persistenceId;
1754         private final ActorRef shardManagerActor;
1755
1756         /**
1757          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1758          * @param shardName The name of the shard for which the primary replica had to be found
1759          * @param persistenceId The persistenceId for the ShardManager
1760          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1761          */
1762         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1763             this.targetActor = Preconditions.checkNotNull(targetActor);
1764             this.shardName = Preconditions.checkNotNull(shardName);
1765             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1766             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1767         }
1768
1769         public ActorRef getTargetActor() {
1770             return targetActor;
1771         }
1772
1773         public String getShardName() {
1774             return shardName;
1775         }
1776
1777         @Override
1778         public void onFailure(Throwable failure) {
1779             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1780             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1781                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1782         }
1783
1784         @Override
1785         public void onUnknownResponse(Object response) {
1786             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1787                     shardName, response);
1788             LOG.debug ("{}: {}", persistenceId, msg);
1789             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1790                     new RuntimeException(msg)), shardManagerActor);
1791         }
1792     }
1793
1794
1795     /**
1796      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1797      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1798      * as a successful response to find primary.
1799      */
1800     private static class PrimaryShardFoundForContext {
1801         private final String shardName;
1802         private final Object contextMessage;
1803         private final RemotePrimaryShardFound remotePrimaryShardFound;
1804         private final LocalPrimaryShardFound localPrimaryShardFound;
1805
1806         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
1807                 @Nonnull Object primaryFoundMessage) {
1808             this.shardName = Preconditions.checkNotNull(shardName);
1809             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1810             Preconditions.checkNotNull(primaryFoundMessage);
1811             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
1812                     (RemotePrimaryShardFound) primaryFoundMessage : null;
1813             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
1814                     (LocalPrimaryShardFound) primaryFoundMessage : null;
1815         }
1816
1817         @Nonnull
1818         String getPrimaryPath(){
1819             if(remotePrimaryShardFound != null) {
1820                 return remotePrimaryShardFound.getPrimaryPath();
1821             }
1822             return localPrimaryShardFound.getPrimaryPath();
1823         }
1824
1825         @Nonnull
1826         Object getContextMessage() {
1827             return contextMessage;
1828         }
1829
1830         @Nullable
1831         RemotePrimaryShardFound getRemotePrimaryShardFound() {
1832             return remotePrimaryShardFound;
1833         }
1834
1835         @Nonnull
1836         String getShardName() {
1837             return shardName;
1838         }
1839     }
1840
1841     /**
1842      * The WrappedShardResponse class wraps a response from a Shard.
1843      */
1844     private static class WrappedShardResponse {
1845         private final ShardIdentifier shardId;
1846         private final Object response;
1847         private final String leaderPath;
1848
1849         private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1850             this.shardId = shardId;
1851             this.response = response;
1852             this.leaderPath = leaderPath;
1853         }
1854
1855         ShardIdentifier getShardId() {
1856             return shardId;
1857         }
1858
1859         Object getResponse() {
1860             return response;
1861         }
1862
1863         String getLeaderPath() {
1864             return leaderPath;
1865         }
1866     }
1867 }
1868
1869
1870