Move ShardManager into its own package
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.Futures;
22 import akka.dispatch.OnComplete;
23 import akka.japi.Function;
24 import akka.pattern.Patterns;
25 import akka.persistence.RecoveryCompleted;
26 import akka.persistence.SaveSnapshotFailure;
27 import akka.persistence.SaveSnapshotSuccess;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.serialization.Serialization;
31 import akka.util.Timeout;
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.base.Objects;
34 import com.google.common.base.Optional;
35 import com.google.common.base.Preconditions;
36 import com.google.common.base.Strings;
37 import com.google.common.base.Supplier;
38 import com.google.common.collect.Sets;
39 import java.io.ByteArrayInputStream;
40 import java.io.ObjectInputStream;
41 import java.util.ArrayList;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.HashMap;
45 import java.util.HashSet;
46 import java.util.Iterator;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.Set;
50 import java.util.concurrent.CountDownLatch;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import javax.annotation.Nonnull;
54 import javax.annotation.Nullable;
55 import org.apache.commons.lang3.SerializationUtils;
56 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
57 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
58 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
59 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
60 import org.opendaylight.controller.cluster.datastore.Shard;
61 import org.opendaylight.controller.cluster.datastore.ShardManagerSnapshot;
62 import org.opendaylight.controller.cluster.datastore.config.Configuration;
63 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
64 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
65 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
66 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
67 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
68 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
69 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
70 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
71 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
72 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
73 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
74 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
75 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
77 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
78 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
79 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
80 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
81 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
82 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
83 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
84 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
85 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
86 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
87 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
88 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
89 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
90 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
91 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
92 import org.opendaylight.controller.cluster.raft.RaftState;
93 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
94 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
95 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
96 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
97 import org.opendaylight.controller.cluster.raft.messages.AddServer;
98 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
99 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
100 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
101 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
102 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
103 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
104 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
105 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
106 import org.slf4j.Logger;
107 import org.slf4j.LoggerFactory;
108 import scala.concurrent.ExecutionContext;
109 import scala.concurrent.Future;
110 import scala.concurrent.duration.Duration;
111 import scala.concurrent.duration.FiniteDuration;
112
113 /**
114  * The ShardManager has the following jobs,
115  * <ul>
116  * <li> Create all the local shard replicas that belong on this cluster member
117  * <li> Find the address of the local shard
118  * <li> Find the primary replica for any given shard
119  * <li> Monitor the cluster members and store their addresses
120  * </ul>
121  */
122 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
123
124     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
125
126     // Stores a mapping between a shard name and its corresponding information
127     // Shard names look like inventory, topology etc and are as specified in
128     // configuration
129     private final Map<String, ShardInformation> localShards = new HashMap<>();
130
131     // The type of a ShardManager reflects the type of the datastore itself
132     // A data store could be of type config/operational
133     private final String type;
134
135     private final ClusterWrapper cluster;
136
137     private final Configuration configuration;
138
139     private final String shardDispatcherPath;
140
141     private final ShardManagerInfo mBean;
142
143     private DatastoreContextFactory datastoreContextFactory;
144
145     private final CountDownLatch waitTillReadyCountdownLatch;
146
147     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
148
149     private final ShardPeerAddressResolver peerAddressResolver;
150
151     private SchemaContext schemaContext;
152
153     private DatastoreSnapshot restoreFromSnapshot;
154
155     private ShardManagerSnapshot currentSnapshot;
156
157     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
158
159     private final String persistenceId;
160
161     /**
162      */
163     protected ShardManager(AbstractBuilder<?> builder) {
164
165         this.cluster = builder.cluster;
166         this.configuration = builder.configuration;
167         this.datastoreContextFactory = builder.datastoreContextFactory;
168         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
169         this.shardDispatcherPath =
170                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
171         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
172         this.primaryShardInfoCache = builder.primaryShardInfoCache;
173         this.restoreFromSnapshot = builder.restoreFromSnapshot;
174
175         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
176         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
177
178         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
179
180         // Subscribe this actor to cluster member events
181         cluster.subscribeToMemberEvents(getSelf());
182
183         List<String> localShardActorNames = new ArrayList<>();
184         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
185                 "shard-manager-" + this.type,
186                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
187                 localShardActorNames);
188         mBean.setShardManager(this);
189     }
190
191     @Override
192     public void postStop() {
193         LOG.info("Stopping ShardManager {}", persistenceId());
194
195         mBean.unregisterMBean();
196     }
197
198     @Override
199     public void handleCommand(Object message) throws Exception {
200         if (message  instanceof FindPrimary) {
201             findPrimary((FindPrimary)message);
202         } else if(message instanceof FindLocalShard){
203             findLocalShard((FindLocalShard) message);
204         } else if (message instanceof UpdateSchemaContext) {
205             updateSchemaContext(message);
206         } else if(message instanceof ActorInitialized) {
207             onActorInitialized(message);
208         } else if (message instanceof ClusterEvent.MemberUp){
209             memberUp((ClusterEvent.MemberUp) message);
210         } else if (message instanceof ClusterEvent.MemberExited){
211             memberExited((ClusterEvent.MemberExited) message);
212         } else if(message instanceof ClusterEvent.MemberRemoved) {
213             memberRemoved((ClusterEvent.MemberRemoved) message);
214         } else if(message instanceof ClusterEvent.UnreachableMember) {
215             memberUnreachable((ClusterEvent.UnreachableMember)message);
216         } else if(message instanceof ClusterEvent.ReachableMember) {
217             memberReachable((ClusterEvent.ReachableMember) message);
218         } else if(message instanceof DatastoreContextFactory) {
219             onDatastoreContextFactory((DatastoreContextFactory)message);
220         } else if(message instanceof RoleChangeNotification) {
221             onRoleChangeNotification((RoleChangeNotification) message);
222         } else if(message instanceof FollowerInitialSyncUpStatus){
223             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
224         } else if(message instanceof ShardNotInitializedTimeout) {
225             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
226         } else if(message instanceof ShardLeaderStateChanged) {
227             onLeaderStateChanged((ShardLeaderStateChanged) message);
228         } else if(message instanceof SwitchShardBehavior){
229             onSwitchShardBehavior((SwitchShardBehavior) message);
230         } else if(message instanceof CreateShard) {
231             onCreateShard((CreateShard)message);
232         } else if(message instanceof AddShardReplica){
233             onAddShardReplica((AddShardReplica)message);
234         } else if(message instanceof ForwardedAddServerReply) {
235             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
236             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
237                     msg.removeShardOnFailure);
238         } else if(message instanceof ForwardedAddServerFailure) {
239             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
240             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
241         } else if(message instanceof PrimaryShardFoundForContext) {
242             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
243             onPrimaryShardFoundContext(primaryShardFoundContext);
244         } else if(message instanceof RemoveShardReplica) {
245             onRemoveShardReplica((RemoveShardReplica) message);
246         } else if(message instanceof WrappedShardResponse){
247             onWrappedShardResponse((WrappedShardResponse) message);
248         } else if(message instanceof GetSnapshot) {
249             onGetSnapshot();
250         } else if(message instanceof ServerRemoved){
251             onShardReplicaRemoved((ServerRemoved) message);
252         } else if(message instanceof SaveSnapshotSuccess) {
253             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
254         } else if(message instanceof SaveSnapshotFailure) {
255             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
256                     persistenceId(), ((SaveSnapshotFailure) message).cause());
257         } else if(message instanceof Shutdown) {
258             onShutDown();
259         } else {
260             unknownMessage(message);
261         }
262     }
263
264     private void onShutDown() {
265         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
266         for (ShardInformation info : localShards.values()) {
267             if (info.getActor() != null) {
268                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
269
270                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
271                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
272             }
273         }
274
275         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
276
277         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
278         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
279
280         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
281             @Override
282             public void onComplete(Throwable failure, Iterable<Boolean> results) {
283                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
284
285                 self().tell(PoisonPill.getInstance(), self());
286
287                 if(failure != null) {
288                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
289                 } else {
290                     int nfailed = 0;
291                     for(Boolean r: results) {
292                         if(!r) {
293                             nfailed++;
294                         }
295                     }
296
297                     if(nfailed > 0) {
298                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
299                     }
300                 }
301             }
302         }, dispatcher);
303     }
304
305     private void onWrappedShardResponse(WrappedShardResponse message) {
306         if (message.getResponse() instanceof RemoveServerReply) {
307             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
308                     message.getLeaderPath());
309         }
310     }
311
312     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
313             String leaderPath) {
314         shardReplicaOperationsInProgress.remove(shardId);
315
316         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
317
318         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
319             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
320                     shardId.getShardName());
321             originalSender.tell(new akka.actor.Status.Success(null), getSelf());
322         } else {
323             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
324                     persistenceId(), shardId, replyMsg.getStatus());
325
326             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
327                     leaderPath, shardId);
328             originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
329         }
330     }
331
332     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
333         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
334             addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
335                     getSender());
336         } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
337             removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
338                     primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
339         }
340     }
341
342     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
343             final ActorRef sender) {
344         if(isShardReplicaOperationInProgress(shardName, sender)) {
345             return;
346         }
347
348         shardReplicaOperationsInProgress.add(shardName);
349
350         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
351
352         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
353
354         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
355         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
356                 primaryPath, shardId);
357
358         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
359                 duration());
360         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
361                 new RemoveServer(shardId.toString()), removeServerTimeout);
362
363         futureObj.onComplete(new OnComplete<Object>() {
364             @Override
365             public void onComplete(Throwable failure, Object response) {
366                 if (failure != null) {
367                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
368                             primaryPath, shardName);
369
370                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
371
372                     // FAILURE
373                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
374                 } else {
375                     // SUCCESS
376                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
377                 }
378             }
379         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
380     }
381
382     private void onShardReplicaRemoved(ServerRemoved message) {
383         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
384         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
385         if(shardInformation == null) {
386             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
387             return;
388         } else if(shardInformation.getActor() != null) {
389             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
390             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
391         }
392         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
393         persistShardList();
394     }
395
396     private void onGetSnapshot() {
397         LOG.debug("{}: onGetSnapshot", persistenceId());
398
399         List<String> notInitialized = null;
400         for(ShardInformation shardInfo: localShards.values()) {
401             if(!shardInfo.isShardInitialized()) {
402                 if(notInitialized == null) {
403                     notInitialized = new ArrayList<>();
404                 }
405
406                 notInitialized.add(shardInfo.getShardName());
407             }
408         }
409
410         if(notInitialized != null) {
411             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
412                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
413             return;
414         }
415
416         byte[] shardManagerSnapshot = null;
417         if(currentSnapshot != null) {
418             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
419         }
420
421         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
422                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
423                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
424
425         for(ShardInformation shardInfo: localShards.values()) {
426             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
427         }
428     }
429
430     private void onCreateShard(CreateShard createShard) {
431         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
432
433         Object reply;
434         try {
435             String shardName = createShard.getModuleShardConfig().getShardName();
436             if(localShards.containsKey(shardName)) {
437                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
438                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
439             } else {
440                 doCreateShard(createShard);
441                 reply = new akka.actor.Status.Success(null);
442             }
443         } catch (Exception e) {
444             LOG.error("{}: onCreateShard failed", persistenceId(), e);
445             reply = new akka.actor.Status.Failure(e);
446         }
447
448         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
449             getSender().tell(reply, getSelf());
450         }
451     }
452
453     private void doCreateShard(CreateShard createShard) {
454         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
455         String shardName = moduleShardConfig.getShardName();
456
457         configuration.addModuleShardConfiguration(moduleShardConfig);
458
459         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
460         if(shardDatastoreContext == null) {
461             shardDatastoreContext = newShardDatastoreContext(shardName);
462         } else {
463             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
464                     peerAddressResolver).build();
465         }
466
467         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
468
469         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
470                 currentSnapshot.getShardList().contains(shardName);
471
472         Map<String, String> peerAddresses;
473         boolean isActiveMember;
474         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
475                 contains(cluster.getCurrentMemberName())) {
476             peerAddresses = getPeerAddresses(shardName);
477             isActiveMember = true;
478         } else {
479             // The local member is not in the static shard member configuration and the shard did not
480             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
481             // the shard with no peers and with elections disabled so it stays as follower. A
482             // subsequent AddServer request will be needed to make it an active member.
483             isActiveMember = false;
484             peerAddresses = Collections.emptyMap();
485             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
486                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
487         }
488
489         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
490                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
491                 isActiveMember);
492
493         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
494                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
495         info.setActiveMember(isActiveMember);
496         localShards.put(info.getShardName(), info);
497
498         mBean.addLocalShard(shardId.toString());
499
500         if(schemaContext != null) {
501             info.setActor(newShardActor(schemaContext, info));
502         }
503     }
504
505     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
506         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
507                 shardPeerAddressResolver(peerAddressResolver);
508     }
509
510     private DatastoreContext newShardDatastoreContext(String shardName) {
511         return newShardDatastoreContextBuilder(shardName).build();
512     }
513
514     private void checkReady(){
515         if (isReadyWithLeaderId()) {
516             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
517                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
518
519             waitTillReadyCountdownLatch.countDown();
520         }
521     }
522
523     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
524         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
525
526         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
527         if(shardInformation != null) {
528             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
529             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
530             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
531                 primaryShardInfoCache.remove(shardInformation.getShardName());
532             }
533
534             checkReady();
535         } else {
536             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
537         }
538     }
539
540     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
541         ShardInformation shardInfo = message.getShardInfo();
542
543         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
544                 shardInfo.getShardName());
545
546         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
547
548         if(!shardInfo.isShardInitialized()) {
549             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
550             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
551         } else {
552             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
553             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
554         }
555     }
556
557     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
558         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
559                 status.getName(), status.isInitialSyncDone());
560
561         ShardInformation shardInformation = findShardInformation(status.getName());
562
563         if(shardInformation != null) {
564             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
565
566             mBean.setSyncStatus(isInSync());
567         }
568
569     }
570
571     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
572         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
573                 roleChanged.getOldRole(), roleChanged.getNewRole());
574
575         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
576         if(shardInformation != null) {
577             shardInformation.setRole(roleChanged.getNewRole());
578             checkReady();
579             mBean.setSyncStatus(isInSync());
580         }
581     }
582
583
584     private ShardInformation findShardInformation(String memberId) {
585         for(ShardInformation info : localShards.values()){
586             if(info.getShardId().toString().equals(memberId)){
587                 return info;
588             }
589         }
590
591         return null;
592     }
593
594     private boolean isReadyWithLeaderId() {
595         boolean isReady = true;
596         for (ShardInformation info : localShards.values()) {
597             if(!info.isShardReadyWithLeaderId()){
598                 isReady = false;
599                 break;
600             }
601         }
602         return isReady;
603     }
604
605     private boolean isInSync(){
606         for (ShardInformation info : localShards.values()) {
607             if(!info.isInSync()){
608                 return false;
609             }
610         }
611         return true;
612     }
613
614     private void onActorInitialized(Object message) {
615         final ActorRef sender = getSender();
616
617         if (sender == null) {
618             return; //why is a non-actor sending this message? Just ignore.
619         }
620
621         String actorName = sender.path().name();
622         //find shard name from actor name; actor name is stringified shardId
623         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
624
625         if (shardId.getShardName() == null) {
626             return;
627         }
628
629         markShardAsInitialized(shardId.getShardName());
630     }
631
632     private void markShardAsInitialized(String shardName) {
633         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
634
635         ShardInformation shardInformation = localShards.get(shardName);
636         if (shardInformation != null) {
637             shardInformation.setActorInitialized();
638
639             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
640         }
641     }
642
643     @Override
644     protected void handleRecover(Object message) throws Exception {
645         if (message instanceof RecoveryCompleted) {
646             onRecoveryCompleted();
647         } else if (message instanceof SnapshotOffer) {
648             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
649         }
650     }
651
652     private void onRecoveryCompleted() {
653         LOG.info("Recovery complete : {}", persistenceId());
654
655         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
656         // journal on upgrade from Helium.
657         deleteMessages(lastSequenceNr());
658
659         if(currentSnapshot == null && restoreFromSnapshot != null &&
660                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
661             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
662                     restoreFromSnapshot.getShardManagerSnapshot()))) {
663                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
664
665                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
666
667                 applyShardManagerSnapshot(snapshot);
668             } catch(Exception e) {
669                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
670             }
671         }
672
673         createLocalShards();
674     }
675
676     private void findLocalShard(FindLocalShard message) {
677         final ShardInformation shardInformation = localShards.get(message.getShardName());
678
679         if(shardInformation == null){
680             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
681             return;
682         }
683
684         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
685             @Override
686             public Object get() {
687                 return new LocalShardFound(shardInformation.getActor());
688             }
689         });
690     }
691
692     private void sendResponse(ShardInformation shardInformation, boolean doWait,
693             boolean wantShardReady, final Supplier<Object> messageSupplier) {
694         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
695             if(doWait) {
696                 final ActorRef sender = getSender();
697                 final ActorRef self = self();
698
699                 Runnable replyRunnable = new Runnable() {
700                     @Override
701                     public void run() {
702                         sender.tell(messageSupplier.get(), self);
703                     }
704                 };
705
706                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
707                     new OnShardInitialized(replyRunnable);
708
709                 shardInformation.addOnShardInitialized(onShardInitialized);
710
711                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
712                 if(shardInformation.isShardInitialized()) {
713                     // If the shard is already initialized then we'll wait enough time for the shard to
714                     // elect a leader, ie 2 times the election timeout.
715                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
716                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
717                 }
718
719                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
720                         shardInformation.getShardName());
721
722                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
723                         timeout, getSelf(),
724                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
725                         getContext().dispatcher(), getSelf());
726
727                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
728
729             } else if (!shardInformation.isShardInitialized()) {
730                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
731                         shardInformation.getShardName());
732                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
733             } else {
734                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
735                         shardInformation.getShardName());
736                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
737             }
738
739             return;
740         }
741
742         getSender().tell(messageSupplier.get(), getSelf());
743     }
744
745     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
746         return new NoShardLeaderException(null, shardId.toString());
747     }
748
749     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
750         return new NotInitializedException(String.format(
751                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
752     }
753
754     private void memberRemoved(ClusterEvent.MemberRemoved message) {
755         String memberName = message.member().roles().iterator().next();
756
757         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
758                 message.member().address());
759
760         peerAddressResolver.removePeerAddress(memberName);
761
762         for(ShardInformation info : localShards.values()){
763             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
764         }
765     }
766
767     private void memberExited(ClusterEvent.MemberExited message) {
768         String memberName = message.member().roles().iterator().next();
769
770         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
771                 message.member().address());
772
773         peerAddressResolver.removePeerAddress(memberName);
774
775         for(ShardInformation info : localShards.values()){
776             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
777         }
778     }
779
780     private void memberUp(ClusterEvent.MemberUp message) {
781         String memberName = message.member().roles().iterator().next();
782
783         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
784                 message.member().address());
785
786         addPeerAddress(memberName, message.member().address());
787
788         checkReady();
789     }
790
791     private void addPeerAddress(String memberName, Address address) {
792         peerAddressResolver.addPeerAddress(memberName, address);
793
794         for(ShardInformation info : localShards.values()){
795             String shardName = info.getShardName();
796             String peerId = getShardIdentifier(memberName, shardName).toString();
797             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
798
799             info.peerUp(memberName, peerId, getSelf());
800         }
801     }
802
803     private void memberReachable(ClusterEvent.ReachableMember message) {
804         String memberName = message.member().roles().iterator().next();
805         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
806
807         addPeerAddress(memberName, message.member().address());
808
809         markMemberAvailable(memberName);
810     }
811
812     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
813         String memberName = message.member().roles().iterator().next();
814         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
815
816         markMemberUnavailable(memberName);
817     }
818
819     private void markMemberUnavailable(final String memberName) {
820         for(ShardInformation info : localShards.values()){
821             String leaderId = info.getLeaderId();
822             if(leaderId != null && leaderId.contains(memberName)) {
823                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
824                 info.setLeaderAvailable(false);
825
826                 primaryShardInfoCache.remove(info.getShardName());
827             }
828
829             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
830         }
831     }
832
833     private void markMemberAvailable(final String memberName) {
834         for(ShardInformation info : localShards.values()){
835             String leaderId = info.getLeaderId();
836             if(leaderId != null && leaderId.contains(memberName)) {
837                 LOG.debug("Marking Leader {} as available.", leaderId);
838                 info.setLeaderAvailable(true);
839             }
840
841             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
842         }
843     }
844
845     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
846         datastoreContextFactory = factory;
847         for (ShardInformation info : localShards.values()) {
848             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
849         }
850     }
851
852     private void onSwitchShardBehavior(SwitchShardBehavior message) {
853         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
854
855         ShardInformation shardInformation = localShards.get(identifier.getShardName());
856
857         if(shardInformation != null && shardInformation.getActor() != null) {
858             shardInformation.getActor().tell(
859                     new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
860         } else {
861             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
862                     message.getShardName(), message.getNewState());
863         }
864     }
865
866     /**
867      * Notifies all the local shards of a change in the schema context
868      *
869      * @param message
870      */
871     private void updateSchemaContext(final Object message) {
872         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
873
874         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
875
876         for (ShardInformation info : localShards.values()) {
877             if (info.getActor() == null) {
878                 LOG.debug("Creating Shard {}", info.getShardId());
879                 info.setActor(newShardActor(schemaContext, info));
880             } else {
881                 info.getActor().tell(message, getSelf());
882             }
883         }
884     }
885
886     @VisibleForTesting
887     protected ClusterWrapper getCluster() {
888         return cluster;
889     }
890
891     @VisibleForTesting
892     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
893         return getContext().actorOf(info.newProps(schemaContext)
894                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
895     }
896
897     private void findPrimary(FindPrimary message) {
898         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
899
900         final String shardName = message.getShardName();
901         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
902
903         // First see if the there is a local replica for the shard
904         final ShardInformation info = localShards.get(shardName);
905         if (info != null && info.isActiveMember()) {
906             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
907                 @Override
908                 public Object get() {
909                     String primaryPath = info.getSerializedLeaderActor();
910                     Object found = canReturnLocalShardState && info.isLeader() ?
911                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
912                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
913
914                             if(LOG.isDebugEnabled()) {
915                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
916                             }
917
918                             return found;
919                 }
920             });
921
922             return;
923         }
924
925         Collection<String> visitedAddresses;
926         if(message instanceof RemoteFindPrimary) {
927             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
928         } else {
929             visitedAddresses = new ArrayList<>();
930         }
931
932         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
933
934         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
935             if(visitedAddresses.contains(address)) {
936                 continue;
937             }
938
939             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
940                     persistenceId(), shardName, address, visitedAddresses);
941
942             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
943                     message.isWaitUntilReady(), visitedAddresses), getContext());
944             return;
945         }
946
947         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
948
949         getSender().tell(new PrimaryNotFoundException(
950                 String.format("No primary shard found for %s.", shardName)), getSelf());
951     }
952
953     /**
954      * Construct the name of the shard actor given the name of the member on
955      * which the shard resides and the name of the shard
956      *
957      * @param memberName
958      * @param shardName
959      * @return
960      */
961     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
962         return peerAddressResolver.getShardIdentifier(memberName, shardName);
963     }
964
965     /**
966      * Create shards that are local to the member on which the ShardManager
967      * runs
968      *
969      */
970     private void createLocalShards() {
971         String memberName = this.cluster.getCurrentMemberName();
972         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
973
974         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
975         if(restoreFromSnapshot != null)
976         {
977             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
978                 shardSnapshots.put(snapshot.getName(), snapshot);
979             }
980         }
981
982         restoreFromSnapshot = null; // null out to GC
983
984         for(String shardName : memberShardNames){
985             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
986
987             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
988
989             Map<String, String> peerAddresses = getPeerAddresses(shardName);
990             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
991                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
992                         shardSnapshots.get(shardName)), peerAddressResolver));
993             mBean.addLocalShard(shardId.toString());
994         }
995     }
996
997     /**
998      * Given the name of the shard find the addresses of all it's peers
999      *
1000      * @param shardName
1001      */
1002     private Map<String, String> getPeerAddresses(String shardName) {
1003         Collection<String> members = configuration.getMembersFromShardName(shardName);
1004         Map<String, String> peerAddresses = new HashMap<>();
1005
1006         String currentMemberName = this.cluster.getCurrentMemberName();
1007
1008         for(String memberName : members) {
1009             if(!currentMemberName.equals(memberName)) {
1010                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1011                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1012                 peerAddresses.put(shardId.toString(), address);
1013             }
1014         }
1015         return peerAddresses;
1016     }
1017
1018     @Override
1019     public SupervisorStrategy supervisorStrategy() {
1020
1021         return new OneForOneStrategy(10, Duration.create("1 minute"),
1022                 new Function<Throwable, SupervisorStrategy.Directive>() {
1023             @Override
1024             public SupervisorStrategy.Directive apply(Throwable t) {
1025                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1026                 return SupervisorStrategy.resume();
1027             }
1028         }
1029                 );
1030
1031     }
1032
1033     @Override
1034     public String persistenceId() {
1035         return persistenceId;
1036     }
1037
1038     @VisibleForTesting
1039     ShardManagerInfoMBean getMBean(){
1040         return mBean;
1041     }
1042
1043     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1044         if (shardReplicaOperationsInProgress.contains(shardName)) {
1045             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1046             LOG.debug ("{}: {}", persistenceId(), msg);
1047             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1048             return true;
1049         }
1050
1051         return false;
1052     }
1053
1054     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1055         final String shardName = shardReplicaMsg.getShardName();
1056
1057         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1058
1059         // verify the shard with the specified name is present in the cluster configuration
1060         if (!(this.configuration.isShardConfigured(shardName))) {
1061             String msg = String.format("No module configuration exists for shard %s", shardName);
1062             LOG.debug ("{}: {}", persistenceId(), msg);
1063             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1064             return;
1065         }
1066
1067         // Create the localShard
1068         if (schemaContext == null) {
1069             String msg = String.format(
1070                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1071             LOG.debug ("{}: {}", persistenceId(), msg);
1072             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1073             return;
1074         }
1075
1076         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1077             @Override
1078             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1079                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1080             }
1081
1082             @Override
1083             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1084                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1085             }
1086
1087         });
1088     }
1089
1090     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1091         String msg = String.format("Local shard %s already exists", shardName);
1092         LOG.debug ("{}: {}", persistenceId(), msg);
1093         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
1094     }
1095
1096     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1097         if(isShardReplicaOperationInProgress(shardName, sender)) {
1098             return;
1099         }
1100
1101         shardReplicaOperationsInProgress.add(shardName);
1102
1103         final ShardInformation shardInfo;
1104         final boolean removeShardOnFailure;
1105         ShardInformation existingShardInfo = localShards.get(shardName);
1106         if(existingShardInfo == null) {
1107             removeShardOnFailure = true;
1108             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1109
1110             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
1111                     DisableElectionsRaftPolicy.class.getName()).build();
1112
1113             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1114                     Shard.builder(), peerAddressResolver);
1115             shardInfo.setActiveMember(false);
1116             localShards.put(shardName, shardInfo);
1117             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1118         } else {
1119             removeShardOnFailure = false;
1120             shardInfo = existingShardInfo;
1121         }
1122
1123         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1124
1125         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1126         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1127                 response.getPrimaryPath(), shardInfo.getShardId());
1128
1129         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1130                 duration());
1131         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1132             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1133
1134         futureObj.onComplete(new OnComplete<Object>() {
1135             @Override
1136             public void onComplete(Throwable failure, Object addServerResponse) {
1137                 if (failure != null) {
1138                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1139                             response.getPrimaryPath(), shardName, failure);
1140
1141                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1142                             response.getPrimaryPath(), shardName);
1143                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1144                 } else {
1145                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1146                             response.getPrimaryPath(), removeShardOnFailure), sender);
1147                 }
1148             }
1149         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1150     }
1151
1152     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1153             boolean removeShardOnFailure) {
1154         shardReplicaOperationsInProgress.remove(shardName);
1155
1156         if(removeShardOnFailure) {
1157             ShardInformation shardInfo = localShards.remove(shardName);
1158             if (shardInfo.getActor() != null) {
1159                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1160             }
1161         }
1162
1163         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1164             new RuntimeException(message, failure)), getSelf());
1165     }
1166
1167     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1168             String leaderPath, boolean removeShardOnFailure) {
1169         String shardName = shardInfo.getShardName();
1170         shardReplicaOperationsInProgress.remove(shardName);
1171
1172         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1173
1174         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1175             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1176
1177             // Make the local shard voting capable
1178             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1179             shardInfo.setActiveMember(true);
1180             persistShardList();
1181
1182             mBean.addLocalShard(shardInfo.getShardId().toString());
1183             sender.tell(new akka.actor.Status.Success(null), getSelf());
1184         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1185             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1186         } else {
1187             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1188                     persistenceId(), shardName, replyMsg.getStatus());
1189
1190             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1191
1192             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1193         }
1194     }
1195
1196     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1197                                                String leaderPath, ShardIdentifier shardId) {
1198         Exception failure;
1199         switch (serverChangeStatus) {
1200             case TIMEOUT:
1201                 failure = new TimeoutException(String.format(
1202                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1203                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1204                         leaderPath, shardId.getShardName()));
1205                 break;
1206             case NO_LEADER:
1207                 failure = createNoShardLeaderException(shardId);
1208                 break;
1209             case NOT_SUPPORTED:
1210                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1211                         serverChange.getSimpleName(), shardId.getShardName()));
1212                 break;
1213             default :
1214                 failure = new RuntimeException(String.format(
1215                         "%s request to leader %s for shard %s failed with status %s",
1216                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1217         }
1218         return failure;
1219     }
1220
1221     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1222         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1223
1224         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1225                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1226             @Override
1227             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1228                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1229             }
1230
1231             @Override
1232             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1233                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1234             }
1235         });
1236     }
1237
1238     private void persistShardList() {
1239         List<String> shardList = new ArrayList<>(localShards.keySet());
1240         for (ShardInformation shardInfo : localShards.values()) {
1241             if (!shardInfo.isActiveMember()) {
1242                 shardList.remove(shardInfo.getShardName());
1243             }
1244         }
1245         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1246         saveSnapshot(updateShardManagerSnapshot(shardList));
1247     }
1248
1249     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1250         currentSnapshot = new ShardManagerSnapshot(shardList);
1251         return currentSnapshot;
1252     }
1253
1254     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1255         currentSnapshot = snapshot;
1256
1257         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1258
1259         String currentMember = cluster.getCurrentMemberName();
1260         Set<String> configuredShardList =
1261             new HashSet<>(configuration.getMemberShardNames(currentMember));
1262         for (String shard : currentSnapshot.getShardList()) {
1263             if (!configuredShardList.contains(shard)) {
1264                 // add the current member as a replica for the shard
1265                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1266                 configuration.addMemberReplicaForShard(shard, currentMember);
1267             } else {
1268                 configuredShardList.remove(shard);
1269             }
1270         }
1271         for (String shard : configuredShardList) {
1272             // remove the member as a replica for the shard
1273             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1274             configuration.removeMemberReplicaForShard(shard, currentMember);
1275         }
1276     }
1277
1278     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1279         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1280             persistenceId());
1281         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1282             0, 0));
1283     }
1284
1285     private static class ForwardedAddServerReply {
1286         ShardInformation shardInfo;
1287         AddServerReply addServerReply;
1288         String leaderPath;
1289         boolean removeShardOnFailure;
1290
1291         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1292                 boolean removeShardOnFailure) {
1293             this.shardInfo = shardInfo;
1294             this.addServerReply = addServerReply;
1295             this.leaderPath = leaderPath;
1296             this.removeShardOnFailure = removeShardOnFailure;
1297         }
1298     }
1299
1300     private static class ForwardedAddServerFailure {
1301         String shardName;
1302         String failureMessage;
1303         Throwable failure;
1304         boolean removeShardOnFailure;
1305
1306         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1307                 boolean removeShardOnFailure) {
1308             this.shardName = shardName;
1309             this.failureMessage = failureMessage;
1310             this.failure = failure;
1311             this.removeShardOnFailure = removeShardOnFailure;
1312         }
1313     }
1314
1315     @VisibleForTesting
1316     protected static class ShardInformation {
1317         private final ShardIdentifier shardId;
1318         private final String shardName;
1319         private ActorRef actor;
1320         private final Map<String, String> initialPeerAddresses;
1321         private Optional<DataTree> localShardDataTree;
1322         private boolean leaderAvailable = false;
1323
1324         // flag that determines if the actor is ready for business
1325         private boolean actorInitialized = false;
1326
1327         private boolean followerSyncStatus = false;
1328
1329         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1330         private String role ;
1331         private String leaderId;
1332         private short leaderVersion;
1333
1334         private DatastoreContext datastoreContext;
1335         private Shard.AbstractBuilder<?, ?> builder;
1336         private final ShardPeerAddressResolver addressResolver;
1337         private boolean isActiveMember = true;
1338
1339         private ShardInformation(String shardName, ShardIdentifier shardId,
1340                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1341                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1342             this.shardName = shardName;
1343             this.shardId = shardId;
1344             this.initialPeerAddresses = initialPeerAddresses;
1345             this.datastoreContext = datastoreContext;
1346             this.builder = builder;
1347             this.addressResolver = addressResolver;
1348         }
1349
1350         Props newProps(SchemaContext schemaContext) {
1351             Preconditions.checkNotNull(builder);
1352             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1353                     schemaContext(schemaContext).props();
1354             builder = null;
1355             return props;
1356         }
1357
1358         String getShardName() {
1359             return shardName;
1360         }
1361
1362         @Nullable
1363         ActorRef getActor(){
1364             return actor;
1365         }
1366
1367         void setActor(ActorRef actor) {
1368             this.actor = actor;
1369         }
1370
1371         ShardIdentifier getShardId() {
1372             return shardId;
1373         }
1374
1375         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1376             this.localShardDataTree = localShardDataTree;
1377         }
1378
1379         Optional<DataTree> getLocalShardDataTree() {
1380             return localShardDataTree;
1381         }
1382
1383         DatastoreContext getDatastoreContext() {
1384             return datastoreContext;
1385         }
1386
1387         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1388             this.datastoreContext = datastoreContext;
1389             if (actor != null) {
1390                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1391                 actor.tell(this.datastoreContext, sender);
1392             }
1393         }
1394
1395         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1396             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1397
1398             if(actor != null) {
1399                 if(LOG.isDebugEnabled()) {
1400                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1401                             peerId, peerAddress, actor.path());
1402                 }
1403
1404                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1405             }
1406
1407             notifyOnShardInitializedCallbacks();
1408         }
1409
1410         void peerDown(String memberName, String peerId, ActorRef sender) {
1411             if(actor != null) {
1412                 actor.tell(new PeerDown(memberName, peerId), sender);
1413             }
1414         }
1415
1416         void peerUp(String memberName, String peerId, ActorRef sender) {
1417             if(actor != null) {
1418                 actor.tell(new PeerUp(memberName, peerId), sender);
1419             }
1420         }
1421
1422         boolean isShardReady() {
1423             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1424         }
1425
1426         boolean isShardReadyWithLeaderId() {
1427             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1428                     (isLeader() || addressResolver.resolve(leaderId) != null);
1429         }
1430
1431         boolean isShardInitialized() {
1432             return getActor() != null && actorInitialized;
1433         }
1434
1435         boolean isLeader() {
1436             return Objects.equal(leaderId, shardId.toString());
1437         }
1438
1439         String getSerializedLeaderActor() {
1440             if(isLeader()) {
1441                 return Serialization.serializedActorPath(getActor());
1442             } else {
1443                 return addressResolver.resolve(leaderId);
1444             }
1445         }
1446
1447         void setActorInitialized() {
1448             LOG.debug("Shard {} is initialized", shardId);
1449
1450             this.actorInitialized = true;
1451
1452             notifyOnShardInitializedCallbacks();
1453         }
1454
1455         private void notifyOnShardInitializedCallbacks() {
1456             if(onShardInitializedSet.isEmpty()) {
1457                 return;
1458             }
1459
1460             boolean ready = isShardReadyWithLeaderId();
1461
1462             if(LOG.isDebugEnabled()) {
1463                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1464                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1465             }
1466
1467             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1468             while(iter.hasNext()) {
1469                 OnShardInitialized onShardInitialized = iter.next();
1470                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1471                     iter.remove();
1472                     onShardInitialized.getTimeoutSchedule().cancel();
1473                     onShardInitialized.getReplyRunnable().run();
1474                 }
1475             }
1476         }
1477
1478         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1479             onShardInitializedSet.add(onShardInitialized);
1480         }
1481
1482         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1483             onShardInitializedSet.remove(onShardInitialized);
1484         }
1485
1486         void setRole(String newRole) {
1487             this.role = newRole;
1488
1489             notifyOnShardInitializedCallbacks();
1490         }
1491
1492         void setFollowerSyncStatus(boolean syncStatus){
1493             this.followerSyncStatus = syncStatus;
1494         }
1495
1496         boolean isInSync(){
1497             if(RaftState.Follower.name().equals(this.role)){
1498                 return followerSyncStatus;
1499             } else if(RaftState.Leader.name().equals(this.role)){
1500                 return true;
1501             }
1502
1503             return false;
1504         }
1505
1506         boolean setLeaderId(String leaderId) {
1507             boolean changed = !Objects.equal(this.leaderId, leaderId);
1508             this.leaderId = leaderId;
1509             if(leaderId != null) {
1510                 this.leaderAvailable = true;
1511             }
1512             notifyOnShardInitializedCallbacks();
1513
1514             return changed;
1515         }
1516
1517         String getLeaderId() {
1518             return leaderId;
1519         }
1520
1521         void setLeaderAvailable(boolean leaderAvailable) {
1522             this.leaderAvailable = leaderAvailable;
1523
1524             if(leaderAvailable) {
1525                 notifyOnShardInitializedCallbacks();
1526             }
1527         }
1528
1529         short getLeaderVersion() {
1530             return leaderVersion;
1531         }
1532
1533         void setLeaderVersion(short leaderVersion) {
1534             this.leaderVersion = leaderVersion;
1535         }
1536
1537         boolean isActiveMember() {
1538             return isActiveMember;
1539         }
1540
1541         void setActiveMember(boolean isActiveMember) {
1542             this.isActiveMember = isActiveMember;
1543         }
1544     }
1545
1546     private static class OnShardInitialized {
1547         private final Runnable replyRunnable;
1548         private Cancellable timeoutSchedule;
1549
1550         OnShardInitialized(Runnable replyRunnable) {
1551             this.replyRunnable = replyRunnable;
1552         }
1553
1554         Runnable getReplyRunnable() {
1555             return replyRunnable;
1556         }
1557
1558         Cancellable getTimeoutSchedule() {
1559             return timeoutSchedule;
1560         }
1561
1562         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1563             this.timeoutSchedule = timeoutSchedule;
1564         }
1565     }
1566
1567     private static class OnShardReady extends OnShardInitialized {
1568         OnShardReady(Runnable replyRunnable) {
1569             super(replyRunnable);
1570         }
1571     }
1572
1573     private static class ShardNotInitializedTimeout {
1574         private final ActorRef sender;
1575         private final ShardInformation shardInfo;
1576         private final OnShardInitialized onShardInitialized;
1577
1578         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1579             this.sender = sender;
1580             this.shardInfo = shardInfo;
1581             this.onShardInitialized = onShardInitialized;
1582         }
1583
1584         ActorRef getSender() {
1585             return sender;
1586         }
1587
1588         ShardInformation getShardInfo() {
1589             return shardInfo;
1590         }
1591
1592         OnShardInitialized getOnShardInitialized() {
1593             return onShardInitialized;
1594         }
1595     }
1596
1597     public static Builder builder() {
1598         return new Builder();
1599     }
1600
1601     public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
1602         private ClusterWrapper cluster;
1603         private Configuration configuration;
1604         private DatastoreContextFactory datastoreContextFactory;
1605         private CountDownLatch waitTillReadyCountdownLatch;
1606         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1607         private DatastoreSnapshot restoreFromSnapshot;
1608         private volatile boolean sealed;
1609
1610         @SuppressWarnings("unchecked")
1611         private T self() {
1612             return (T) this;
1613         }
1614
1615         protected void checkSealed() {
1616             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1617         }
1618
1619         public T cluster(ClusterWrapper cluster) {
1620             checkSealed();
1621             this.cluster = cluster;
1622             return self();
1623         }
1624
1625         public T configuration(Configuration configuration) {
1626             checkSealed();
1627             this.configuration = configuration;
1628             return self();
1629         }
1630
1631         public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1632             checkSealed();
1633             this.datastoreContextFactory = datastoreContextFactory;
1634             return self();
1635         }
1636
1637         public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1638             checkSealed();
1639             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1640             return self();
1641         }
1642
1643         public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1644             checkSealed();
1645             this.primaryShardInfoCache = primaryShardInfoCache;
1646             return self();
1647         }
1648
1649         public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1650             checkSealed();
1651             this.restoreFromSnapshot = restoreFromSnapshot;
1652             return self();
1653         }
1654
1655         protected void verify() {
1656             sealed = true;
1657             Preconditions.checkNotNull(cluster, "cluster should not be null");
1658             Preconditions.checkNotNull(configuration, "configuration should not be null");
1659             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1660             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1661             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1662         }
1663
1664         public Props props() {
1665             verify();
1666             return Props.create(ShardManager.class, this);
1667         }
1668     }
1669
1670     public static class Builder extends AbstractBuilder<Builder> {
1671     }
1672
1673     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1674         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1675                 getShardInitializationTimeout().duration().$times(2));
1676
1677
1678         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1679         futureObj.onComplete(new OnComplete<Object>() {
1680             @Override
1681             public void onComplete(Throwable failure, Object response) {
1682                 if (failure != null) {
1683                     handler.onFailure(failure);
1684                 } else {
1685                     if(response instanceof RemotePrimaryShardFound) {
1686                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1687                     } else if(response instanceof LocalPrimaryShardFound) {
1688                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1689                     } else {
1690                         handler.onUnknownResponse(response);
1691                     }
1692                 }
1693             }
1694         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1695     }
1696
1697     /**
1698      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1699      * a remote or local find primary message is processed
1700      */
1701     private static interface FindPrimaryResponseHandler {
1702         /**
1703          * Invoked when a Failure message is received as a response
1704          *
1705          * @param failure
1706          */
1707         void onFailure(Throwable failure);
1708
1709         /**
1710          * Invoked when a RemotePrimaryShardFound response is received
1711          *
1712          * @param response
1713          */
1714         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1715
1716         /**
1717          * Invoked when a LocalPrimaryShardFound response is received
1718          * @param response
1719          */
1720         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1721
1722         /**
1723          * Invoked when an unknown response is received. This is another type of failure.
1724          *
1725          * @param response
1726          */
1727         void onUnknownResponse(Object response);
1728     }
1729
1730     /**
1731      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1732      * replica and sends a wrapped Failure response to some targetActor
1733      */
1734     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1735         private final ActorRef targetActor;
1736         private final String shardName;
1737         private final String persistenceId;
1738         private final ActorRef shardManagerActor;
1739
1740         /**
1741          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1742          * @param shardName The name of the shard for which the primary replica had to be found
1743          * @param persistenceId The persistenceId for the ShardManager
1744          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1745          */
1746         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1747             this.targetActor = Preconditions.checkNotNull(targetActor);
1748             this.shardName = Preconditions.checkNotNull(shardName);
1749             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1750             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1751         }
1752
1753         public ActorRef getTargetActor() {
1754             return targetActor;
1755         }
1756
1757         public String getShardName() {
1758             return shardName;
1759         }
1760
1761         @Override
1762         public void onFailure(Throwable failure) {
1763             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1764             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1765                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1766         }
1767
1768         @Override
1769         public void onUnknownResponse(Object response) {
1770             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1771                     shardName, response);
1772             LOG.debug ("{}: {}", persistenceId, msg);
1773             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1774                     new RuntimeException(msg)), shardManagerActor);
1775         }
1776     }
1777
1778
1779     /**
1780      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1781      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1782      * as a successful response to find primary.
1783      */
1784     private static class PrimaryShardFoundForContext {
1785         private final String shardName;
1786         private final Object contextMessage;
1787         private final RemotePrimaryShardFound remotePrimaryShardFound;
1788         private final LocalPrimaryShardFound localPrimaryShardFound;
1789
1790         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
1791                 @Nonnull Object primaryFoundMessage) {
1792             this.shardName = Preconditions.checkNotNull(shardName);
1793             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1794             Preconditions.checkNotNull(primaryFoundMessage);
1795             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
1796                     (RemotePrimaryShardFound) primaryFoundMessage : null;
1797             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
1798                     (LocalPrimaryShardFound) primaryFoundMessage : null;
1799         }
1800
1801         @Nonnull
1802         String getPrimaryPath(){
1803             if(remotePrimaryShardFound != null) {
1804                 return remotePrimaryShardFound.getPrimaryPath();
1805             }
1806             return localPrimaryShardFound.getPrimaryPath();
1807         }
1808
1809         @Nonnull
1810         Object getContextMessage() {
1811             return contextMessage;
1812         }
1813
1814         @Nullable
1815         RemotePrimaryShardFound getRemotePrimaryShardFound() {
1816             return remotePrimaryShardFound;
1817         }
1818
1819         @Nonnull
1820         String getShardName() {
1821             return shardName;
1822         }
1823     }
1824
1825     /**
1826      * The WrappedShardResponse class wraps a response from a Shard.
1827      */
1828     private static class WrappedShardResponse {
1829         private final ShardIdentifier shardId;
1830         private final Object response;
1831         private final String leaderPath;
1832
1833         private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1834             this.shardId = shardId;
1835             this.response = response;
1836             this.leaderPath = leaderPath;
1837         }
1838
1839         ShardIdentifier getShardId() {
1840             return shardId;
1841         }
1842
1843         Object getResponse() {
1844             return response;
1845         }
1846
1847         String getLeaderPath() {
1848             return leaderPath;
1849         }
1850     }
1851 }
1852
1853
1854