bbf5cf31003d9079761e57c3c6db648761ad3070
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.actor.SupervisorStrategy.Directive;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberWeaklyUp;
23 import akka.cluster.Member;
24 import akka.dispatch.Futures;
25 import akka.dispatch.OnComplete;
26 import akka.japi.Function;
27 import akka.pattern.Patterns;
28 import akka.persistence.RecoveryCompleted;
29 import akka.persistence.SaveSnapshotFailure;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.persistence.SnapshotOffer;
32 import akka.persistence.SnapshotSelectionCriteria;
33 import akka.util.Timeout;
34 import com.google.common.annotations.VisibleForTesting;
35 import com.google.common.base.Preconditions;
36 import java.io.ByteArrayInputStream;
37 import java.io.IOException;
38 import java.io.ObjectInputStream;
39 import java.util.ArrayList;
40 import java.util.Collection;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Map.Entry;
47 import java.util.Set;
48 import java.util.concurrent.CountDownLatch;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import java.util.function.Consumer;
52 import java.util.function.Supplier;
53 import org.apache.commons.lang3.SerializationUtils;
54 import org.opendaylight.controller.cluster.access.concepts.MemberName;
55 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
56 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
57 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
58 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.Configuration;
61 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
63 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
65 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
66 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
67 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
68 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
69 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
70 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
71 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
72 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
73 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
74 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
77 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
78 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
79 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
80 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
81 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
82 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
83 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
84 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
87 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
88 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
89 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
90 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
91 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
92 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
93 import org.opendaylight.controller.cluster.raft.messages.AddServer;
94 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
95 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
96 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
97 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
98 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
99 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
100 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
101 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
102 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
103 import org.slf4j.Logger;
104 import org.slf4j.LoggerFactory;
105 import scala.concurrent.ExecutionContext;
106 import scala.concurrent.Future;
107 import scala.concurrent.duration.Duration;
108 import scala.concurrent.duration.FiniteDuration;
109
110 /**
111  * Manages the shards for a data store. The ShardManager has the following jobs:
112  * <ul>
113  * <li> Create all the local shard replicas that belong on this cluster member
114  * <li> Find the address of the local shard
115  * <li> Find the primary replica for any given shard
116  * <li> Monitor the cluster members and store their addresses
117  * </ul>
118  */
119 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
120     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
121
122     // Stores a mapping between a shard name and it's corresponding information
123     // Shard names look like inventory, topology etc and are as specified in
124     // configuration
125     private final Map<String, ShardInformation> localShards = new HashMap<>();
126
127     // The type of a ShardManager reflects the type of the datastore itself
128     // A data store could be of type config/operational
129     private final String type;
130
131     private final ClusterWrapper cluster;
132
133     private final Configuration configuration;
134
135     private final String shardDispatcherPath;
136
137     private final ShardManagerInfo shardManagerMBean;
138
139     private DatastoreContextFactory datastoreContextFactory;
140
141     private final CountDownLatch waitTillReadyCountdownLatch;
142
143     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
144
145     private final ShardPeerAddressResolver peerAddressResolver;
146
147     private SchemaContext schemaContext;
148
149     private DatastoreSnapshot restoreFromSnapshot;
150
151     private ShardManagerSnapshot currentSnapshot;
152
153     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
154
155     private final String persistenceId;
156
157     ShardManager(AbstractShardManagerCreator<?> builder) {
158         this.cluster = builder.getCluster();
159         this.configuration = builder.getConfiguration();
160         this.datastoreContextFactory = builder.getDatastoreContextFactory();
161         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
162         this.shardDispatcherPath =
163                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
164         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
165         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
166         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
167
168         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
169         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
170
171         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
172
173         // Subscribe this actor to cluster member events
174         cluster.subscribeToMemberEvents(getSelf());
175
176         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
177                 "shard-manager-" + this.type,
178                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
179         shardManagerMBean.registerMBean();
180     }
181
182     @Override
183     public void postStop() {
184         LOG.info("Stopping ShardManager {}", persistenceId());
185
186         shardManagerMBean.unregisterMBean();
187     }
188
189     @Override
190     public void handleCommand(Object message) throws Exception {
191         if (message  instanceof FindPrimary) {
192             findPrimary((FindPrimary)message);
193         } else if (message instanceof FindLocalShard) {
194             findLocalShard((FindLocalShard) message);
195         } else if (message instanceof UpdateSchemaContext) {
196             updateSchemaContext(message);
197         } else if (message instanceof ActorInitialized) {
198             onActorInitialized(message);
199         } else if (message instanceof ClusterEvent.MemberUp) {
200             memberUp((ClusterEvent.MemberUp) message);
201         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
202             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
203         } else if (message instanceof ClusterEvent.MemberExited) {
204             memberExited((ClusterEvent.MemberExited) message);
205         } else if (message instanceof ClusterEvent.MemberRemoved) {
206             memberRemoved((ClusterEvent.MemberRemoved) message);
207         } else if (message instanceof ClusterEvent.UnreachableMember) {
208             memberUnreachable((ClusterEvent.UnreachableMember) message);
209         } else if (message instanceof ClusterEvent.ReachableMember) {
210             memberReachable((ClusterEvent.ReachableMember) message);
211         } else if (message instanceof DatastoreContextFactory) {
212             onDatastoreContextFactory((DatastoreContextFactory) message);
213         } else if (message instanceof RoleChangeNotification) {
214             onRoleChangeNotification((RoleChangeNotification) message);
215         } else if (message instanceof FollowerInitialSyncUpStatus) {
216             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
217         } else if (message instanceof ShardNotInitializedTimeout) {
218             onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
219         } else if (message instanceof ShardLeaderStateChanged) {
220             onLeaderStateChanged((ShardLeaderStateChanged) message);
221         } else if (message instanceof SwitchShardBehavior) {
222             onSwitchShardBehavior((SwitchShardBehavior) message);
223         } else if (message instanceof CreateShard) {
224             onCreateShard((CreateShard) message);
225         } else if (message instanceof AddShardReplica) {
226             onAddShardReplica((AddShardReplica) message);
227         } else if (message instanceof ForwardedAddServerReply) {
228             ForwardedAddServerReply msg = (ForwardedAddServerReply) message;
229             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
230         } else if (message instanceof ForwardedAddServerFailure) {
231             ForwardedAddServerFailure msg = (ForwardedAddServerFailure) message;
232             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
233         } else if (message instanceof RemoveShardReplica) {
234             onRemoveShardReplica((RemoveShardReplica) message);
235         } else if (message instanceof WrappedShardResponse) {
236             onWrappedShardResponse((WrappedShardResponse) message);
237         } else if (message instanceof GetSnapshot) {
238             onGetSnapshot();
239         } else if (message instanceof ServerRemoved) {
240             onShardReplicaRemoved((ServerRemoved) message);
241         } else if (message instanceof ChangeShardMembersVotingStatus) {
242             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
243         } else if (message instanceof FlipShardMembersVotingStatus) {
244             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
245         } else if (message instanceof SaveSnapshotSuccess) {
246             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
247         } else if (message instanceof SaveSnapshotFailure) {
248             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
249                     ((SaveSnapshotFailure) message).cause());
250         } else if (message instanceof Shutdown) {
251             onShutDown();
252         } else if (message instanceof GetLocalShardIds) {
253             onGetLocalShardIds();
254         } else if (message instanceof RunnableMessage) {
255             ((RunnableMessage)message).run();
256         } else {
257             unknownMessage(message);
258         }
259     }
260
261     private void onShutDown() {
262         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
263         for (ShardInformation info : localShards.values()) {
264             if (info.getActor() != null) {
265                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
266
267                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
268                         .getElectionTimeOutInterval().$times(2);
269                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
270             }
271         }
272
273         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
274
275         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
276                 .getDispatcher(Dispatchers.DispatcherType.Client);
277         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
278
279         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
280             @Override
281             public void onComplete(Throwable failure, Iterable<Boolean> results) {
282                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
283
284                 self().tell(PoisonPill.getInstance(), self());
285
286                 if (failure != null) {
287                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
288                 } else {
289                     int nfailed = 0;
290                     for (Boolean result: results) {
291                         if (!result) {
292                             nfailed++;
293                         }
294                     }
295
296                     if (nfailed > 0) {
297                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
298                     }
299                 }
300             }
301         }, dispatcher);
302     }
303
304     private void onWrappedShardResponse(WrappedShardResponse message) {
305         if (message.getResponse() instanceof RemoveServerReply) {
306             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
307                     message.getLeaderPath());
308         }
309     }
310
311     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
312             String leaderPath) {
313         shardReplicaOperationsInProgress.remove(shardId.getShardName());
314
315         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
316
317         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
318             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
319                     shardId.getShardName());
320             originalSender.tell(new Status.Success(null), getSelf());
321         } else {
322             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
323                     persistenceId(), shardId, replyMsg.getStatus());
324
325             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
326             originalSender.tell(new Status.Failure(failure), getSelf());
327         }
328     }
329
330     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
331             final ActorRef sender) {
332         if (isShardReplicaOperationInProgress(shardName, sender)) {
333             return;
334         }
335
336         shardReplicaOperationsInProgress.add(shardName);
337
338         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
339
340         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
341
342         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
343         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
344                 primaryPath, shardId);
345
346         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
347         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
348                 new RemoveServer(shardId.toString()), removeServerTimeout);
349
350         futureObj.onComplete(new OnComplete<Object>() {
351             @Override
352             public void onComplete(Throwable failure, Object response) {
353                 if (failure != null) {
354                     shardReplicaOperationsInProgress.remove(shardName);
355                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
356                             primaryPath, shardName);
357
358                     LOG.debug("{}: {}", persistenceId(), msg, failure);
359
360                     // FAILURE
361                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
362                 } else {
363                     // SUCCESS
364                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
365                 }
366             }
367         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
368     }
369
370     private void onShardReplicaRemoved(ServerRemoved message) {
371         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
372         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
373         if (shardInformation == null) {
374             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
375             return;
376         } else if (shardInformation.getActor() != null) {
377             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
378             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
379         }
380         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
381         persistShardList();
382     }
383
384     private void onGetSnapshot() {
385         LOG.debug("{}: onGetSnapshot", persistenceId());
386
387         List<String> notInitialized = null;
388         for (ShardInformation shardInfo : localShards.values()) {
389             if (!shardInfo.isShardInitialized()) {
390                 if (notInitialized == null) {
391                     notInitialized = new ArrayList<>();
392                 }
393
394                 notInitialized.add(shardInfo.getShardName());
395             }
396         }
397
398         if (notInitialized != null) {
399             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
400                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
401             return;
402         }
403
404         byte[] shardManagerSnapshot = null;
405         if (currentSnapshot != null) {
406             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
407         }
408
409         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
410                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
411                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
412
413         for (ShardInformation shardInfo: localShards.values()) {
414             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
415         }
416     }
417
418     @SuppressWarnings("checkstyle:IllegalCatch")
419     private void onCreateShard(CreateShard createShard) {
420         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
421
422         Object reply;
423         try {
424             String shardName = createShard.getModuleShardConfig().getShardName();
425             if (localShards.containsKey(shardName)) {
426                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
427                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
428             } else {
429                 doCreateShard(createShard);
430                 reply = new Status.Success(null);
431             }
432         } catch (Exception e) {
433             LOG.error("{}: onCreateShard failed", persistenceId(), e);
434             reply = new Status.Failure(e);
435         }
436
437         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
438             getSender().tell(reply, getSelf());
439         }
440     }
441
442     private void doCreateShard(CreateShard createShard) {
443         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
444         String shardName = moduleShardConfig.getShardName();
445
446         configuration.addModuleShardConfiguration(moduleShardConfig);
447
448         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
449         if (shardDatastoreContext == null) {
450             shardDatastoreContext = newShardDatastoreContext(shardName);
451         } else {
452             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
453                     peerAddressResolver).build();
454         }
455
456         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
457
458         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
459                 && currentSnapshot.getShardList().contains(shardName);
460
461         Map<String, String> peerAddresses;
462         boolean isActiveMember;
463         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
464                 .contains(cluster.getCurrentMemberName())) {
465             peerAddresses = getPeerAddresses(shardName);
466             isActiveMember = true;
467         } else {
468             // The local member is not in the static shard member configuration and the shard did not
469             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
470             // the shard with no peers and with elections disabled so it stays as follower. A
471             // subsequent AddServer request will be needed to make it an active member.
472             isActiveMember = false;
473             peerAddresses = Collections.emptyMap();
474             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
475                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
476         }
477
478         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
479                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
480                 isActiveMember);
481
482         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
483                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
484         info.setActiveMember(isActiveMember);
485         localShards.put(info.getShardName(), info);
486
487         if (schemaContext != null) {
488             info.setActor(newShardActor(schemaContext, info));
489         }
490     }
491
492     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
493         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
494                 .shardPeerAddressResolver(peerAddressResolver);
495     }
496
497     private DatastoreContext newShardDatastoreContext(String shardName) {
498         return newShardDatastoreContextBuilder(shardName).build();
499     }
500
501     private void checkReady() {
502         if (isReadyWithLeaderId()) {
503             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
504                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
505
506             waitTillReadyCountdownLatch.countDown();
507         }
508     }
509
510     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
511         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
512
513         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
514         if (shardInformation != null) {
515             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
516             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
517             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
518                 primaryShardInfoCache.remove(shardInformation.getShardName());
519             }
520
521             checkReady();
522         } else {
523             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
524         }
525     }
526
527     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
528         ShardInformation shardInfo = message.getShardInfo();
529
530         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
531                 shardInfo.getShardName());
532
533         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
534
535         if (!shardInfo.isShardInitialized()) {
536             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
537             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
538         } else {
539             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
540             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
541         }
542     }
543
544     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
545         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
546                 status.getName(), status.isInitialSyncDone());
547
548         ShardInformation shardInformation = findShardInformation(status.getName());
549
550         if (shardInformation != null) {
551             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
552
553             shardManagerMBean.setSyncStatus(isInSync());
554         }
555
556     }
557
558     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
559         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
560                 roleChanged.getOldRole(), roleChanged.getNewRole());
561
562         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
563         if (shardInformation != null) {
564             shardInformation.setRole(roleChanged.getNewRole());
565             checkReady();
566             shardManagerMBean.setSyncStatus(isInSync());
567         }
568     }
569
570
571     private ShardInformation findShardInformation(String memberId) {
572         for (ShardInformation info : localShards.values()) {
573             if (info.getShardId().toString().equals(memberId)) {
574                 return info;
575             }
576         }
577
578         return null;
579     }
580
581     private boolean isReadyWithLeaderId() {
582         boolean isReady = true;
583         for (ShardInformation info : localShards.values()) {
584             if (!info.isShardReadyWithLeaderId()) {
585                 isReady = false;
586                 break;
587             }
588         }
589         return isReady;
590     }
591
592     private boolean isInSync() {
593         for (ShardInformation info : localShards.values()) {
594             if (!info.isInSync()) {
595                 return false;
596             }
597         }
598         return true;
599     }
600
601     private void onActorInitialized(Object message) {
602         final ActorRef sender = getSender();
603
604         if (sender == null) {
605             return; //why is a non-actor sending this message? Just ignore.
606         }
607
608         String actorName = sender.path().name();
609         //find shard name from actor name; actor name is stringified shardId
610
611         final ShardIdentifier shardId;
612         try {
613             shardId = ShardIdentifier.fromShardIdString(actorName);
614         } catch (IllegalArgumentException e) {
615             LOG.debug("{}: ignoring actor {}", actorName, e);
616             return;
617         }
618
619         markShardAsInitialized(shardId.getShardName());
620     }
621
622     private void markShardAsInitialized(String shardName) {
623         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
624
625         ShardInformation shardInformation = localShards.get(shardName);
626         if (shardInformation != null) {
627             shardInformation.setActorInitialized();
628
629             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
630         }
631     }
632
633     @Override
634     protected void handleRecover(Object message) throws Exception {
635         if (message instanceof RecoveryCompleted) {
636             onRecoveryCompleted();
637         } else if (message instanceof SnapshotOffer) {
638             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
639         }
640     }
641
642     @SuppressWarnings("checkstyle:IllegalCatch")
643     private void onRecoveryCompleted() {
644         LOG.info("Recovery complete : {}", persistenceId());
645
646         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
647         // journal on upgrade from Helium.
648         deleteMessages(lastSequenceNr());
649
650         if (currentSnapshot == null && restoreFromSnapshot != null
651                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
652             try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
653                     restoreFromSnapshot.getShardManagerSnapshot()))) {
654                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
655
656                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
657
658                 applyShardManagerSnapshot(snapshot);
659             } catch (ClassNotFoundException | IOException e) {
660                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
661             }
662         }
663
664         createLocalShards();
665     }
666
667     private void sendResponse(ShardInformation shardInformation, boolean doWait,
668             boolean wantShardReady, final Supplier<Object> messageSupplier) {
669         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
670             if (doWait) {
671                 final ActorRef sender = getSender();
672                 final ActorRef self = self();
673
674                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
675
676                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
677                     new OnShardInitialized(replyRunnable);
678
679                 shardInformation.addOnShardInitialized(onShardInitialized);
680
681                 FiniteDuration timeout = shardInformation.getDatastoreContext()
682                         .getShardInitializationTimeout().duration();
683                 if (shardInformation.isShardInitialized()) {
684                     // If the shard is already initialized then we'll wait enough time for the shard to
685                     // elect a leader, ie 2 times the election timeout.
686                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
687                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
688                 }
689
690                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
691                         shardInformation.getShardName());
692
693                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
694                         timeout, getSelf(),
695                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
696                         getContext().dispatcher(), getSelf());
697
698                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
699
700             } else if (!shardInformation.isShardInitialized()) {
701                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
702                         shardInformation.getShardName());
703                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
704             } else {
705                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
706                         shardInformation.getShardName());
707                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
708             }
709
710             return;
711         }
712
713         getSender().tell(messageSupplier.get(), getSelf());
714     }
715
716     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
717         return new NoShardLeaderException(null, shardId.toString());
718     }
719
720     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
721         return new NotInitializedException(String.format(
722                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
723     }
724
725     @VisibleForTesting
726     static MemberName memberToName(final Member member) {
727         return MemberName.forName(member.roles().iterator().next());
728     }
729
730     private void memberRemoved(ClusterEvent.MemberRemoved message) {
731         MemberName memberName = memberToName(message.member());
732
733         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
734                 message.member().address());
735
736         peerAddressResolver.removePeerAddress(memberName);
737
738         for (ShardInformation info : localShards.values()) {
739             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
740         }
741     }
742
743     private void memberExited(ClusterEvent.MemberExited message) {
744         MemberName memberName = memberToName(message.member());
745
746         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
747                 message.member().address());
748
749         peerAddressResolver.removePeerAddress(memberName);
750
751         for (ShardInformation info : localShards.values()) {
752             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
753         }
754     }
755
756     private void memberUp(ClusterEvent.MemberUp message) {
757         MemberName memberName = memberToName(message.member());
758
759         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
760                 message.member().address());
761
762         memberUp(memberName, message.member().address());
763     }
764
765     private void memberUp(MemberName memberName, Address address) {
766         addPeerAddress(memberName, address);
767         checkReady();
768     }
769
770     private void memberWeaklyUp(MemberWeaklyUp message) {
771         MemberName memberName = memberToName(message.member());
772
773         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
774                 message.member().address());
775
776         memberUp(memberName, message.member().address());
777     }
778
779     private void addPeerAddress(MemberName memberName, Address address) {
780         peerAddressResolver.addPeerAddress(memberName, address);
781
782         for (ShardInformation info : localShards.values()) {
783             String shardName = info.getShardName();
784             String peerId = getShardIdentifier(memberName, shardName).toString();
785             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
786
787             info.peerUp(memberName, peerId, getSelf());
788         }
789     }
790
791     private void memberReachable(ClusterEvent.ReachableMember message) {
792         MemberName memberName = memberToName(message.member());
793         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
794
795         addPeerAddress(memberName, message.member().address());
796
797         markMemberAvailable(memberName);
798     }
799
800     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
801         MemberName memberName = memberToName(message.member());
802         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
803
804         markMemberUnavailable(memberName);
805     }
806
807     private void markMemberUnavailable(final MemberName memberName) {
808         final String memberStr = memberName.getName();
809         for (ShardInformation info : localShards.values()) {
810             String leaderId = info.getLeaderId();
811             // XXX: why are we using String#contains() here?
812             if (leaderId != null && leaderId.contains(memberStr)) {
813                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
814                 info.setLeaderAvailable(false);
815
816                 primaryShardInfoCache.remove(info.getShardName());
817             }
818
819             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
820         }
821     }
822
823     private void markMemberAvailable(final MemberName memberName) {
824         final String memberStr = memberName.getName();
825         for (ShardInformation info : localShards.values()) {
826             String leaderId = info.getLeaderId();
827             // XXX: why are we using String#contains() here?
828             if (leaderId != null && leaderId.contains(memberStr)) {
829                 LOG.debug("Marking Leader {} as available.", leaderId);
830                 info.setLeaderAvailable(true);
831             }
832
833             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
834         }
835     }
836
837     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
838         datastoreContextFactory = factory;
839         for (ShardInformation info : localShards.values()) {
840             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
841         }
842     }
843
844     private void onGetLocalShardIds() {
845         final List<String> response = new ArrayList<>(localShards.size());
846
847         for (ShardInformation info : localShards.values()) {
848             response.add(info.getShardId().toString());
849         }
850
851         getSender().tell(new Status.Success(response), getSelf());
852     }
853
854     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
855         final ShardIdentifier identifier = message.getShardId();
856
857         if (identifier != null) {
858             final ShardInformation info = localShards.get(identifier.getShardName());
859             if (info == null) {
860                 getSender().tell(new Status.Failure(
861                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
862                 return;
863             }
864
865             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
866         } else {
867             for (ShardInformation info : localShards.values()) {
868                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
869             }
870         }
871
872         getSender().tell(new Status.Success(null), getSelf());
873     }
874
875     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
876         final ActorRef actor = info.getActor();
877         if (actor != null) {
878             actor.tell(switchBehavior, getSelf());
879         } else {
880             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
881                 info.getShardName(), switchBehavior.getNewState());
882         }
883     }
884
885     /**
886      * Notifies all the local shards of a change in the schema context.
887      *
888      * @param message the message to send
889      */
890     private void updateSchemaContext(final Object message) {
891         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
892
893         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
894
895         for (ShardInformation info : localShards.values()) {
896             if (info.getActor() == null) {
897                 LOG.debug("Creating Shard {}", info.getShardId());
898                 info.setActor(newShardActor(schemaContext, info));
899             } else {
900                 info.getActor().tell(message, getSelf());
901             }
902         }
903     }
904
905     @VisibleForTesting
906     protected ClusterWrapper getCluster() {
907         return cluster;
908     }
909
910     @VisibleForTesting
911     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
912         return getContext().actorOf(info.newProps(schemaContext)
913                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
914     }
915
916     private void findPrimary(FindPrimary message) {
917         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
918
919         final String shardName = message.getShardName();
920         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
921
922         // First see if the there is a local replica for the shard
923         final ShardInformation info = localShards.get(shardName);
924         if (info != null && info.isActiveMember()) {
925             sendResponse(info, message.isWaitUntilReady(), true, () -> {
926                 String primaryPath = info.getSerializedLeaderActor();
927                 Object found = canReturnLocalShardState && info.isLeader()
928                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
929                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
930
931                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
932                 return found;
933             });
934
935             return;
936         }
937
938         final Collection<String> visitedAddresses;
939         if (message instanceof RemoteFindPrimary) {
940             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
941         } else {
942             visitedAddresses = new ArrayList<>(1);
943         }
944
945         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
946
947         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
948             if (visitedAddresses.contains(address)) {
949                 continue;
950             }
951
952             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
953                     persistenceId(), shardName, address, visitedAddresses);
954
955             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
956                     message.isWaitUntilReady(), visitedAddresses), getContext());
957             return;
958         }
959
960         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
961
962         getSender().tell(new PrimaryNotFoundException(
963                 String.format("No primary shard found for %s.", shardName)), getSelf());
964     }
965
966     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
967         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
968                 .getShardInitializationTimeout().duration().$times(2));
969
970         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
971         futureObj.onComplete(new OnComplete<Object>() {
972             @Override
973             public void onComplete(Throwable failure, Object response) {
974                 if (failure != null) {
975                     handler.onFailure(failure);
976                 } else {
977                     if (response instanceof RemotePrimaryShardFound) {
978                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
979                     } else if (response instanceof LocalPrimaryShardFound) {
980                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
981                     } else {
982                         handler.onUnknownResponse(response);
983                     }
984                 }
985             }
986         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
987     }
988
989     /**
990      * Construct the name of the shard actor given the name of the member on
991      * which the shard resides and the name of the shard.
992      *
993      * @param memberName the member name
994      * @param shardName the shard name
995      * @return a b
996      */
997     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
998         return peerAddressResolver.getShardIdentifier(memberName, shardName);
999     }
1000
1001     /**
1002      * Create shards that are local to the member on which the ShardManager runs.
1003      */
1004     private void createLocalShards() {
1005         MemberName memberName = this.cluster.getCurrentMemberName();
1006         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
1007
1008         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1009         if (restoreFromSnapshot != null) {
1010             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1011                 shardSnapshots.put(snapshot.getName(), snapshot);
1012             }
1013         }
1014
1015         restoreFromSnapshot = null; // null out to GC
1016
1017         for (String shardName : memberShardNames) {
1018             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1019
1020             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1021
1022             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1023             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
1024                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
1025                         shardSnapshots.get(shardName)), peerAddressResolver));
1026         }
1027     }
1028
1029     /**
1030      * Given the name of the shard find the addresses of all it's peers.
1031      *
1032      * @param shardName the shard name
1033      */
1034     private Map<String, String> getPeerAddresses(String shardName) {
1035         Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1036         Map<String, String> peerAddresses = new HashMap<>();
1037
1038         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1039
1040         for (MemberName memberName : members) {
1041             if (!currentMemberName.equals(memberName)) {
1042                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1043                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1044                 peerAddresses.put(shardId.toString(), address);
1045             }
1046         }
1047         return peerAddresses;
1048     }
1049
1050     @Override
1051     public SupervisorStrategy supervisorStrategy() {
1052
1053         return new OneForOneStrategy(10, Duration.create("1 minute"),
1054                 (Function<Throwable, Directive>) t -> {
1055                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1056                     return SupervisorStrategy.resume();
1057                 });
1058     }
1059
1060     @Override
1061     public String persistenceId() {
1062         return persistenceId;
1063     }
1064
1065     @VisibleForTesting
1066     ShardManagerInfoMBean getMBean() {
1067         return shardManagerMBean;
1068     }
1069
1070     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1071         if (shardReplicaOperationsInProgress.contains(shardName)) {
1072             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1073             LOG.debug("{}: {}", persistenceId(), msg);
1074             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1075             return true;
1076         }
1077
1078         return false;
1079     }
1080
    /**
     * Handles an AddShardReplica request by validating it and then locating the shard's primary.
     *
     * <p>
     * A Status.Failure is sent back to the sender if the shard is not present in the configuration or
     * if no SchemaContext has been set yet. Otherwise findPrimary is invoked with a handler: when a
     * remote primary is found, a RunnableMessage that calls addShard is sent to this actor; when the
     * primary is found locally, the requester is told that the local replica already exists.
     *
     * @param shardReplicaMsg the request naming the shard to replicate locally
     */
    private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
        final String shardName = shardReplicaMsg.getShardName();

        LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);

        // verify the shard with the specified name is present in the cluster configuration
        if (!this.configuration.isShardConfigured(shardName)) {
            String msg = String.format("No module configuration exists for shard %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
            return;
        }

        // A SchemaContext is needed to create the local shard instance, so reject the request without one
        if (schemaContext == null) {
            String msg = String.format(
                  "No SchemaContext is available in order to create a local shard instance for %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
            return;
        }

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                // findPrimary completes its callback on the Client dispatcher, so addShard is wrapped in a
                // message to this actor instead of being called directly. The message is sent with the
                // handler's target actor as sender - presumably so that getSender() yields the original
                // requester when the runnable executes; confirm against RunnableMessage handling.
                getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()),
                        getTargetActor());
            }

            @Override
            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
                // The primary is local, so a replica of this shard already exists on this member.
                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
            }

        });
    }
