Fix a typo in getDdatastoreContextFactory()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Status;
18 import akka.actor.SupervisorStrategy;
19 import akka.cluster.ClusterEvent;
20 import akka.dispatch.Futures;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Function;
23 import akka.pattern.Patterns;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotOffer;
28 import akka.persistence.SnapshotSelectionCriteria;
29 import akka.util.Timeout;
30 import com.google.common.annotations.VisibleForTesting;
31 import com.google.common.base.Preconditions;
32 import java.io.ByteArrayInputStream;
33 import java.io.ObjectInputStream;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.HashSet;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Set;
42 import java.util.concurrent.CountDownLatch;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.TimeoutException;
45 import java.util.function.Supplier;
46 import javax.annotation.Nonnull;
47 import javax.annotation.Nullable;
48 import org.apache.commons.lang3.SerializationUtils;
49 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
50 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
51 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
52 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
53 import org.opendaylight.controller.cluster.datastore.Shard;
54 import org.opendaylight.controller.cluster.datastore.config.Configuration;
55 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
56 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
58 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
59 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
60 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
61 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
62 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
66 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
67 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
68 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
69 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
70 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
71 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
72 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
73 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
74 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
75 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
76 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
77 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
78 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
79 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
80 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
81 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
82 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
83 import org.opendaylight.controller.cluster.raft.messages.AddServer;
84 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
85 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
86 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
87 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
88 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
89 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
90 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
93 import scala.concurrent.ExecutionContext;
94 import scala.concurrent.Future;
95 import scala.concurrent.duration.Duration;
96 import scala.concurrent.duration.FiniteDuration;
97
98 /**
99  * The ShardManager has the following jobs,
100  * <ul>
101  * <li> Create all the local shard replicas that belong on this cluster member
102  * <li> Find the address of the local shard
103  * <li> Find the primary replica for any given shard
104  * <li> Monitor the cluster members and store their addresses
105  * </ul>
106  */
107 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
108
109     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
110
111     // Stores a mapping between a shard name and it's corresponding information
112     // Shard names look like inventory, topology etc and are as specified in
113     // configuration
114     private final Map<String, ShardInformation> localShards = new HashMap<>();
115
116     // The type of a ShardManager reflects the type of the datastore itself
117     // A data store could be of type config/operational
118     private final String type;
119
120     private final ClusterWrapper cluster;
121
122     private final Configuration configuration;
123
124     private final String shardDispatcherPath;
125
126     private final ShardManagerInfo mBean;
127
128     private DatastoreContextFactory datastoreContextFactory;
129
130     private final CountDownLatch waitTillReadyCountdownLatch;
131
132     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
133
134     private final ShardPeerAddressResolver peerAddressResolver;
135
136     private SchemaContext schemaContext;
137
138     private DatastoreSnapshot restoreFromSnapshot;
139
140     private ShardManagerSnapshot currentSnapshot;
141
142     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
143
144     private final String persistenceId;
145
146     ShardManager(AbstractShardManagerCreator<?> builder) {
147         this.cluster = builder.getCluster();
148         this.configuration = builder.getConfiguration();
149         this.datastoreContextFactory = builder.getDatastoreContextFactory();
150         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
151         this.shardDispatcherPath =
152                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
153         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
154         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
155         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
156
157         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
158         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
159
160         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
161
162         // Subscribe this actor to cluster member events
163         cluster.subscribeToMemberEvents(getSelf());
164
165         mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
166                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
167         mBean.registerMBean();
168     }
169
170     @Override
171     public void postStop() {
172         LOG.info("Stopping ShardManager {}", persistenceId());
173
174         mBean.unregisterMBean();
175     }
176
177     @Override
178     public void handleCommand(Object message) throws Exception {
179         if (message  instanceof FindPrimary) {
180             findPrimary((FindPrimary)message);
181         } else if(message instanceof FindLocalShard){
182             findLocalShard((FindLocalShard) message);
183         } else if (message instanceof UpdateSchemaContext) {
184             updateSchemaContext(message);
185         } else if(message instanceof ActorInitialized) {
186             onActorInitialized(message);
187         } else if (message instanceof ClusterEvent.MemberUp){
188             memberUp((ClusterEvent.MemberUp) message);
189         } else if (message instanceof ClusterEvent.MemberExited){
190             memberExited((ClusterEvent.MemberExited) message);
191         } else if(message instanceof ClusterEvent.MemberRemoved) {
192             memberRemoved((ClusterEvent.MemberRemoved) message);
193         } else if(message instanceof ClusterEvent.UnreachableMember) {
194             memberUnreachable((ClusterEvent.UnreachableMember)message);
195         } else if(message instanceof ClusterEvent.ReachableMember) {
196             memberReachable((ClusterEvent.ReachableMember) message);
197         } else if(message instanceof DatastoreContextFactory) {
198             onDatastoreContextFactory((DatastoreContextFactory)message);
199         } else if(message instanceof RoleChangeNotification) {
200             onRoleChangeNotification((RoleChangeNotification) message);
201         } else if(message instanceof FollowerInitialSyncUpStatus){
202             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
203         } else if(message instanceof ShardNotInitializedTimeout) {
204             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
205         } else if(message instanceof ShardLeaderStateChanged) {
206             onLeaderStateChanged((ShardLeaderStateChanged) message);
207         } else if(message instanceof SwitchShardBehavior){
208             onSwitchShardBehavior((SwitchShardBehavior) message);
209         } else if(message instanceof CreateShard) {
210             onCreateShard((CreateShard)message);
211         } else if(message instanceof AddShardReplica){
212             onAddShardReplica((AddShardReplica)message);
213         } else if(message instanceof ForwardedAddServerReply) {
214             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
215             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
216                     msg.removeShardOnFailure);
217         } else if(message instanceof ForwardedAddServerFailure) {
218             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
219             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
220         } else if(message instanceof PrimaryShardFoundForContext) {
221             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
222             onPrimaryShardFoundContext(primaryShardFoundContext);
223         } else if(message instanceof RemoveShardReplica) {
224             onRemoveShardReplica((RemoveShardReplica) message);
225         } else if(message instanceof WrappedShardResponse){
226             onWrappedShardResponse((WrappedShardResponse) message);
227         } else if(message instanceof GetSnapshot) {
228             onGetSnapshot();
229         } else if(message instanceof ServerRemoved){
230             onShardReplicaRemoved((ServerRemoved) message);
231         } else if(message instanceof SaveSnapshotSuccess) {
232             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
233         } else if(message instanceof SaveSnapshotFailure) {
234             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
235                     persistenceId(), ((SaveSnapshotFailure) message).cause());
236         } else if(message instanceof Shutdown) {
237             onShutDown();
238         } else if (message instanceof GetLocalShardIds) {
239             onGetLocalShardIds();
240         } else {
241             unknownMessage(message);
242         }
243     }
244
245     private void onShutDown() {
246         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
247         for (ShardInformation info : localShards.values()) {
248             if (info.getActor() != null) {
249                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
250
251                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
252                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
253             }
254         }
255
256         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
257
258         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
259         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
260
261         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
262             @Override
263             public void onComplete(Throwable failure, Iterable<Boolean> results) {
264                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
265
266                 self().tell(PoisonPill.getInstance(), self());
267
268                 if(failure != null) {
269                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
270                 } else {
271                     int nfailed = 0;
272                     for(Boolean r: results) {
273                         if(!r) {
274                             nfailed++;
275                         }
276                     }
277
278                     if(nfailed > 0) {
279                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
280                     }
281                 }
282             }
283         }, dispatcher);
284     }
285
286     private void onWrappedShardResponse(WrappedShardResponse message) {
287         if (message.getResponse() instanceof RemoveServerReply) {
288             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
289                     message.getLeaderPath());
290         }
291     }
292
293     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
294             String leaderPath) {
295         shardReplicaOperationsInProgress.remove(shardId);
296
297         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
298
299         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
300             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
301                     shardId.getShardName());
302             originalSender.tell(new Status.Success(null), getSelf());
303         } else {
304             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
305                     persistenceId(), shardId, replyMsg.getStatus());
306
307             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
308             originalSender.tell(new Status.Failure(failure), getSelf());
309         }
310     }
311
312     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
313         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
314             addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
315                     getSender());
316         } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
317             removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
318                     primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
319         }
320     }
321
322     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
323             final ActorRef sender) {
324         if(isShardReplicaOperationInProgress(shardName, sender)) {
325             return;
326         }
327
328         shardReplicaOperationsInProgress.add(shardName);
329
330         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
331
332         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
333
334         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
335         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
336                 primaryPath, shardId);
337
338         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
339                 duration());
340         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
341                 new RemoveServer(shardId.toString()), removeServerTimeout);
342
343         futureObj.onComplete(new OnComplete<Object>() {
344             @Override
345             public void onComplete(Throwable failure, Object response) {
346                 if (failure != null) {
347                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
348                             primaryPath, shardName);
349
350                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
351
352                     // FAILURE
353                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
354                 } else {
355                     // SUCCESS
356                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
357                 }
358             }
359         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
360     }
361
362     private void onShardReplicaRemoved(ServerRemoved message) {
363         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
364         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
365         if(shardInformation == null) {
366             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
367             return;
368         } else if(shardInformation.getActor() != null) {
369             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
370             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
371         }
372         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
373         persistShardList();
374     }
375
376     private void onGetSnapshot() {
377         LOG.debug("{}: onGetSnapshot", persistenceId());
378
379         List<String> notInitialized = null;
380         for(ShardInformation shardInfo: localShards.values()) {
381             if(!shardInfo.isShardInitialized()) {
382                 if(notInitialized == null) {
383                     notInitialized = new ArrayList<>();
384                 }
385
386                 notInitialized.add(shardInfo.getShardName());
387             }
388         }
389
390         if(notInitialized != null) {
391             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
392                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
393             return;
394         }
395
396         byte[] shardManagerSnapshot = null;
397         if(currentSnapshot != null) {
398             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
399         }
400
401         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
402                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
403                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
404
405         for(ShardInformation shardInfo: localShards.values()) {
406             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
407         }
408     }
409
410     private void onCreateShard(CreateShard createShard) {
411         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
412
413         Object reply;
414         try {
415             String shardName = createShard.getModuleShardConfig().getShardName();
416             if(localShards.containsKey(shardName)) {
417                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
418                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
419             } else {
420                 doCreateShard(createShard);
421                 reply = new Status.Success(null);
422             }
423         } catch (Exception e) {
424             LOG.error("{}: onCreateShard failed", persistenceId(), e);
425             reply = new Status.Failure(e);
426         }
427
428         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
429             getSender().tell(reply, getSelf());
430         }
431     }
432
433     private void doCreateShard(CreateShard createShard) {
434         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
435         String shardName = moduleShardConfig.getShardName();
436
437         configuration.addModuleShardConfiguration(moduleShardConfig);
438
439         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
440         if(shardDatastoreContext == null) {
441             shardDatastoreContext = newShardDatastoreContext(shardName);
442         } else {
443             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
444                     peerAddressResolver).build();
445         }
446
447         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
448
449         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
450                 currentSnapshot.getShardList().contains(shardName);
451
452         Map<String, String> peerAddresses;
453         boolean isActiveMember;
454         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
455                 contains(cluster.getCurrentMemberName())) {
456             peerAddresses = getPeerAddresses(shardName);
457             isActiveMember = true;
458         } else {
459             // The local member is not in the static shard member configuration and the shard did not
460             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
461             // the shard with no peers and with elections disabled so it stays as follower. A
462             // subsequent AddServer request will be needed to make it an active member.
463             isActiveMember = false;
464             peerAddresses = Collections.emptyMap();
465             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
466                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
467         }
468
469         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
470                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
471                 isActiveMember);
472
473         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
474                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
475         info.setActiveMember(isActiveMember);
476         localShards.put(info.getShardName(), info);
477
478         if(schemaContext != null) {
479             info.setActor(newShardActor(schemaContext, info));
480         }
481     }
482
483     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
484         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
485                 shardPeerAddressResolver(peerAddressResolver);
486     }
487
488     private DatastoreContext newShardDatastoreContext(String shardName) {
489         return newShardDatastoreContextBuilder(shardName).build();
490     }
491
492     private void checkReady(){
493         if (isReadyWithLeaderId()) {
494             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
495                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
496
497             waitTillReadyCountdownLatch.countDown();
498         }
499     }
500
501     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
502         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
503
504         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
505         if(shardInformation != null) {
506             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
507             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
508             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
509                 primaryShardInfoCache.remove(shardInformation.getShardName());
510             }
511
512             checkReady();
513         } else {
514             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
515         }
516     }
517
518     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
519         ShardInformation shardInfo = message.getShardInfo();
520
521         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
522                 shardInfo.getShardName());
523
524         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
525
526         if(!shardInfo.isShardInitialized()) {
527             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
528             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
529         } else {
530             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
531             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
532         }
533     }
534
535     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
536         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
537                 status.getName(), status.isInitialSyncDone());
538
539         ShardInformation shardInformation = findShardInformation(status.getName());
540
541         if(shardInformation != null) {
542             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
543
544             mBean.setSyncStatus(isInSync());
545         }
546
547     }
548
549     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
550         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
551                 roleChanged.getOldRole(), roleChanged.getNewRole());
552
553         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
554         if(shardInformation != null) {
555             shardInformation.setRole(roleChanged.getNewRole());
556             checkReady();
557             mBean.setSyncStatus(isInSync());
558         }
559     }
560
561
562     private ShardInformation findShardInformation(String memberId) {
563         for(ShardInformation info : localShards.values()){
564             if(info.getShardId().toString().equals(memberId)){
565                 return info;
566             }
567         }
568
569         return null;
570     }
571
572     private boolean isReadyWithLeaderId() {
573         boolean isReady = true;
574         for (ShardInformation info : localShards.values()) {
575             if(!info.isShardReadyWithLeaderId()){
576                 isReady = false;
577                 break;
578             }
579         }
580         return isReady;
581     }
582
583     private boolean isInSync(){
584         for (ShardInformation info : localShards.values()) {
585             if(!info.isInSync()){
586                 return false;
587             }
588         }
589         return true;
590     }
591
592     private void onActorInitialized(Object message) {
593         final ActorRef sender = getSender();
594
595         if (sender == null) {
596             return; //why is a non-actor sending this message? Just ignore.
597         }
598
599         String actorName = sender.path().name();
600         //find shard name from actor name; actor name is stringified shardId
601         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
602
603         if (shardId.getShardName() == null) {
604             return;
605         }
606
607         markShardAsInitialized(shardId.getShardName());
608     }
609
610     private void markShardAsInitialized(String shardName) {
611         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
612
613         ShardInformation shardInformation = localShards.get(shardName);
614         if (shardInformation != null) {
615             shardInformation.setActorInitialized();
616
617             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
618         }
619     }
620
621     @Override
622     protected void handleRecover(Object message) throws Exception {
623         if (message instanceof RecoveryCompleted) {
624             onRecoveryCompleted();
625         } else if (message instanceof SnapshotOffer) {
626             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
627         }
628     }
629
630     private void onRecoveryCompleted() {
631         LOG.info("Recovery complete : {}", persistenceId());
632
633         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
634         // journal on upgrade from Helium.
635         deleteMessages(lastSequenceNr());
636
637         if(currentSnapshot == null && restoreFromSnapshot != null &&
638                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
639             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
640                     restoreFromSnapshot.getShardManagerSnapshot()))) {
641                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
642
643                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
644
645                 applyShardManagerSnapshot(snapshot);
646             } catch(Exception e) {
647                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
648             }
649         }
650
651         createLocalShards();
652     }
653
654     private void findLocalShard(FindLocalShard message) {
655         final ShardInformation shardInformation = localShards.get(message.getShardName());
656
657         if(shardInformation == null){
658             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
659             return;
660         }
661
662         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
663             @Override
664             public Object get() {
665                 return new LocalShardFound(shardInformation.getActor());
666             }
667         });
668     }
669
670     private void sendResponse(ShardInformation shardInformation, boolean doWait,
671             boolean wantShardReady, final Supplier<Object> messageSupplier) {
672         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
673             if(doWait) {
674                 final ActorRef sender = getSender();
675                 final ActorRef self = self();
676
677                 Runnable replyRunnable = new Runnable() {
678                     @Override
679                     public void run() {
680                         sender.tell(messageSupplier.get(), self);
681                     }
682                 };
683
684                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
685                     new OnShardInitialized(replyRunnable);
686
687                 shardInformation.addOnShardInitialized(onShardInitialized);
688
689                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
690                 if(shardInformation.isShardInitialized()) {
691                     // If the shard is already initialized then we'll wait enough time for the shard to
692                     // elect a leader, ie 2 times the election timeout.
693                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
694                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
695                 }
696
697                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
698                         shardInformation.getShardName());
699
700                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
701                         timeout, getSelf(),
702                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
703                         getContext().dispatcher(), getSelf());
704
705                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
706
707             } else if (!shardInformation.isShardInitialized()) {
708                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
709                         shardInformation.getShardName());
710                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
711             } else {
712                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
713                         shardInformation.getShardName());
714                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
715             }
716
717             return;
718         }
719
720         getSender().tell(messageSupplier.get(), getSelf());
721     }
722
723     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
724         return new NoShardLeaderException(null, shardId.toString());
725     }
726
727     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
728         return new NotInitializedException(String.format(
729                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
730     }
731
732     private void memberRemoved(ClusterEvent.MemberRemoved message) {
733         String memberName = message.member().roles().iterator().next();
734
735         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
736                 message.member().address());
737
738         peerAddressResolver.removePeerAddress(memberName);
739
740         for(ShardInformation info : localShards.values()){
741             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
742         }
743     }
744
745     private void memberExited(ClusterEvent.MemberExited message) {
746         String memberName = message.member().roles().iterator().next();
747
748         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
749                 message.member().address());
750
751         peerAddressResolver.removePeerAddress(memberName);
752
753         for(ShardInformation info : localShards.values()){
754             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
755         }
756     }
757
758     private void memberUp(ClusterEvent.MemberUp message) {
759         String memberName = message.member().roles().iterator().next();
760
761         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
762                 message.member().address());
763
764         addPeerAddress(memberName, message.member().address());
765
766         checkReady();
767     }
768
769     private void addPeerAddress(String memberName, Address address) {
770         peerAddressResolver.addPeerAddress(memberName, address);
771
772         for(ShardInformation info : localShards.values()){
773             String shardName = info.getShardName();
774             String peerId = getShardIdentifier(memberName, shardName).toString();
775             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
776
777             info.peerUp(memberName, peerId, getSelf());
778         }
779     }
780
781     private void memberReachable(ClusterEvent.ReachableMember message) {
782         String memberName = message.member().roles().iterator().next();
783         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
784
785         addPeerAddress(memberName, message.member().address());
786
787         markMemberAvailable(memberName);
788     }
789
790     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
791         String memberName = message.member().roles().iterator().next();
792         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
793
794         markMemberUnavailable(memberName);
795     }
796
797     private void markMemberUnavailable(final String memberName) {
798         for(ShardInformation info : localShards.values()){
799             String leaderId = info.getLeaderId();
800             if(leaderId != null && leaderId.contains(memberName)) {
801                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
802                 info.setLeaderAvailable(false);
803
804                 primaryShardInfoCache.remove(info.getShardName());
805             }
806
807             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
808         }
809     }
810
811     private void markMemberAvailable(final String memberName) {
812         for(ShardInformation info : localShards.values()){
813             String leaderId = info.getLeaderId();
814             if(leaderId != null && leaderId.contains(memberName)) {
815                 LOG.debug("Marking Leader {} as available.", leaderId);
816                 info.setLeaderAvailable(true);
817             }
818
819             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
820         }
821     }
822
823     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
824         datastoreContextFactory = factory;
825         for (ShardInformation info : localShards.values()) {
826             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
827         }
828     }
829
830     private void onGetLocalShardIds() {
831         final List<String> response = new ArrayList<>(localShards.size());
832
833         for (ShardInformation info : localShards.values()) {
834             response.add(info.getShardId().toString());
835         }
836
837         getSender().tell(new Status.Success(response), getSelf());
838     }
839
840     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
841         final ShardIdentifier identifier = message.getShardId();
842
843         if (identifier != null) {
844             final ShardInformation info = localShards.get(identifier.getShardName());
845             if (info == null) {
846                 getSender().tell(new Status.Failure(
847                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
848                 return;
849             }
850
851             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
852         } else {
853             for (ShardInformation info : localShards.values()) {
854                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
855             }
856         }
857
858         getSender().tell(new Status.Success(null), getSelf());
859     }
860
861     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
862         final ActorRef actor = info.getActor();
863         if (actor != null) {
864             actor.tell(switchBehavior, getSelf());
865           } else {
866             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
867                 info.getShardName(), switchBehavior.getNewState());
868         }
869     }
870
871     /**
872      * Notifies all the local shards of a change in the schema context
873      *
874      * @param message
875      */
876     private void updateSchemaContext(final Object message) {
877         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
878
879         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
880
881         for (ShardInformation info : localShards.values()) {
882             if (info.getActor() == null) {
883                 LOG.debug("Creating Shard {}", info.getShardId());
884                 info.setActor(newShardActor(schemaContext, info));
885             } else {
886                 info.getActor().tell(message, getSelf());
887             }
888         }
889     }
890
891     @VisibleForTesting
892     protected ClusterWrapper getCluster() {
893         return cluster;
894     }
895
896     @VisibleForTesting
897     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
898         return getContext().actorOf(info.newProps(schemaContext)
899                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
900     }
901
902     private void findPrimary(FindPrimary message) {
903         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
904
905         final String shardName = message.getShardName();
906         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
907
908         // First see if the there is a local replica for the shard
909         final ShardInformation info = localShards.get(shardName);
910         if (info != null && info.isActiveMember()) {
911             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
912                 @Override
913                 public Object get() {
914                     String primaryPath = info.getSerializedLeaderActor();
915                     Object found = canReturnLocalShardState && info.isLeader() ?
916                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
917                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
918
919                             if(LOG.isDebugEnabled()) {
920                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
921                             }
922
923                             return found;
924                 }
925             });
926
927             return;
928         }
929
930         final Collection<String> visitedAddresses;
931         if(message instanceof RemoteFindPrimary) {
932             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
933         } else {
934             visitedAddresses = new ArrayList<>(1);
935         }
936
937         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
938
939         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
940             if(visitedAddresses.contains(address)) {
941                 continue;
942             }
943
944             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
945                     persistenceId(), shardName, address, visitedAddresses);
946
947             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
948                     message.isWaitUntilReady(), visitedAddresses), getContext());
949             return;
950         }
951
952         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
953
954         getSender().tell(new PrimaryNotFoundException(
955                 String.format("No primary shard found for %s.", shardName)), getSelf());
956     }
957
958     /**
959      * Construct the name of the shard actor given the name of the member on
960      * which the shard resides and the name of the shard
961      *
962      * @param memberName
963      * @param shardName
964      * @return
965      */
966     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
967         return peerAddressResolver.getShardIdentifier(memberName, shardName);
968     }
969
970     /**
971      * Create shards that are local to the member on which the ShardManager
972      * runs
973      *
974      */
975     private void createLocalShards() {
976         String memberName = this.cluster.getCurrentMemberName();
977         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
978
979         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
980         if(restoreFromSnapshot != null)
981         {
982             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
983                 shardSnapshots.put(snapshot.getName(), snapshot);
984             }
985         }
986
987         restoreFromSnapshot = null; // null out to GC
988
989         for(String shardName : memberShardNames){
990             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
991
992             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
993
994             Map<String, String> peerAddresses = getPeerAddresses(shardName);
995             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
996                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
997                         shardSnapshots.get(shardName)), peerAddressResolver));
998         }
999     }
1000
1001     /**
1002      * Given the name of the shard find the addresses of all it's peers
1003      *
1004      * @param shardName
1005      */
1006     private Map<String, String> getPeerAddresses(String shardName) {
1007         Collection<String> members = configuration.getMembersFromShardName(shardName);
1008         Map<String, String> peerAddresses = new HashMap<>();
1009
1010         String currentMemberName = this.cluster.getCurrentMemberName();
1011
1012         for(String memberName : members) {
1013             if(!currentMemberName.equals(memberName)) {
1014                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1015                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1016                 peerAddresses.put(shardId.toString(), address);
1017             }
1018         }
1019         return peerAddresses;
1020     }
1021
1022     @Override
1023     public SupervisorStrategy supervisorStrategy() {
1024
1025         return new OneForOneStrategy(10, Duration.create("1 minute"),
1026                 new Function<Throwable, SupervisorStrategy.Directive>() {
1027             @Override
1028             public SupervisorStrategy.Directive apply(Throwable t) {
1029                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1030                 return SupervisorStrategy.resume();
1031             }
1032         }
1033                 );
1034
1035     }
1036
1037     @Override
1038     public String persistenceId() {
1039         return persistenceId;
1040     }
1041
1042     @VisibleForTesting
1043     ShardManagerInfoMBean getMBean(){
1044         return mBean;
1045     }
1046
1047     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1048         if (shardReplicaOperationsInProgress.contains(shardName)) {
1049             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1050             LOG.debug ("{}: {}", persistenceId(), msg);
1051             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1052             return true;
1053         }
1054
1055         return false;
1056     }
1057
1058     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1059         final String shardName = shardReplicaMsg.getShardName();
1060
1061         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1062
1063         // verify the shard with the specified name is present in the cluster configuration
1064         if (!(this.configuration.isShardConfigured(shardName))) {
1065             String msg = String.format("No module configuration exists for shard %s", shardName);
1066             LOG.debug ("{}: {}", persistenceId(), msg);
1067             getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
1068             return;
1069         }
1070
1071         // Create the localShard
1072         if (schemaContext == null) {
1073             String msg = String.format(
1074                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1075             LOG.debug ("{}: {}", persistenceId(), msg);
1076             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1077             return;
1078         }
1079
1080         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1081             @Override
1082             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1083                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1084             }
1085
1086             @Override
1087             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1088                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1089             }
1090
1091         });
1092     }
1093
1094     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1095         String msg = String.format("Local shard %s already exists", shardName);
1096         LOG.debug ("{}: {}", persistenceId(), msg);
1097         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1098     }
1099
1100     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1101         if(isShardReplicaOperationInProgress(shardName, sender)) {
1102             return;
1103         }
1104
1105         shardReplicaOperationsInProgress.add(shardName);
1106
1107         final ShardInformation shardInfo;
1108         final boolean removeShardOnFailure;
1109         ShardInformation existingShardInfo = localShards.get(shardName);
1110         if(existingShardInfo == null) {
1111             removeShardOnFailure = true;
1112             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1113
1114             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
1115                     DisableElectionsRaftPolicy.class.getName()).build();
1116
1117             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1118                     Shard.builder(), peerAddressResolver);
1119             shardInfo.setActiveMember(false);
1120             localShards.put(shardName, shardInfo);
1121             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1122         } else {
1123             removeShardOnFailure = false;
1124             shardInfo = existingShardInfo;
1125         }
1126
1127         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1128
1129         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1130         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1131                 response.getPrimaryPath(), shardInfo.getShardId());
1132
1133         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1134                 duration());
1135         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1136             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1137
1138         futureObj.onComplete(new OnComplete<Object>() {
1139             @Override
1140             public void onComplete(Throwable failure, Object addServerResponse) {
1141                 if (failure != null) {
1142                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1143                             response.getPrimaryPath(), shardName, failure);
1144
1145                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1146                             response.getPrimaryPath(), shardName);
1147                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1148                 } else {
1149                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1150                             response.getPrimaryPath(), removeShardOnFailure), sender);
1151                 }
1152             }
1153         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1154     }
1155
1156     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1157             boolean removeShardOnFailure) {
1158         shardReplicaOperationsInProgress.remove(shardName);
1159
1160         if(removeShardOnFailure) {
1161             ShardInformation shardInfo = localShards.remove(shardName);
1162             if (shardInfo.getActor() != null) {
1163                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1164             }
1165         }
1166
1167         sender.tell(new Status.Failure(message == null ? failure :
1168             new RuntimeException(message, failure)), getSelf());
1169     }
1170
1171     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1172             String leaderPath, boolean removeShardOnFailure) {
1173         String shardName = shardInfo.getShardName();
1174         shardReplicaOperationsInProgress.remove(shardName);
1175
1176         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1177
1178         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1179             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1180
1181             // Make the local shard voting capable
1182             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1183             shardInfo.setActiveMember(true);
1184             persistShardList();
1185
1186             sender.tell(new Status.Success(null), getSelf());
1187         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1188             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1189         } else {
1190             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1191                     persistenceId(), shardName, replyMsg.getStatus());
1192
1193             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1194
1195             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1196         }
1197     }
1198
1199     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1200                                                String leaderPath, ShardIdentifier shardId) {
1201         Exception failure;
1202         switch (serverChangeStatus) {
1203             case TIMEOUT:
1204                 failure = new TimeoutException(String.format(
1205                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1206                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1207                         leaderPath, shardId.getShardName()));
1208                 break;
1209             case NO_LEADER:
1210                 failure = createNoShardLeaderException(shardId);
1211                 break;
1212             case NOT_SUPPORTED:
1213                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1214                         serverChange.getSimpleName(), shardId.getShardName()));
1215                 break;
1216             default :
1217                 failure = new RuntimeException(String.format(
1218                         "%s request to leader %s for shard %s failed with status %s",
1219                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1220         }
1221         return failure;
1222     }
1223
1224     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1225         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1226
1227         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1228                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1229             @Override
1230             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1231                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1232             }
1233
1234             @Override
1235             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1236                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1237             }
1238         });
1239     }
1240
1241     private void persistShardList() {
1242         List<String> shardList = new ArrayList<>(localShards.keySet());
1243         for (ShardInformation shardInfo : localShards.values()) {
1244             if (!shardInfo.isActiveMember()) {
1245                 shardList.remove(shardInfo.getShardName());
1246             }
1247         }
1248         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1249         saveSnapshot(updateShardManagerSnapshot(shardList));
1250     }
1251
1252     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1253         currentSnapshot = new ShardManagerSnapshot(shardList);
1254         return currentSnapshot;
1255     }
1256
1257     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1258         currentSnapshot = snapshot;
1259
1260         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1261
1262         String currentMember = cluster.getCurrentMemberName();
1263         Set<String> configuredShardList =
1264             new HashSet<>(configuration.getMemberShardNames(currentMember));
1265         for (String shard : currentSnapshot.getShardList()) {
1266             if (!configuredShardList.contains(shard)) {
1267                 // add the current member as a replica for the shard
1268                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1269                 configuration.addMemberReplicaForShard(shard, currentMember);
1270             } else {
1271                 configuredShardList.remove(shard);
1272             }
1273         }
1274         for (String shard : configuredShardList) {
1275             // remove the member as a replica for the shard
1276             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1277             configuration.removeMemberReplicaForShard(shard, currentMember);
1278         }
1279     }
1280
1281     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1282         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1283             persistenceId());
1284         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1285             0, 0));
1286     }
1287
1288     private static final class ForwardedAddServerReply {
1289         ShardInformation shardInfo;
1290         AddServerReply addServerReply;
1291         String leaderPath;
1292         boolean removeShardOnFailure;
1293
1294         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1295                 boolean removeShardOnFailure) {
1296             this.shardInfo = shardInfo;
1297             this.addServerReply = addServerReply;
1298             this.leaderPath = leaderPath;
1299             this.removeShardOnFailure = removeShardOnFailure;
1300         }
1301     }
1302
1303     private static final class ForwardedAddServerFailure {
1304         String shardName;
1305         String failureMessage;
1306         Throwable failure;
1307         boolean removeShardOnFailure;
1308
1309         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1310                 boolean removeShardOnFailure) {
1311             this.shardName = shardName;
1312             this.failureMessage = failureMessage;
1313             this.failure = failure;
1314             this.removeShardOnFailure = removeShardOnFailure;
1315         }
1316     }
1317
1318     static class OnShardInitialized {
1319         private final Runnable replyRunnable;
1320         private Cancellable timeoutSchedule;
1321
1322         OnShardInitialized(Runnable replyRunnable) {
1323             this.replyRunnable = replyRunnable;
1324         }
1325
1326         Runnable getReplyRunnable() {
1327             return replyRunnable;
1328         }
1329
1330         Cancellable getTimeoutSchedule() {
1331             return timeoutSchedule;
1332         }
1333
1334         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1335             this.timeoutSchedule = timeoutSchedule;
1336         }
1337     }
1338
1339     static class OnShardReady extends OnShardInitialized {
1340         OnShardReady(Runnable replyRunnable) {
1341             super(replyRunnable);
1342         }
1343     }
1344
1345     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1346         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1347                 getShardInitializationTimeout().duration().$times(2));
1348
1349
1350         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1351         futureObj.onComplete(new OnComplete<Object>() {
1352             @Override
1353             public void onComplete(Throwable failure, Object response) {
1354                 if (failure != null) {
1355                     handler.onFailure(failure);
1356                 } else {
1357                     if(response instanceof RemotePrimaryShardFound) {
1358                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1359                     } else if(response instanceof LocalPrimaryShardFound) {
1360                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1361                     } else {
1362                         handler.onUnknownResponse(response);
1363                     }
1364                 }
1365             }
1366         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1367     }
1368
1369     /**
1370      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1371      * a remote or local find primary message is processed
1372      */
1373     private static interface FindPrimaryResponseHandler {
1374         /**
1375          * Invoked when a Failure message is received as a response
1376          *
1377          * @param failure
1378          */
1379         void onFailure(Throwable failure);
1380
1381         /**
1382          * Invoked when a RemotePrimaryShardFound response is received
1383          *
1384          * @param response
1385          */
1386         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1387
1388         /**
1389          * Invoked when a LocalPrimaryShardFound response is received
1390          * @param response
1391          */
1392         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1393
1394         /**
1395          * Invoked when an unknown response is received. This is another type of failure.
1396          *
1397          * @param response
1398          */
1399         void onUnknownResponse(Object response);
1400     }
1401
1402     /**
1403      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1404      * replica and sends a wrapped Failure response to some targetActor
1405      */
1406     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1407         private final ActorRef targetActor;
1408         private final String shardName;
1409         private final String persistenceId;
1410         private final ActorRef shardManagerActor;
1411
1412         /**
1413          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1414          * @param shardName The name of the shard for which the primary replica had to be found
1415          * @param persistenceId The persistenceId for the ShardManager
1416          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1417          */
1418         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1419             this.targetActor = Preconditions.checkNotNull(targetActor);
1420             this.shardName = Preconditions.checkNotNull(shardName);
1421             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1422             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1423         }
1424
1425         public ActorRef getTargetActor() {
1426             return targetActor;
1427         }
1428
1429         public String getShardName() {
1430             return shardName;
1431         }
1432
1433         @Override
1434         public void onFailure(Throwable failure) {
1435             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1436             targetActor.tell(new Status.Failure(new RuntimeException(
1437                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1438         }
1439
1440         @Override
1441         public void onUnknownResponse(Object response) {
1442             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1443                     shardName, response);
1444             LOG.debug ("{}: {}", persistenceId, msg);
1445             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1446                     new RuntimeException(msg)), shardManagerActor);
1447         }
1448     }
1449
1450     /**
1451      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1452      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1453      * as a successful response to find primary.
1454      */
1455     private static class PrimaryShardFoundForContext {
1456         private final String shardName;
1457         private final Object contextMessage;
1458         private final RemotePrimaryShardFound remotePrimaryShardFound;
1459         private final LocalPrimaryShardFound localPrimaryShardFound;
1460
1461         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
1462                 @Nonnull Object primaryFoundMessage) {
1463             this.shardName = Preconditions.checkNotNull(shardName);
1464             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1465             Preconditions.checkNotNull(primaryFoundMessage);
1466             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
1467                     (RemotePrimaryShardFound) primaryFoundMessage : null;
1468             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
1469                     (LocalPrimaryShardFound) primaryFoundMessage : null;
1470         }
1471
1472         @Nonnull
1473         String getPrimaryPath(){
1474             if(remotePrimaryShardFound != null) {
1475                 return remotePrimaryShardFound.getPrimaryPath();
1476             }
1477             return localPrimaryShardFound.getPrimaryPath();
1478         }
1479
1480         @Nonnull
1481         Object getContextMessage() {
1482             return contextMessage;
1483         }
1484
1485         @Nullable
1486         RemotePrimaryShardFound getRemotePrimaryShardFound() {
1487             return remotePrimaryShardFound;
1488         }
1489
1490         @Nonnull
1491         String getShardName() {
1492             return shardName;
1493         }
1494     }
1495
1496     /**
1497      * The WrappedShardResponse class wraps a response from a Shard.
1498      */
1499     private static final class WrappedShardResponse {
1500         private final ShardIdentifier shardId;
1501         private final Object response;
1502         private final String leaderPath;
1503
1504         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1505             this.shardId = shardId;
1506             this.response = response;
1507             this.leaderPath = leaderPath;
1508         }
1509
1510         ShardIdentifier getShardId() {
1511             return shardId;
1512         }
1513
1514         Object getResponse() {
1515             return response;
1516         }
1517
1518         String getLeaderPath() {
1519             return leaderPath;
1520         }
1521     }
1522
1523     private static final class ShardNotInitializedTimeout {
1524         private final ActorRef sender;
1525         private final ShardInformation shardInfo;
1526         private final OnShardInitialized onShardInitialized;
1527
1528         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1529             this.sender = sender;
1530             this.shardInfo = shardInfo;
1531             this.onShardInitialized = onShardInitialized;
1532         }
1533
1534         ActorRef getSender() {
1535             return sender;
1536         }
1537
1538         ShardInformation getShardInfo() {
1539             return shardInfo;
1540         }
1541
1542         OnShardInitialized getOnShardInitialized() {
1543             return onShardInitialized;
1544         }
1545     }
1546 }
1547
1548
1549