BUG-5626: Eliminate ShardIdentifier.Builder
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Status;
18 import akka.actor.SupervisorStrategy;
19 import akka.actor.SupervisorStrategy.Directive;
20 import akka.cluster.ClusterEvent;
21 import akka.cluster.Member;
22 import akka.dispatch.Futures;
23 import akka.dispatch.OnComplete;
24 import akka.japi.Function;
25 import akka.pattern.Patterns;
26 import akka.persistence.RecoveryCompleted;
27 import akka.persistence.SaveSnapshotFailure;
28 import akka.persistence.SaveSnapshotSuccess;
29 import akka.persistence.SnapshotOffer;
30 import akka.persistence.SnapshotSelectionCriteria;
31 import akka.util.Timeout;
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.base.Preconditions;
34 import java.io.ByteArrayInputStream;
35 import java.io.ObjectInputStream;
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.HashMap;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Set;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.TimeoutException;
47 import java.util.function.Supplier;
48 import javax.annotation.Nonnull;
49 import javax.annotation.Nullable;
50 import org.apache.commons.lang3.SerializationUtils;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
53 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
56 import org.opendaylight.controller.cluster.datastore.Shard;
57 import org.opendaylight.controller.cluster.datastore.config.Configuration;
58 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
59 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
60 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
63 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
64 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
65 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
66 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
67 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
68 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
69 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
70 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
71 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
72 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
73 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
74 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
76 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
77 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
78 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
79 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
80 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
81 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
82 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
83 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
84 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
85 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
86 import org.opendaylight.controller.cluster.raft.messages.AddServer;
87 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
88 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
89 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
90 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
91 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
92 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
93 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96 import scala.concurrent.ExecutionContext;
97 import scala.concurrent.Future;
98 import scala.concurrent.duration.Duration;
99 import scala.concurrent.duration.FiniteDuration;
100
101 /**
102  * The ShardManager has the following jobs,
103  * <ul>
104  * <li> Create all the local shard replicas that belong on this cluster member
105  * <li> Find the address of the local shard
106  * <li> Find the primary replica for any given shard
107  * <li> Monitor the cluster members and store their addresses
108  * <ul>
109  */
110 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
111
112     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
113
114     // Stores a mapping between a shard name and it's corresponding information
115     // Shard names look like inventory, topology etc and are as specified in
116     // configuration
117     private final Map<String, ShardInformation> localShards = new HashMap<>();
118
119     // The type of a ShardManager reflects the type of the datastore itself
120     // A data store could be of type config/operational
121     private final String type;
122
123     private final ClusterWrapper cluster;
124
125     private final Configuration configuration;
126
127     private final String shardDispatcherPath;
128
129     private final ShardManagerInfo mBean;
130
131     private DatastoreContextFactory datastoreContextFactory;
132
133     private final CountDownLatch waitTillReadyCountdownLatch;
134
135     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
136
137     private final ShardPeerAddressResolver peerAddressResolver;
138
139     private SchemaContext schemaContext;
140
141     private DatastoreSnapshot restoreFromSnapshot;
142
143     private ShardManagerSnapshot currentSnapshot;
144
145     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
146
147     private final String persistenceId;
148
149     ShardManager(AbstractShardManagerCreator<?> builder) {
150         this.cluster = builder.getCluster();
151         this.configuration = builder.getConfiguration();
152         this.datastoreContextFactory = builder.getDatastoreContextFactory();
153         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
154         this.shardDispatcherPath =
155                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
156         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
157         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
158         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
159
160         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
161         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
162
163         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
164
165         // Subscribe this actor to cluster member events
166         cluster.subscribeToMemberEvents(getSelf());
167
168         mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
169                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
170         mBean.registerMBean();
171     }
172
173     @Override
174     public void postStop() {
175         LOG.info("Stopping ShardManager {}", persistenceId());
176
177         mBean.unregisterMBean();
178     }
179
180     @Override
181     public void handleCommand(Object message) throws Exception {
182         if (message  instanceof FindPrimary) {
183             findPrimary((FindPrimary)message);
184         } else if(message instanceof FindLocalShard){
185             findLocalShard((FindLocalShard) message);
186         } else if (message instanceof UpdateSchemaContext) {
187             updateSchemaContext(message);
188         } else if(message instanceof ActorInitialized) {
189             onActorInitialized(message);
190         } else if (message instanceof ClusterEvent.MemberUp){
191             memberUp((ClusterEvent.MemberUp) message);
192         } else if (message instanceof ClusterEvent.MemberExited){
193             memberExited((ClusterEvent.MemberExited) message);
194         } else if(message instanceof ClusterEvent.MemberRemoved) {
195             memberRemoved((ClusterEvent.MemberRemoved) message);
196         } else if(message instanceof ClusterEvent.UnreachableMember) {
197             memberUnreachable((ClusterEvent.UnreachableMember)message);
198         } else if(message instanceof ClusterEvent.ReachableMember) {
199             memberReachable((ClusterEvent.ReachableMember) message);
200         } else if(message instanceof DatastoreContextFactory) {
201             onDatastoreContextFactory((DatastoreContextFactory)message);
202         } else if(message instanceof RoleChangeNotification) {
203             onRoleChangeNotification((RoleChangeNotification) message);
204         } else if(message instanceof FollowerInitialSyncUpStatus){
205             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
206         } else if(message instanceof ShardNotInitializedTimeout) {
207             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
208         } else if(message instanceof ShardLeaderStateChanged) {
209             onLeaderStateChanged((ShardLeaderStateChanged) message);
210         } else if(message instanceof SwitchShardBehavior){
211             onSwitchShardBehavior((SwitchShardBehavior) message);
212         } else if(message instanceof CreateShard) {
213             onCreateShard((CreateShard)message);
214         } else if(message instanceof AddShardReplica){
215             onAddShardReplica((AddShardReplica)message);
216         } else if(message instanceof ForwardedAddServerReply) {
217             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
218             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
219                     msg.removeShardOnFailure);
220         } else if(message instanceof ForwardedAddServerFailure) {
221             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
222             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
223         } else if(message instanceof PrimaryShardFoundForContext) {
224             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
225             onPrimaryShardFoundContext(primaryShardFoundContext);
226         } else if(message instanceof RemoveShardReplica) {
227             onRemoveShardReplica((RemoveShardReplica) message);
228         } else if(message instanceof WrappedShardResponse){
229             onWrappedShardResponse((WrappedShardResponse) message);
230         } else if(message instanceof GetSnapshot) {
231             onGetSnapshot();
232         } else if(message instanceof ServerRemoved){
233             onShardReplicaRemoved((ServerRemoved) message);
234         } else if(message instanceof SaveSnapshotSuccess) {
235             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
236         } else if(message instanceof SaveSnapshotFailure) {
237             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
238                     persistenceId(), ((SaveSnapshotFailure) message).cause());
239         } else if(message instanceof Shutdown) {
240             onShutDown();
241         } else if (message instanceof GetLocalShardIds) {
242             onGetLocalShardIds();
243         } else {
244             unknownMessage(message);
245         }
246     }
247
248     private void onShutDown() {
249         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
250         for (ShardInformation info : localShards.values()) {
251             if (info.getActor() != null) {
252                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
253
254                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
255                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
256             }
257         }
258
259         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
260
261         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
262         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
263
264         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
265             @Override
266             public void onComplete(Throwable failure, Iterable<Boolean> results) {
267                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
268
269                 self().tell(PoisonPill.getInstance(), self());
270
271                 if(failure != null) {
272                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
273                 } else {
274                     int nfailed = 0;
275                     for(Boolean r: results) {
276                         if(!r) {
277                             nfailed++;
278                         }
279                     }
280
281                     if(nfailed > 0) {
282                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
283                     }
284                 }
285             }
286         }, dispatcher);
287     }
288
289     private void onWrappedShardResponse(WrappedShardResponse message) {
290         if (message.getResponse() instanceof RemoveServerReply) {
291             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
292                     message.getLeaderPath());
293         }
294     }
295
296     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
297             String leaderPath) {
298         shardReplicaOperationsInProgress.remove(shardId);
299
300         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
301
302         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
303             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
304                     shardId.getShardName());
305             originalSender.tell(new Status.Success(null), getSelf());
306         } else {
307             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
308                     persistenceId(), shardId, replyMsg.getStatus());
309
310             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
311             originalSender.tell(new Status.Failure(failure), getSelf());
312         }
313     }
314
315     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
316         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
317             addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
318                     getSender());
319         } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
320             removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
321                     primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
322         }
323     }
324
325     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
326             final ActorRef sender) {
327         if(isShardReplicaOperationInProgress(shardName, sender)) {
328             return;
329         }
330
331         shardReplicaOperationsInProgress.add(shardName);
332
333         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
334
335         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
336
337         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
338         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
339                 primaryPath, shardId);
340
341         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
342                 duration());
343         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
344                 new RemoveServer(shardId.toString()), removeServerTimeout);
345
346         futureObj.onComplete(new OnComplete<Object>() {
347             @Override
348             public void onComplete(Throwable failure, Object response) {
349                 if (failure != null) {
350                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
351                             primaryPath, shardName);
352
353                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
354
355                     // FAILURE
356                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
357                 } else {
358                     // SUCCESS
359                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
360                 }
361             }
362         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
363     }
364
365     private void onShardReplicaRemoved(ServerRemoved message) {
366         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
367         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
368         if(shardInformation == null) {
369             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
370             return;
371         } else if(shardInformation.getActor() != null) {
372             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
373             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
374         }
375         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
376         persistShardList();
377     }
378
379     private void onGetSnapshot() {
380         LOG.debug("{}: onGetSnapshot", persistenceId());
381
382         List<String> notInitialized = null;
383         for(ShardInformation shardInfo: localShards.values()) {
384             if(!shardInfo.isShardInitialized()) {
385                 if(notInitialized == null) {
386                     notInitialized = new ArrayList<>();
387                 }
388
389                 notInitialized.add(shardInfo.getShardName());
390             }
391         }
392
393         if(notInitialized != null) {
394             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
395                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
396             return;
397         }
398
399         byte[] shardManagerSnapshot = null;
400         if(currentSnapshot != null) {
401             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
402         }
403
404         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
405                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
406                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
407
408         for(ShardInformation shardInfo: localShards.values()) {
409             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
410         }
411     }
412
413     private void onCreateShard(CreateShard createShard) {
414         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
415
416         Object reply;
417         try {
418             String shardName = createShard.getModuleShardConfig().getShardName();
419             if(localShards.containsKey(shardName)) {
420                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
421                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
422             } else {
423                 doCreateShard(createShard);
424                 reply = new Status.Success(null);
425             }
426         } catch (Exception e) {
427             LOG.error("{}: onCreateShard failed", persistenceId(), e);
428             reply = new Status.Failure(e);
429         }
430
431         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
432             getSender().tell(reply, getSelf());
433         }
434     }
435
436     private void doCreateShard(CreateShard createShard) {
437         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
438         String shardName = moduleShardConfig.getShardName();
439
440         configuration.addModuleShardConfiguration(moduleShardConfig);
441
442         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
443         if(shardDatastoreContext == null) {
444             shardDatastoreContext = newShardDatastoreContext(shardName);
445         } else {
446             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
447                     peerAddressResolver).build();
448         }
449
450         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
451
452         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
453                 currentSnapshot.getShardList().contains(shardName);
454
455         Map<String, String> peerAddresses;
456         boolean isActiveMember;
457         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
458                 contains(cluster.getCurrentMemberName())) {
459             peerAddresses = getPeerAddresses(shardName);
460             isActiveMember = true;
461         } else {
462             // The local member is not in the static shard member configuration and the shard did not
463             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
464             // the shard with no peers and with elections disabled so it stays as follower. A
465             // subsequent AddServer request will be needed to make it an active member.
466             isActiveMember = false;
467             peerAddresses = Collections.emptyMap();
468             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
469                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
470         }
471
472         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
473                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
474                 isActiveMember);
475
476         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
477                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
478         info.setActiveMember(isActiveMember);
479         localShards.put(info.getShardName(), info);
480
481         if(schemaContext != null) {
482             info.setActor(newShardActor(schemaContext, info));
483         }
484     }
485
486     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
487         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
488                 shardPeerAddressResolver(peerAddressResolver);
489     }
490
491     private DatastoreContext newShardDatastoreContext(String shardName) {
492         return newShardDatastoreContextBuilder(shardName).build();
493     }
494
495     private void checkReady(){
496         if (isReadyWithLeaderId()) {
497             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
498                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
499
500             waitTillReadyCountdownLatch.countDown();
501         }
502     }
503
504     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
505         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
506
507         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
508         if(shardInformation != null) {
509             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
510             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
511             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
512                 primaryShardInfoCache.remove(shardInformation.getShardName());
513             }
514
515             checkReady();
516         } else {
517             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
518         }
519     }
520
521     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
522         ShardInformation shardInfo = message.getShardInfo();
523
524         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
525                 shardInfo.getShardName());
526
527         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
528
529         if(!shardInfo.isShardInitialized()) {
530             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
531             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
532         } else {
533             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
534             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
535         }
536     }
537
538     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
539         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
540                 status.getName(), status.isInitialSyncDone());
541
542         ShardInformation shardInformation = findShardInformation(status.getName());
543
544         if(shardInformation != null) {
545             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
546
547             mBean.setSyncStatus(isInSync());
548         }
549
550     }
551
552     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
553         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
554                 roleChanged.getOldRole(), roleChanged.getNewRole());
555
556         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
557         if(shardInformation != null) {
558             shardInformation.setRole(roleChanged.getNewRole());
559             checkReady();
560             mBean.setSyncStatus(isInSync());
561         }
562     }
563
564
565     private ShardInformation findShardInformation(String memberId) {
566         for(ShardInformation info : localShards.values()){
567             if(info.getShardId().toString().equals(memberId)){
568                 return info;
569             }
570         }
571
572         return null;
573     }
574
575     private boolean isReadyWithLeaderId() {
576         boolean isReady = true;
577         for (ShardInformation info : localShards.values()) {
578             if(!info.isShardReadyWithLeaderId()){
579                 isReady = false;
580                 break;
581             }
582         }
583         return isReady;
584     }
585
586     private boolean isInSync(){
587         for (ShardInformation info : localShards.values()) {
588             if(!info.isInSync()){
589                 return false;
590             }
591         }
592         return true;
593     }
594
595     private void onActorInitialized(Object message) {
596         final ActorRef sender = getSender();
597
598         if (sender == null) {
599             return; //why is a non-actor sending this message? Just ignore.
600         }
601
602         String actorName = sender.path().name();
603         //find shard name from actor name; actor name is stringified shardId
604
605         final ShardIdentifier shardId;
606         try {
607             shardId = ShardIdentifier.fromShardIdString(actorName);
608         } catch (IllegalArgumentException e) {
609             LOG.debug("{}: ignoring actor {}", actorName, e);
610             return;
611         }
612
613         markShardAsInitialized(shardId.getShardName());
614     }
615
616     private void markShardAsInitialized(String shardName) {
617         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
618
619         ShardInformation shardInformation = localShards.get(shardName);
620         if (shardInformation != null) {
621             shardInformation.setActorInitialized();
622
623             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
624         }
625     }
626
627     @Override
628     protected void handleRecover(Object message) throws Exception {
629         if (message instanceof RecoveryCompleted) {
630             onRecoveryCompleted();
631         } else if (message instanceof SnapshotOffer) {
632             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
633         }
634     }
635
636     private void onRecoveryCompleted() {
637         LOG.info("Recovery complete : {}", persistenceId());
638
639         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
640         // journal on upgrade from Helium.
641         deleteMessages(lastSequenceNr());
642
643         if(currentSnapshot == null && restoreFromSnapshot != null &&
644                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
645             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
646                     restoreFromSnapshot.getShardManagerSnapshot()))) {
647                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
648
649                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
650
651                 applyShardManagerSnapshot(snapshot);
652             } catch(Exception e) {
653                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
654             }
655         }
656
657         createLocalShards();
658     }
659
660     private void findLocalShard(FindLocalShard message) {
661         final ShardInformation shardInformation = localShards.get(message.getShardName());
662
663         if(shardInformation == null){
664             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
665             return;
666         }
667
668         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
669     }
670
671     private void sendResponse(ShardInformation shardInformation, boolean doWait,
672             boolean wantShardReady, final Supplier<Object> messageSupplier) {
673         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
674             if(doWait) {
675                 final ActorRef sender = getSender();
676                 final ActorRef self = self();
677
678                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
679
680                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
681                     new OnShardInitialized(replyRunnable);
682
683                 shardInformation.addOnShardInitialized(onShardInitialized);
684
685                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
686                 if(shardInformation.isShardInitialized()) {
687                     // If the shard is already initialized then we'll wait enough time for the shard to
688                     // elect a leader, ie 2 times the election timeout.
689                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
690                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
691                 }
692
693                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
694                         shardInformation.getShardName());
695
696                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
697                         timeout, getSelf(),
698                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
699                         getContext().dispatcher(), getSelf());
700
701                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
702
703             } else if (!shardInformation.isShardInitialized()) {
704                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
705                         shardInformation.getShardName());
706                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
707             } else {
708                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
709                         shardInformation.getShardName());
710                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
711             }
712
713             return;
714         }
715
716         getSender().tell(messageSupplier.get(), getSelf());
717     }
718
719     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
720         return new NoShardLeaderException(null, shardId.toString());
721     }
722
723     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
724         return new NotInitializedException(String.format(
725                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
726     }
727
728     @VisibleForTesting
729     static MemberName memberToName(final Member member) {
730         return MemberName.forName(member.roles().iterator().next());
731     }
732
733     private void memberRemoved(ClusterEvent.MemberRemoved message) {
734         MemberName memberName = memberToName(message.member());
735
736         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
737                 message.member().address());
738
739         peerAddressResolver.removePeerAddress(memberName);
740
741         for(ShardInformation info : localShards.values()){
742             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
743         }
744     }
745
746     private void memberExited(ClusterEvent.MemberExited message) {
747         MemberName memberName = memberToName(message.member());
748
749         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
750                 message.member().address());
751
752         peerAddressResolver.removePeerAddress(memberName);
753
754         for(ShardInformation info : localShards.values()){
755             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
756         }
757     }
758
759     private void memberUp(ClusterEvent.MemberUp message) {
760         MemberName memberName = memberToName(message.member());
761
762         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
763                 message.member().address());
764
765         addPeerAddress(memberName, message.member().address());
766
767         checkReady();
768     }
769
770     private void addPeerAddress(MemberName memberName, Address address) {
771         peerAddressResolver.addPeerAddress(memberName, address);
772
773         for(ShardInformation info : localShards.values()){
774             String shardName = info.getShardName();
775             String peerId = getShardIdentifier(memberName, shardName).toString();
776             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
777
778             info.peerUp(memberName, peerId, getSelf());
779         }
780     }
781
782     private void memberReachable(ClusterEvent.ReachableMember message) {
783         MemberName memberName = memberToName(message.member());
784         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
785
786         addPeerAddress(memberName, message.member().address());
787
788         markMemberAvailable(memberName);
789     }
790
791     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
792         MemberName memberName = memberToName(message.member());
793         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
794
795         markMemberUnavailable(memberName);
796     }
797
798     private void markMemberUnavailable(final MemberName memberName) {
799         final String memberStr = memberName.getName();
800         for (ShardInformation info : localShards.values()) {
801             String leaderId = info.getLeaderId();
802             // XXX: why are we using String#contains() here?
803             if (leaderId != null && leaderId.contains(memberStr)) {
804                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
805                 info.setLeaderAvailable(false);
806
807                 primaryShardInfoCache.remove(info.getShardName());
808             }
809
810             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
811         }
812     }
813
814     private void markMemberAvailable(final MemberName memberName) {
815         final String memberStr = memberName.getName();
816         for (ShardInformation info : localShards.values()) {
817             String leaderId = info.getLeaderId();
818             // XXX: why are we using String#contains() here?
819             if (leaderId != null && leaderId.contains(memberStr)) {
820                 LOG.debug("Marking Leader {} as available.", leaderId);
821                 info.setLeaderAvailable(true);
822             }
823
824             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
825         }
826     }
827
828     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
829         datastoreContextFactory = factory;
830         for (ShardInformation info : localShards.values()) {
831             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
832         }
833     }
834
835     private void onGetLocalShardIds() {
836         final List<String> response = new ArrayList<>(localShards.size());
837
838         for (ShardInformation info : localShards.values()) {
839             response.add(info.getShardId().toString());
840         }
841
842         getSender().tell(new Status.Success(response), getSelf());
843     }
844
845     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
846         final ShardIdentifier identifier = message.getShardId();
847
848         if (identifier != null) {
849             final ShardInformation info = localShards.get(identifier.getShardName());
850             if (info == null) {
851                 getSender().tell(new Status.Failure(
852                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
853                 return;
854             }
855
856             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
857         } else {
858             for (ShardInformation info : localShards.values()) {
859                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
860             }
861         }
862
863         getSender().tell(new Status.Success(null), getSelf());
864     }
865
866     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
867         final ActorRef actor = info.getActor();
868         if (actor != null) {
869             actor.tell(switchBehavior, getSelf());
870           } else {
871             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
872                 info.getShardName(), switchBehavior.getNewState());
873         }
874     }
875
876     /**
877      * Notifies all the local shards of a change in the schema context
878      *
879      * @param message
880      */
881     private void updateSchemaContext(final Object message) {
882         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
883
884         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
885
886         for (ShardInformation info : localShards.values()) {
887             if (info.getActor() == null) {
888                 LOG.debug("Creating Shard {}", info.getShardId());
889                 info.setActor(newShardActor(schemaContext, info));
890             } else {
891                 info.getActor().tell(message, getSelf());
892             }
893         }
894     }
895
896     @VisibleForTesting
897     protected ClusterWrapper getCluster() {
898         return cluster;
899     }
900
901     @VisibleForTesting
902     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
903         return getContext().actorOf(info.newProps(schemaContext)
904                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
905     }
906
907     private void findPrimary(FindPrimary message) {
908         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
909
910         final String shardName = message.getShardName();
911         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
912
913         // First see if the there is a local replica for the shard
914         final ShardInformation info = localShards.get(shardName);
915         if (info != null && info.isActiveMember()) {
916             sendResponse(info, message.isWaitUntilReady(), true, () -> {
917                 String primaryPath = info.getSerializedLeaderActor();
918                 Object found = canReturnLocalShardState && info.isLeader() ?
919                         new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
920                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
921
922                         if(LOG.isDebugEnabled()) {
923                             LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
924                         }
925
926                         return found;
927             });
928
929             return;
930         }
931
932         final Collection<String> visitedAddresses;
933         if(message instanceof RemoteFindPrimary) {
934             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
935         } else {
936             visitedAddresses = new ArrayList<>(1);
937         }
938
939         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
940
941         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
942             if(visitedAddresses.contains(address)) {
943                 continue;
944             }
945
946             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
947                     persistenceId(), shardName, address, visitedAddresses);
948
949             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
950                     message.isWaitUntilReady(), visitedAddresses), getContext());
951             return;
952         }
953
954         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
955
956         getSender().tell(new PrimaryNotFoundException(
957                 String.format("No primary shard found for %s.", shardName)), getSelf());
958     }
959
960     /**
961      * Construct the name of the shard actor given the name of the member on
962      * which the shard resides and the name of the shard
963      *
964      * @param memberName
965      * @param shardName
966      * @return
967      */
968     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
969         return peerAddressResolver.getShardIdentifier(memberName, shardName);
970     }
971
972     /**
973      * Create shards that are local to the member on which the ShardManager
974      * runs
975      *
976      */
977     private void createLocalShards() {
978         MemberName memberName = this.cluster.getCurrentMemberName();
979         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
980
981         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
982         if(restoreFromSnapshot != null)
983         {
984             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
985                 shardSnapshots.put(snapshot.getName(), snapshot);
986             }
987         }
988
989         restoreFromSnapshot = null; // null out to GC
990
991         for(String shardName : memberShardNames){
992             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
993
994             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
995
996             Map<String, String> peerAddresses = getPeerAddresses(shardName);
997             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
998                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
999                         shardSnapshots.get(shardName)), peerAddressResolver));
1000         }
1001     }
1002
1003     /**
1004      * Given the name of the shard find the addresses of all it's peers
1005      *
1006      * @param shardName
1007      */
1008     private Map<String, String> getPeerAddresses(String shardName) {
1009         Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1010         Map<String, String> peerAddresses = new HashMap<>();
1011
1012         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1013
1014         for (MemberName memberName : members) {
1015             if (!currentMemberName.equals(memberName)) {
1016                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1017                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1018                 peerAddresses.put(shardId.toString(), address);
1019             }
1020         }
1021         return peerAddresses;
1022     }
1023
1024     @Override
1025     public SupervisorStrategy supervisorStrategy() {
1026
1027         return new OneForOneStrategy(10, Duration.create("1 minute"),
1028                 (Function<Throwable, Directive>) t -> {
1029                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1030                     return SupervisorStrategy.resume();
1031                 }
1032                 );
1033
1034     }
1035
1036     @Override
1037     public String persistenceId() {
1038         return persistenceId;
1039     }
1040
1041     @VisibleForTesting
1042     ShardManagerInfoMBean getMBean(){
1043         return mBean;
1044     }
1045
1046     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1047         if (shardReplicaOperationsInProgress.contains(shardName)) {
1048             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1049             LOG.debug ("{}: {}", persistenceId(), msg);
1050             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1051             return true;
1052         }
1053
1054         return false;
1055     }
1056
1057     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1058         final String shardName = shardReplicaMsg.getShardName();
1059
1060         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1061
1062         // verify the shard with the specified name is present in the cluster configuration
1063         if (!(this.configuration.isShardConfigured(shardName))) {
1064             String msg = String.format("No module configuration exists for shard %s", shardName);
1065             LOG.debug ("{}: {}", persistenceId(), msg);
1066             getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
1067             return;
1068         }
1069
1070         // Create the localShard
1071         if (schemaContext == null) {
1072             String msg = String.format(
1073                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1074             LOG.debug ("{}: {}", persistenceId(), msg);
1075             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1076             return;
1077         }
1078
1079         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1080             @Override
1081             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1082                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1083             }
1084
1085             @Override
1086             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1087                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1088             }
1089
1090         });
1091     }
1092
1093     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1094         String msg = String.format("Local shard %s already exists", shardName);
1095         LOG.debug ("{}: {}", persistenceId(), msg);
1096         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1097     }
1098
1099     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1100         if(isShardReplicaOperationInProgress(shardName, sender)) {
1101             return;
1102         }
1103
1104         shardReplicaOperationsInProgress.add(shardName);
1105
1106         final ShardInformation shardInfo;
1107         final boolean removeShardOnFailure;
1108         ShardInformation existingShardInfo = localShards.get(shardName);
1109         if(existingShardInfo == null) {
1110             removeShardOnFailure = true;
1111             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1112
1113             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
1114                     DisableElectionsRaftPolicy.class.getName()).build();
1115
1116             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1117                     Shard.builder(), peerAddressResolver);
1118             shardInfo.setActiveMember(false);
1119             localShards.put(shardName, shardInfo);
1120             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1121         } else {
1122             removeShardOnFailure = false;
1123             shardInfo = existingShardInfo;
1124         }
1125
1126         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1127
1128         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1129         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1130                 response.getPrimaryPath(), shardInfo.getShardId());
1131
1132         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1133                 duration());
1134         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1135             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1136
1137         futureObj.onComplete(new OnComplete<Object>() {
1138             @Override
1139             public void onComplete(Throwable failure, Object addServerResponse) {
1140                 if (failure != null) {
1141                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1142                             response.getPrimaryPath(), shardName, failure);
1143
1144                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1145                             response.getPrimaryPath(), shardName);
1146                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1147                 } else {
1148                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1149                             response.getPrimaryPath(), removeShardOnFailure), sender);
1150                 }
1151             }
1152         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1153     }
1154
1155     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1156             boolean removeShardOnFailure) {
1157         shardReplicaOperationsInProgress.remove(shardName);
1158
1159         if(removeShardOnFailure) {
1160             ShardInformation shardInfo = localShards.remove(shardName);
1161             if (shardInfo.getActor() != null) {
1162                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1163             }
1164         }
1165
1166         sender.tell(new Status.Failure(message == null ? failure :
1167             new RuntimeException(message, failure)), getSelf());
1168     }
1169
1170     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1171             String leaderPath, boolean removeShardOnFailure) {
1172         String shardName = shardInfo.getShardName();
1173         shardReplicaOperationsInProgress.remove(shardName);
1174
1175         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1176
1177         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1178             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1179
1180             // Make the local shard voting capable
1181             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1182             shardInfo.setActiveMember(true);
1183             persistShardList();
1184
1185             sender.tell(new Status.Success(null), getSelf());
1186         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1187             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1188         } else {
1189             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1190                     persistenceId(), shardName, replyMsg.getStatus());
1191
1192             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1193
1194             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1195         }
1196     }
1197
1198     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1199                                                String leaderPath, ShardIdentifier shardId) {
1200         Exception failure;
1201         switch (serverChangeStatus) {
1202             case TIMEOUT:
1203                 failure = new TimeoutException(String.format(
1204                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1205                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1206                         leaderPath, shardId.getShardName()));
1207                 break;
1208             case NO_LEADER:
1209                 failure = createNoShardLeaderException(shardId);
1210                 break;
1211             case NOT_SUPPORTED:
1212                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1213                         serverChange.getSimpleName(), shardId.getShardName()));
1214                 break;
1215             default :
1216                 failure = new RuntimeException(String.format(
1217                         "%s request to leader %s for shard %s failed with status %s",
1218                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1219         }
1220         return failure;
1221     }
1222
1223     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1224         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1225
1226         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1227                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1228             @Override
1229             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1230                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1231             }
1232
1233             @Override
1234             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1235                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1236             }
1237         });
1238     }
1239
1240     private void persistShardList() {
1241         List<String> shardList = new ArrayList<>(localShards.keySet());
1242         for (ShardInformation shardInfo : localShards.values()) {
1243             if (!shardInfo.isActiveMember()) {
1244                 shardList.remove(shardInfo.getShardName());
1245             }
1246         }
1247         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1248         saveSnapshot(updateShardManagerSnapshot(shardList));
1249     }
1250
1251     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1252         currentSnapshot = new ShardManagerSnapshot(shardList);
1253         return currentSnapshot;
1254     }
1255
1256     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1257         currentSnapshot = snapshot;
1258
1259         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1260
1261         final MemberName currentMember = cluster.getCurrentMemberName();
1262         Set<String> configuredShardList =
1263             new HashSet<>(configuration.getMemberShardNames(currentMember));
1264         for (String shard : currentSnapshot.getShardList()) {
1265             if (!configuredShardList.contains(shard)) {
1266                 // add the current member as a replica for the shard
1267                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1268                 configuration.addMemberReplicaForShard(shard, currentMember);
1269             } else {
1270                 configuredShardList.remove(shard);
1271             }
1272         }
1273         for (String shard : configuredShardList) {
1274             // remove the member as a replica for the shard
1275             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1276             configuration.removeMemberReplicaForShard(shard, currentMember);
1277         }
1278     }
1279
1280     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1281         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1282             persistenceId());
1283         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1284             0, 0));
1285     }
1286
1287     private static final class ForwardedAddServerReply {
1288         ShardInformation shardInfo;
1289         AddServerReply addServerReply;
1290         String leaderPath;
1291         boolean removeShardOnFailure;
1292
1293         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1294                 boolean removeShardOnFailure) {
1295             this.shardInfo = shardInfo;
1296             this.addServerReply = addServerReply;
1297             this.leaderPath = leaderPath;
1298             this.removeShardOnFailure = removeShardOnFailure;
1299         }
1300     }
1301
1302     private static final class ForwardedAddServerFailure {
1303         String shardName;
1304         String failureMessage;
1305         Throwable failure;
1306         boolean removeShardOnFailure;
1307
1308         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1309                 boolean removeShardOnFailure) {
1310             this.shardName = shardName;
1311             this.failureMessage = failureMessage;
1312             this.failure = failure;
1313             this.removeShardOnFailure = removeShardOnFailure;
1314         }
1315     }
1316
1317     static class OnShardInitialized {
1318         private final Runnable replyRunnable;
1319         private Cancellable timeoutSchedule;
1320
1321         OnShardInitialized(Runnable replyRunnable) {
1322             this.replyRunnable = replyRunnable;
1323         }
1324
1325         Runnable getReplyRunnable() {
1326             return replyRunnable;
1327         }
1328
1329         Cancellable getTimeoutSchedule() {
1330             return timeoutSchedule;
1331         }
1332
1333         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1334             this.timeoutSchedule = timeoutSchedule;
1335         }
1336     }
1337
1338     static class OnShardReady extends OnShardInitialized {
1339         OnShardReady(Runnable replyRunnable) {
1340             super(replyRunnable);
1341         }
1342     }
1343
1344     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1345         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1346                 getShardInitializationTimeout().duration().$times(2));
1347
1348
1349         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1350         futureObj.onComplete(new OnComplete<Object>() {
1351             @Override
1352             public void onComplete(Throwable failure, Object response) {
1353                 if (failure != null) {
1354                     handler.onFailure(failure);
1355                 } else {
1356                     if(response instanceof RemotePrimaryShardFound) {
1357                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1358                     } else if(response instanceof LocalPrimaryShardFound) {
1359                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1360                     } else {
1361                         handler.onUnknownResponse(response);
1362                     }
1363                 }
1364             }
1365         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1366     }
1367
1368     /**
1369      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1370      * a remote or local find primary message is processed
1371      */
1372     private static interface FindPrimaryResponseHandler {
1373         /**
1374          * Invoked when a Failure message is received as a response
1375          *
1376          * @param failure
1377          */
1378         void onFailure(Throwable failure);
1379
1380         /**
1381          * Invoked when a RemotePrimaryShardFound response is received
1382          *
1383          * @param response
1384          */
1385         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1386
1387         /**
1388          * Invoked when a LocalPrimaryShardFound response is received
1389          * @param response
1390          */
1391         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1392
1393         /**
1394          * Invoked when an unknown response is received. This is another type of failure.
1395          *
1396          * @param response
1397          */
1398         void onUnknownResponse(Object response);
1399     }
1400
1401     /**
1402      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1403      * replica and sends a wrapped Failure response to some targetActor
1404      */
1405     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1406         private final ActorRef targetActor;
1407         private final String shardName;
1408         private final String persistenceId;
1409         private final ActorRef shardManagerActor;
1410
1411         /**
1412          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1413          * @param shardName The name of the shard for which the primary replica had to be found
1414          * @param persistenceId The persistenceId for the ShardManager
1415          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1416          */
1417         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1418             this.targetActor = Preconditions.checkNotNull(targetActor);
1419             this.shardName = Preconditions.checkNotNull(shardName);
1420             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1421             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1422         }
1423
1424         public ActorRef getTargetActor() {
1425             return targetActor;
1426         }
1427
1428         public String getShardName() {
1429             return shardName;
1430         }
1431
1432         @Override
1433         public void onFailure(Throwable failure) {
1434             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1435             targetActor.tell(new Status.Failure(new RuntimeException(
1436                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1437         }
1438
1439         @Override
1440         public void onUnknownResponse(Object response) {
1441             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1442                     shardName, response);
1443             LOG.debug ("{}: {}", persistenceId, msg);
1444             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1445                     new RuntimeException(msg)), shardManagerActor);
1446         }
1447     }
1448
1449     /**
1450      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1451      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1452      * as a successful response to find primary.
1453      */
1454     private static class PrimaryShardFoundForContext {
1455         private final String shardName;
1456         private final Object contextMessage;
1457         private final RemotePrimaryShardFound remotePrimaryShardFound;
1458         private final LocalPrimaryShardFound localPrimaryShardFound;
1459
1460         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
1461                 @Nonnull Object primaryFoundMessage) {
1462             this.shardName = Preconditions.checkNotNull(shardName);
1463             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1464             Preconditions.checkNotNull(primaryFoundMessage);
1465             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
1466                     (RemotePrimaryShardFound) primaryFoundMessage : null;
1467             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
1468                     (LocalPrimaryShardFound) primaryFoundMessage : null;
1469         }
1470
1471         @Nonnull
1472         String getPrimaryPath(){
1473             if(remotePrimaryShardFound != null) {
1474                 return remotePrimaryShardFound.getPrimaryPath();
1475             }
1476             return localPrimaryShardFound.getPrimaryPath();
1477         }
1478
1479         @Nonnull
1480         Object getContextMessage() {
1481             return contextMessage;
1482         }
1483
1484         @Nullable
1485         RemotePrimaryShardFound getRemotePrimaryShardFound() {
1486             return remotePrimaryShardFound;
1487         }
1488
1489         @Nonnull
1490         String getShardName() {
1491             return shardName;
1492         }
1493     }
1494
1495     /**
1496      * The WrappedShardResponse class wraps a response from a Shard.
1497      */
1498     private static final class WrappedShardResponse {
1499         private final ShardIdentifier shardId;
1500         private final Object response;
1501         private final String leaderPath;
1502
1503         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1504             this.shardId = shardId;
1505             this.response = response;
1506             this.leaderPath = leaderPath;
1507         }
1508
1509         ShardIdentifier getShardId() {
1510             return shardId;
1511         }
1512
1513         Object getResponse() {
1514             return response;
1515         }
1516
1517         String getLeaderPath() {
1518             return leaderPath;
1519         }
1520     }
1521
1522     private static final class ShardNotInitializedTimeout {
1523         private final ActorRef sender;
1524         private final ShardInformation shardInfo;
1525         private final OnShardInitialized onShardInitialized;
1526
1527         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1528             this.sender = sender;
1529             this.shardInfo = shardInfo;
1530             this.onShardInitialized = onShardInitialized;
1531         }
1532
1533         ActorRef getSender() {
1534             return sender;
1535         }
1536
1537         ShardInformation getShardInfo() {
1538             return shardInfo;
1539         }
1540
1541         OnShardInitialized getOnShardInitialized() {
1542             return onShardInitialized;
1543         }
1544     }
1545 }
1546
1547
1548