1118
1119     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1120         String msg = String.format("Local shard %s already exists", shardName);
1121         LOG.debug("{}: {}", persistenceId(), msg);
1122         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1123     }
1124
1125     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1126         if (isShardReplicaOperationInProgress(shardName, sender)) {
1127             return;
1128         }
1129
1130         shardReplicaOperationsInProgress.add(shardName);
1131
1132         final ShardInformation shardInfo;
1133         final boolean removeShardOnFailure;
1134         ShardInformation existingShardInfo = localShards.get(shardName);
1135         if (existingShardInfo == null) {
1136             removeShardOnFailure = true;
1137             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1138
1139             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1140                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1141
1142             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1143                     Shard.builder(), peerAddressResolver);
1144             shardInfo.setActiveMember(false);
1145             localShards.put(shardName, shardInfo);
1146             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1147         } else {
1148             removeShardOnFailure = false;
1149             shardInfo = existingShardInfo;
1150         }
1151
1152         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1153
1154         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1155         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1156                 response.getPrimaryPath(), shardInfo.getShardId());
1157
1158         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1159                 .getShardLeaderElectionTimeout().duration());
1160         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1161             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1162
1163         futureObj.onComplete(new OnComplete<Object>() {
1164             @Override
1165             public void onComplete(Throwable failure, Object addServerResponse) {
1166                 if (failure != null) {
1167                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1168                             response.getPrimaryPath(), shardName, failure);
1169
1170                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1171                             response.getPrimaryPath(), shardName);
1172                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1173                 } else {
1174                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1175                             response.getPrimaryPath(), removeShardOnFailure), sender);
1176                 }
1177             }
1178         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1179     }
1180
1181     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1182             boolean removeShardOnFailure) {
1183         shardReplicaOperationsInProgress.remove(shardName);
1184
1185         if (removeShardOnFailure) {
1186             ShardInformation shardInfo = localShards.remove(shardName);
1187             if (shardInfo.getActor() != null) {
1188                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1189             }
1190         }
1191
1192         sender.tell(new Status.Failure(message == null ? failure :
1193             new RuntimeException(message, failure)), getSelf());
1194     }
1195
1196     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1197             String leaderPath, boolean removeShardOnFailure) {
1198         String shardName = shardInfo.getShardName();
1199         shardReplicaOperationsInProgress.remove(shardName);
1200
1201         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1202
1203         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1204             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1205
1206             // Make the local shard voting capable
1207             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1208             shardInfo.setActiveMember(true);
1209             persistShardList();
1210
1211             sender.tell(new Status.Success(null), getSelf());
1212         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1213             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1214         } else {
1215             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1216                     persistenceId(), shardName, replyMsg.getStatus());
1217
1218             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1219                     shardInfo.getShardId());
1220
1221             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1222         }
1223     }
1224
1225     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1226                                                String leaderPath, ShardIdentifier shardId) {
1227         Exception failure;
1228         switch (serverChangeStatus) {
1229             case TIMEOUT:
1230                 failure = new TimeoutException(String.format(
1231                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
1232                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
1233                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
1234                 break;
1235             case NO_LEADER:
1236                 failure = createNoShardLeaderException(shardId);
1237                 break;
1238             case NOT_SUPPORTED:
1239                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1240                         serverChange.getSimpleName(), shardId.getShardName()));
1241                 break;
1242             default :
1243                 failure = new RuntimeException(String.format(
1244                         "%s request to leader %s for shard %s failed with status %s",
1245                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1246         }
1247         return failure;
1248     }
1249
    /**
     * Handles a RemoveShardReplica request by locating the shard's primary and then scheduling the
     * removal on this actor.
     *
     * <p>
     * Whether the primary is found remotely or locally, the handler sends this actor a RunnableMessage
     * that calls removeShardReplica with the primary's path.
     *
     * @param shardReplicaMsg the request naming the shard whose replica is to be removed
     */
    private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
        LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);

        findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            @Override
            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            // findPrimary completes its callback on the Client dispatcher, so the removal is wrapped in a
            // message to this actor instead of being called directly. The message is sent with the
            // handler's target actor as sender - presumably so that getSender() yields the original
            // requester when the runnable executes; confirm against RunnableMessage handling.
            private void doRemoveShardReplicaAsync(final String primaryPath) {
                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
                        primaryPath, getSender()), getTargetActor());
            }
        });
    }
1271
1272     private void persistShardList() {
1273         List<String> shardList = new ArrayList<>(localShards.keySet());
1274         for (ShardInformation shardInfo : localShards.values()) {
1275             if (!shardInfo.isActiveMember()) {
1276                 shardList.remove(shardInfo.getShardName());
1277             }
1278         }
1279         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1280         saveSnapshot(updateShardManagerSnapshot(shardList));
1281     }
1282
1283     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1284         currentSnapshot = new ShardManagerSnapshot(shardList);
1285         return currentSnapshot;
1286     }
1287
1288     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1289         currentSnapshot = snapshot;
1290
1291         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1292
1293         final MemberName currentMember = cluster.getCurrentMemberName();
1294         Set<String> configuredShardList =
1295             new HashSet<>(configuration.getMemberShardNames(currentMember));
1296         for (String shard : currentSnapshot.getShardList()) {
1297             if (!configuredShardList.contains(shard)) {
1298                 // add the current member as a replica for the shard
1299                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1300                 configuration.addMemberReplicaForShard(shard, currentMember);
1301             } else {
1302                 configuredShardList.remove(shard);
1303             }
1304         }
1305         for (String shard : configuredShardList) {
1306             // remove the member as a replica for the shard
1307             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1308             configuration.removeMemberReplicaForShard(shard, currentMember);
1309         }
1310     }
1311
1312     private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
1313         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1314             persistenceId());
1315         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1316             0, 0));
1317     }
1318
1319     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1320         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1321
1322         String shardName = changeMembersVotingStatus.getShardName();
1323         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1324         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1325             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1326                     e.getValue());
1327         }
1328
1329         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1330
1331         findLocalShard(shardName, getSender(),
1332             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1333             localShardFound.getPath(), getSender()));
1334     }
1335
1336     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
1337         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1338
1339         ActorRef sender = getSender();
1340         final String shardName = flipMembersVotingStatus.getShardName();
1341         findLocalShard(shardName, sender, localShardFound -> {
1342             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1343                     Timeout.apply(30, TimeUnit.SECONDS));
1344
1345             future.onComplete(new OnComplete<Object>() {
1346                 @Override
1347                 public void onComplete(Throwable failure, Object response) {
1348                     if (failure != null) {
1349                         sender.tell(new Status.Failure(new RuntimeException(
1350                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1351                         return;
1352                     }
1353
1354                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1355                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1356                     for ( Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1357                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1358                     }
1359
1360                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1361                             .toString(), !raftState.isVoting());
1362
1363                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1364                             shardName, localShardFound.getPath(), sender);
1365                 }
1366             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1367         });
1368
1369     }
1370
1371     private void findLocalShard(FindLocalShard message) {
1372         final ShardInformation shardInformation = localShards.get(message.getShardName());
1373
1374         if (shardInformation == null) {
1375             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1376             return;
1377         }
1378
1379         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1380             () -> new LocalShardFound(shardInformation.getActor()));
1381     }
1382
1383     private void findLocalShard(final String shardName, final ActorRef sender,
1384             final Consumer<LocalShardFound> onLocalShardFound) {
1385         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1386                 .getShardInitializationTimeout().duration().$times(2));
1387
1388         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1389         futureObj.onComplete(new OnComplete<Object>() {
1390             @Override
1391             public void onComplete(Throwable failure, Object response) {
1392                 if (failure != null) {
1393                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1394                             failure);
1395                     sender.tell(new Status.Failure(new RuntimeException(
1396                             String.format("Failed to find local shard %s", shardName), failure)), self());
1397                 } else {
1398                     if (response instanceof LocalShardFound) {
1399                         getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
1400                                 sender);
1401                     } else if (response instanceof LocalShardNotFound) {
1402                         String msg = String.format("Local shard %s does not exist", shardName);
1403                         LOG.debug("{}: {}", persistenceId, msg);
1404                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
1405                     } else {
1406                         String msg = String.format("Failed to find local shard %s: received response: %s",
1407                                 shardName, response);
1408                         LOG.debug("{}: {}", persistenceId, msg);
1409                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1410                                 new RuntimeException(msg)), self());
1411                     }
1412                 }
1413             }
1414         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1415     }
1416
1417     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
1418             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1419         if (isShardReplicaOperationInProgress(shardName, sender)) {
1420             return;
1421         }
1422
1423         shardReplicaOperationsInProgress.add(shardName);
1424
1425         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1426         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1427
1428         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1429                 changeServersVotingStatus, shardActorRef.path());
1430
1431         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1432         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1433
1434         futureObj.onComplete(new OnComplete<Object>() {
1435             @Override
1436             public void onComplete(Throwable failure, Object response) {
1437                 shardReplicaOperationsInProgress.remove(shardName);
1438                 if (failure != null) {
1439                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1440                             shardActorRef.path());
1441                     LOG.debug("{}: {}", persistenceId(), msg, failure);
1442                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1443                 } else {
1444                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1445
1446                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1447                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1448                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1449                         sender.tell(new Status.Success(null), getSelf());
1450                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1451                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1452                                 "The requested voting state change for shard %s is invalid. At least one member "
1453                                 + "must be voting", shardId.getShardName()))), getSelf());
1454                     } else {
1455                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1456                                 persistenceId(), shardName, replyMsg.getStatus());
1457
1458                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1459                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1460                         sender.tell(new Status.Failure(error), getSelf());
1461                     }
1462                 }
1463             }
1464         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1465     }
1466
1467     private static final class ForwardedAddServerReply {
1468         ShardInformation shardInfo;
1469         AddServerReply addServerReply;
1470         String leaderPath;
1471         boolean removeShardOnFailure;
1472
1473         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1474                 boolean removeShardOnFailure) {
1475             this.shardInfo = shardInfo;
1476             this.addServerReply = addServerReply;
1477             this.leaderPath = leaderPath;
1478             this.removeShardOnFailure = removeShardOnFailure;
1479         }
1480     }
1481
1482     private static final class ForwardedAddServerFailure {
1483         String shardName;
1484         String failureMessage;
1485         Throwable failure;
1486         boolean removeShardOnFailure;
1487
1488         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1489                 boolean removeShardOnFailure) {
1490             this.shardName = shardName;
1491             this.failureMessage = failureMessage;
1492             this.failure = failure;
1493             this.removeShardOnFailure = removeShardOnFailure;
1494         }
1495     }
1496
1497     static class OnShardInitialized {
1498         private final Runnable replyRunnable;
1499         private Cancellable timeoutSchedule;
1500
1501         OnShardInitialized(Runnable replyRunnable) {
1502             this.replyRunnable = replyRunnable;
1503         }
1504
1505         Runnable getReplyRunnable() {
1506             return replyRunnable;
1507         }
1508
1509         Cancellable getTimeoutSchedule() {
1510             return timeoutSchedule;
1511         }
1512
1513         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1514             this.timeoutSchedule = timeoutSchedule;
1515         }
1516     }
1517
1518     static class OnShardReady extends OnShardInitialized {
1519         OnShardReady(Runnable replyRunnable) {
1520             super(replyRunnable);
1521         }
1522     }
1523
1524     private interface RunnableMessage extends Runnable {
1525     }
1526
1527     /**
1528      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1529      * a remote or local find primary message is processed.
1530      */
1531     private interface FindPrimaryResponseHandler {
1532         /**
1533          * Invoked when a Failure message is received as a response.
1534          *
1535          * @param failure the failure exception
1536          */
1537         void onFailure(Throwable failure);
1538
1539         /**
1540          * Invoked when a RemotePrimaryShardFound response is received.
1541          *
1542          * @param response the response
1543          */
1544         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1545
1546         /**
1547          * Invoked when a LocalPrimaryShardFound response is received.
1548          *
1549          * @param response the response
1550          */
1551         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1552
1553         /**
1554          * Invoked when an unknown response is received. This is another type of failure.
1555          *
1556          * @param response the response
1557          */
1558         void onUnknownResponse(Object response);
1559     }
1560
1561     /**
1562      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1563      * replica and sends a wrapped Failure response to some targetActor.
1564      */
1565     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1566         private final ActorRef targetActor;
1567         private final String shardName;
1568         private final String persistenceId;
1569         private final ActorRef shardManagerActor;
1570
1571         /**
1572          * Constructs an instance.
1573          *
1574          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1575          * @param shardName The name of the shard for which the primary replica had to be found
1576          * @param persistenceId The persistenceId for the ShardManager
1577          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1578          */
1579         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
1580                 ActorRef shardManagerActor) {
1581             this.targetActor = Preconditions.checkNotNull(targetActor);
1582             this.shardName = Preconditions.checkNotNull(shardName);
1583             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1584             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1585         }
1586
1587         public ActorRef getTargetActor() {
1588             return targetActor;
1589         }
1590
1591         public String getShardName() {
1592             return shardName;
1593         }
1594
1595         @Override
1596         public void onFailure(Throwable failure) {
1597             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1598             targetActor.tell(new Status.Failure(new RuntimeException(
1599                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1600         }
1601
1602         @Override
1603         public void onUnknownResponse(Object response) {
1604             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1605                     shardName, response);
1606             LOG.debug("{}: {}", persistenceId, msg);
1607             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1608                     new RuntimeException(msg)), shardManagerActor);
1609         }
1610     }
1611
1612     /**
1613      * The WrappedShardResponse class wraps a response from a Shard.
1614      */
1615     private static final class WrappedShardResponse {
1616         private final ShardIdentifier shardId;
1617         private final Object response;
1618         private final String leaderPath;
1619
1620         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1621             this.shardId = shardId;
1622             this.response = response;
1623             this.leaderPath = leaderPath;
1624         }
1625
1626         ShardIdentifier getShardId() {
1627             return shardId;
1628         }
1629
1630         Object getResponse() {
1631             return response;
1632         }
1633
1634         String getLeaderPath() {
1635             return leaderPath;
1636         }
1637     }
1638
1639     private static final class ShardNotInitializedTimeout {
1640         private final ActorRef sender;
1641         private final ShardInformation shardInfo;
1642         private final OnShardInitialized onShardInitialized;
1643
1644         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1645             this.sender = sender;
1646             this.shardInfo = shardInfo;
1647             this.onShardInitialized = onShardInitialized;
1648         }
1649
1650         ActorRef getSender() {
1651             return sender;
1652         }
1653
1654         ShardInformation getShardInfo() {
1655             return shardInfo;
1656         }
1657
1658         OnShardInitialized getOnShardInitialized() {
1659             return onShardInitialized;
1660         }
1661     }
1662 }
1663
1664
1665