Bug 8380: Fix unhandled messages in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.actor.SupervisorStrategy.Directive;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberWeaklyUp;
23 import akka.cluster.Member;
24 import akka.dispatch.Futures;
25 import akka.dispatch.OnComplete;
26 import akka.japi.Function;
27 import akka.pattern.Patterns;
28 import akka.persistence.DeleteSnapshotsFailure;
29 import akka.persistence.DeleteSnapshotsSuccess;
30 import akka.persistence.RecoveryCompleted;
31 import akka.persistence.SaveSnapshotFailure;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.persistence.SnapshotOffer;
34 import akka.persistence.SnapshotSelectionCriteria;
35 import akka.util.Timeout;
36 import com.google.common.annotations.VisibleForTesting;
37 import com.google.common.base.Preconditions;
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.Collections;
41 import java.util.HashMap;
42 import java.util.HashSet;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Map.Entry;
46 import java.util.Set;
47 import java.util.concurrent.CountDownLatch;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.TimeoutException;
50 import java.util.function.Consumer;
51 import java.util.function.Supplier;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
54 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
55 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
57 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
58 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
59 import org.opendaylight.controller.cluster.datastore.Shard;
60 import org.opendaylight.controller.cluster.datastore.config.Configuration;
61 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
62 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
63 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
65 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
66 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
67 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
68 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
69 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
70 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
71 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
72 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
73 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
74 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
75 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
77 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
78 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
79 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
80 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
81 import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
82 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
83 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
84 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
85 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
86 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
87 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
88 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
89 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
90 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
91 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
92 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
93 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
94 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
95 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
96 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
97 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
98 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
99 import org.opendaylight.controller.cluster.raft.messages.AddServer;
100 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
101 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
102 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
103 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
104 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
105 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
106 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
107 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
108 import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
109 import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
110 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
111 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
112 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
113 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
114 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
115 import org.opendaylight.yangtools.concepts.ListenerRegistration;
116 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
117 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
118 import org.slf4j.Logger;
119 import org.slf4j.LoggerFactory;
120 import scala.concurrent.ExecutionContext;
121 import scala.concurrent.Future;
122 import scala.concurrent.duration.Duration;
123 import scala.concurrent.duration.FiniteDuration;
124
125 /**
126  * Manages the shards for a data store. The ShardManager has the following jobs:
127  * <ul>
128  * <li> Create all the local shard replicas that belong on this cluster member
129  * <li> Find the address of the local shard
130  * <li> Find the primary replica for any given shard
131  * <li> Monitor the cluster members and store their addresses
132  * </ul>
133  */
134 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);

    // Stores a mapping between a shard name and its corresponding information.
    // Shard names look like inventory, topology etc and are as specified in
    // configuration
    private final Map<String, ShardInformation> localShards = new HashMap<>();

    // The type of a ShardManager reflects the type of the datastore itself
    // A data store could be of type config/operational
    private final String type;

    // Cluster access: used to subscribe to member events and to look up the current member name.
    private final ClusterWrapper cluster;

    // Module/prefix shard configuration; prefix shard configurations are added to it when created.
    private final Configuration configuration;

    // Dispatcher path resolved for Dispatchers.DispatcherType.Shard in the constructor.
    private final String shardDispatcherPath;

    // JMX bean registered in the constructor and unregistered in postStop().
    private final ShardManagerInfo shardManagerMBean;

    // Not final: presumably replaced when a DatastoreContextFactory message is handled
    // (see onDatastoreContextFactory) - TODO confirm.
    private DatastoreContextFactory datastoreContextFactory;

    // Latch supplied by the creator (AbstractShardManagerCreator).
    private final CountDownLatch waitTillReadyCountdownLatch;

    // Cache supplied by the creator (AbstractShardManagerCreator).
    private final PrimaryShardInfoFutureCache primaryShardInfoCache;

    // Resolver built in the constructor from the datastore type and the current member name.
    private final ShardPeerAddressResolver peerAddressResolver;

    private SchemaContext schemaContext;

    // Optional snapshot to restore from, supplied by the creator.
    private DatastoreSnapshot restoreFromSnapshot;

    // Latest shard-list snapshot; handed to the GetSnapshot reply actor.
    private ShardManagerSnapshot currentSnapshot;

    // Names of shards that have a replica operation (e.g. RemoveServer) in flight; used to reject
    // a second concurrent operation on the same shard.
    private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();

    // gracefulStop futures of shard actors being removed, keyed by shard name. While an entry is
    // present, messages for that shard are deferred until the stop completes.
    private final Map<String, Future<Boolean>> shardActorStoppingFutures = new HashMap<>();

    // Configured ShardManager persistence id, or "shard-manager-" + type when none is configured.
    private final String persistenceId;
    private final AbstractDataStore dataStore;

    // Closed in postStop(); NOTE(review): never assigned in the visible code - confirm it is still used.
    private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
    // Prefix-shard configuration listener handler, (re)created by onInitConfigListener().
    private PrefixedShardConfigUpdateHandler configUpdateHandler;
176     private PrefixedShardConfigUpdateHandler configUpdateHandler;
177
178     ShardManager(AbstractShardManagerCreator<?> builder) {
179         this.cluster = builder.getCluster();
180         this.configuration = builder.getConfiguration();
181         this.datastoreContextFactory = builder.getDatastoreContextFactory();
182         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
183         this.shardDispatcherPath =
184                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
185         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
186         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
187         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
188
189         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
190         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
191
192         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
193
194         // Subscribe this actor to cluster member events
195         cluster.subscribeToMemberEvents(getSelf());
196
197         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
198                 "shard-manager-" + this.type,
199                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
200         shardManagerMBean.registerMBean();
201
202         dataStore = builder.getDistributedDataStore();
203     }
204
205     @Override
206     public void preStart() {
207         LOG.info("Starting ShardManager {}", persistenceId);
208     }
209
210     @Override
211     public void postStop() {
212         LOG.info("Stopping ShardManager {}", persistenceId());
213
214         shardManagerMBean.unregisterMBean();
215
216         if (configListenerReg != null) {
217             configListenerReg.close();
218             configListenerReg = null;
219         }
220     }
221
222     @Override
223     public void handleCommand(Object message) throws Exception {
224         if (message  instanceof FindPrimary) {
225             findPrimary((FindPrimary)message);
226         } else if (message instanceof FindLocalShard) {
227             findLocalShard((FindLocalShard) message);
228         } else if (message instanceof UpdateSchemaContext) {
229             updateSchemaContext(message);
230         } else if (message instanceof ActorInitialized) {
231             onActorInitialized(message);
232         } else if (message instanceof ClusterEvent.MemberUp) {
233             memberUp((ClusterEvent.MemberUp) message);
234         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
235             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
236         } else if (message instanceof ClusterEvent.MemberExited) {
237             memberExited((ClusterEvent.MemberExited) message);
238         } else if (message instanceof ClusterEvent.MemberRemoved) {
239             memberRemoved((ClusterEvent.MemberRemoved) message);
240         } else if (message instanceof ClusterEvent.UnreachableMember) {
241             memberUnreachable((ClusterEvent.UnreachableMember) message);
242         } else if (message instanceof ClusterEvent.ReachableMember) {
243             memberReachable((ClusterEvent.ReachableMember) message);
244         } else if (message instanceof DatastoreContextFactory) {
245             onDatastoreContextFactory((DatastoreContextFactory) message);
246         } else if (message instanceof RoleChangeNotification) {
247             onRoleChangeNotification((RoleChangeNotification) message);
248         } else if (message instanceof FollowerInitialSyncUpStatus) {
249             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
250         } else if (message instanceof ShardNotInitializedTimeout) {
251             onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
252         } else if (message instanceof ShardLeaderStateChanged) {
253             onLeaderStateChanged((ShardLeaderStateChanged) message);
254         } else if (message instanceof SwitchShardBehavior) {
255             onSwitchShardBehavior((SwitchShardBehavior) message);
256         } else if (message instanceof CreateShard) {
257             onCreateShard((CreateShard)message);
258         } else if (message instanceof AddShardReplica) {
259             onAddShardReplica((AddShardReplica) message);
260         } else if (message instanceof AddPrefixShardReplica) {
261             onAddPrefixShardReplica((AddPrefixShardReplica) message);
262         } else if (message instanceof PrefixShardCreated) {
263             onPrefixShardCreated((PrefixShardCreated) message);
264         } else if (message instanceof PrefixShardRemoved) {
265             onPrefixShardRemoved((PrefixShardRemoved) message);
266         } else if (message instanceof InitConfigListener) {
267             onInitConfigListener();
268         } else if (message instanceof ForwardedAddServerReply) {
269             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
270             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
271                     msg.removeShardOnFailure);
272         } else if (message instanceof ForwardedAddServerFailure) {
273             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
274             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
275         } else if (message instanceof RemoveShardReplica) {
276             onRemoveShardReplica((RemoveShardReplica) message);
277         } else if (message instanceof RemovePrefixShardReplica) {
278             onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
279         } else if (message instanceof WrappedShardResponse) {
280             onWrappedShardResponse((WrappedShardResponse) message);
281         } else if (message instanceof GetSnapshot) {
282             onGetSnapshot();
283         } else if (message instanceof ServerRemoved) {
284             onShardReplicaRemoved((ServerRemoved) message);
285         } else if (message instanceof ChangeShardMembersVotingStatus) {
286             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
287         } else if (message instanceof FlipShardMembersVotingStatus) {
288             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
289         } else if (message instanceof SaveSnapshotSuccess) {
290             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
291         } else if (message instanceof SaveSnapshotFailure) {
292             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
293                     ((SaveSnapshotFailure) message).cause());
294         } else if (message instanceof Shutdown) {
295             onShutDown();
296         } else if (message instanceof GetLocalShardIds) {
297             onGetLocalShardIds();
298         } else if (message instanceof RunnableMessage) {
299             ((RunnableMessage)message).run();
300         } else if (message instanceof DeleteSnapshotsFailure) {
301             LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
302                     ((DeleteSnapshotsFailure) message).cause());
303         } else if (message instanceof DeleteSnapshotsSuccess) {
304             LOG.debug("{}: Successfully deleted prior snapshots", persistenceId(), message);
305         } else if (message instanceof RegisterRoleChangeListenerReply) {
306             LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
307         } else if (message instanceof ClusterEvent.MemberEvent) {
308             LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message);
309         } else {
310             unknownMessage(message);
311         }
312     }
313
314     private void onInitConfigListener() {
315         LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
316
317         final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
318                 org.opendaylight.mdsal.common.api.LogicalDatastoreType
319                         .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
320
321         if (configUpdateHandler != null) {
322             configUpdateHandler.close();
323         }
324
325         configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
326         configUpdateHandler.initListener(dataStore, type);
327     }
328
329     private void onShutDown() {
330         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
331         for (ShardInformation info : localShards.values()) {
332             if (info.getActor() != null) {
333                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
334
335                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
336                         .getElectionTimeOutInterval().$times(2);
337                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
338             }
339         }
340
341         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
342
343         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
344                 .getDispatcher(Dispatchers.DispatcherType.Client);
345         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
346
347         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
348             @Override
349             public void onComplete(Throwable failure, Iterable<Boolean> results) {
350                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
351
352                 self().tell(PoisonPill.getInstance(), self());
353
354                 if (failure != null) {
355                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
356                 } else {
357                     int nfailed = 0;
358                     for (Boolean result : results) {
359                         if (!result) {
360                             nfailed++;
361                         }
362                     }
363
364                     if (nfailed > 0) {
365                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
366                     }
367                 }
368             }
369         }, dispatcher);
370     }
371
372     private void onWrappedShardResponse(WrappedShardResponse message) {
373         if (message.getResponse() instanceof RemoveServerReply) {
374             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
375                     message.getLeaderPath());
376         }
377     }
378
379     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
380             String leaderPath) {
381         shardReplicaOperationsInProgress.remove(shardId.getShardName());
382
383         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
384
385         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
386             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
387                     shardId.getShardName());
388             originalSender.tell(new Status.Success(null), getSelf());
389         } else {
390             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
391                     persistenceId(), shardId, replyMsg.getStatus());
392
393             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
394             originalSender.tell(new Status.Failure(failure), getSelf());
395         }
396     }
397
398     private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
399                                           final String primaryPath, final ActorRef sender) {
400         if (isShardReplicaOperationInProgress(shardName, sender)) {
401             return;
402         }
403
404         shardReplicaOperationsInProgress.add(shardName);
405
406         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
407
408         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
409
410         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
411         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
412                 primaryPath, shardId);
413
414         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
415         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
416                 new RemoveServer(shardId.toString()), removeServerTimeout);
417
418         futureObj.onComplete(new OnComplete<Object>() {
419             @Override
420             public void onComplete(Throwable failure, Object response) {
421                 if (failure != null) {
422                     shardReplicaOperationsInProgress.remove(shardName);
423                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
424                             primaryPath, shardName);
425
426                     LOG.debug("{}: {}", persistenceId(), msg, failure);
427
428                     // FAILURE
429                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
430                 } else {
431                     // SUCCESS
432                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
433                 }
434             }
435         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
436     }
437
438     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
439             final ActorRef sender) {
440         if (isShardReplicaOperationInProgress(shardName, sender)) {
441             return;
442         }
443
444         shardReplicaOperationsInProgress.add(shardName);
445
446         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
447
448         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
449
450         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
451         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
452                 primaryPath, shardId);
453
454         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
455         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
456                 new RemoveServer(shardId.toString()), removeServerTimeout);
457
458         futureObj.onComplete(new OnComplete<Object>() {
459             @Override
460             public void onComplete(Throwable failure, Object response) {
461                 if (failure != null) {
462                     shardReplicaOperationsInProgress.remove(shardName);
463                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
464                             primaryPath, shardName);
465
466                     LOG.debug("{}: {}", persistenceId(), msg, failure);
467
468                     // FAILURE
469                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
470                 } else {
471                     // SUCCESS
472                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
473                 }
474             }
475         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
476     }
477
478     private void onShardReplicaRemoved(ServerRemoved message) {
479         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
480     }
481
482     @SuppressWarnings("checkstyle:IllegalCatch")
483     private void removeShard(final ShardIdentifier shardId) {
484         final String shardName = shardId.getShardName();
485         final ShardInformation shardInformation = localShards.remove(shardName);
486         if (shardInformation == null) {
487             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
488             return;
489         }
490
491         final ActorRef shardActor = shardInformation.getActor();
492         if (shardActor != null) {
493             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor);
494             FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig()
495                     .getElectionTimeOutInterval().$times(3);
496             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE);
497             shardActorStoppingFutures.put(shardName, stopFuture);
498             stopFuture.onComplete(new OnComplete<Boolean>() {
499                 @Override
500                 public void onComplete(Throwable failure, Boolean result) {
501                     if (failure == null) {
502                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
503                     } else {
504                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
505                     }
506
507                     self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName),
508                             ActorRef.noSender());
509                 }
510             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
511         }
512
513         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
514         persistShardList();
515     }
516
517     private void onGetSnapshot() {
518         LOG.debug("{}: onGetSnapshot", persistenceId());
519
520         List<String> notInitialized = null;
521         for (ShardInformation shardInfo : localShards.values()) {
522             if (!shardInfo.isShardInitialized()) {
523                 if (notInitialized == null) {
524                     notInitialized = new ArrayList<>();
525                 }
526
527                 notInitialized.add(shardInfo.getShardName());
528             }
529         }
530
531         if (notInitialized != null) {
532             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
533                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
534             return;
535         }
536
537         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
538                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
539                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
540
541         for (ShardInformation shardInfo: localShards.values()) {
542             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
543         }
544     }
545
546     @SuppressWarnings("checkstyle:IllegalCatch")
547     private void onCreateShard(CreateShard createShard) {
548         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
549
550         Object reply;
551         try {
552             String shardName = createShard.getModuleShardConfig().getShardName();
553             if (localShards.containsKey(shardName)) {
554                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
555                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
556             } else {
557                 doCreateShard(createShard);
558                 reply = new Status.Success(null);
559             }
560         } catch (Exception e) {
561             LOG.error("{}: onCreateShard failed", persistenceId(), e);
562             reply = new Status.Failure(e);
563         }
564
565         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
566             getSender().tell(reply, getSelf());
567         }
568     }
569
570     private void onPrefixShardCreated(final PrefixShardCreated message) {
571         LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
572
573         final PrefixShardConfiguration config = message.getConfiguration();
574         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
575                 ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
576         final String shardName = shardId.getShardName();
577
578         if (isPreviousShardActorStopInProgress(shardName, message)) {
579             return;
580         }
581
582         if (localShards.containsKey(shardName)) {
583             LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
584             final PrefixShardConfiguration existing =
585                     configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
586
587             if (existing != null && existing.equals(config)) {
588                 // we don't have to do nothing here
589                 return;
590             }
591         }
592
593         doCreatePrefixShard(config, shardId, shardName);
594     }
595
596     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
597         final Future<Boolean> stopFuture = shardActorStoppingFutures.get(shardName);
598         if (stopFuture == null) {
599             return false;
600         }
601
602         LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(),
603                 shardName, messageToDefer);
604         final ActorRef sender = getSender();
605         stopFuture.onComplete(new OnComplete<Boolean>() {
606             @Override
607             public void onComplete(Throwable failure, Boolean result) {
608                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
609                 self().tell(messageToDefer, sender);
610             }
611         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
612
613         return true;
614     }
615
616     private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
617         configuration.addPrefixShardConfiguration(config);
618
619         final Builder builder = newShardDatastoreContextBuilder(shardName);
620         builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
621                 .storeRoot(config.getPrefix().getRootIdentifier());
622         DatastoreContext shardDatastoreContext = builder.build();
623
624         final Map<String, String> peerAddresses = getPeerAddresses(shardName);
625         final boolean isActiveMember = true;
626
627         LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
628                 persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
629
630         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
631                 shardDatastoreContext, Shard.builder(), peerAddressResolver);
632         info.setActiveMember(isActiveMember);
633         localShards.put(info.getShardName(), info);
634
635         if (schemaContext != null) {
636             info.setActor(newShardActor(schemaContext, info));
637         }
638     }
639
640     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
641         LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
642
643         final DOMDataTreeIdentifier prefix = message.getPrefix();
644         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
645                 ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
646
647         configuration.removePrefixShardConfiguration(prefix);
648         removeShard(shardId);
649     }
650
651     private void doCreateShard(final CreateShard createShard) {
652         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
653         final String shardName = moduleShardConfig.getShardName();
654
655         configuration.addModuleShardConfiguration(moduleShardConfig);
656
657         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
658         if (shardDatastoreContext == null) {
659             shardDatastoreContext = newShardDatastoreContext(shardName);
660         } else {
661             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
662                     peerAddressResolver).build();
663         }
664
665         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
666
667         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
668                 && currentSnapshot.getShardList().contains(shardName);
669
670         Map<String, String> peerAddresses;
671         boolean isActiveMember;
672         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
673                 .contains(cluster.getCurrentMemberName())) {
674             peerAddresses = getPeerAddresses(shardName);
675             isActiveMember = true;
676         } else {
677             // The local member is not in the static shard member configuration and the shard did not
678             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
679             // the shard with no peers and with elections disabled so it stays as follower. A
680             // subsequent AddServer request will be needed to make it an active member.
681             isActiveMember = false;
682             peerAddresses = Collections.emptyMap();
683             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
684                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
685         }
686
687         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
688                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
689                 isActiveMember);
690
691         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
692                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
693         info.setActiveMember(isActiveMember);
694         localShards.put(info.getShardName(), info);
695
696         if (schemaContext != null) {
697             info.setActor(newShardActor(schemaContext, info));
698         }
699     }
700
701     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
702         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
703                 .shardPeerAddressResolver(peerAddressResolver);
704     }
705
706     private DatastoreContext newShardDatastoreContext(String shardName) {
707         return newShardDatastoreContextBuilder(shardName).build();
708     }
709
710     private void checkReady() {
711         if (isReadyWithLeaderId()) {
712             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
713                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
714
715             waitTillReadyCountdownLatch.countDown();
716         }
717     }
718
719     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
720         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
721
722         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
723         if (shardInformation != null) {
724             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
725             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
726             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
727                 primaryShardInfoCache.remove(shardInformation.getShardName());
728             }
729
730             checkReady();
731         } else {
732             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
733         }
734     }
735
736     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
737         ShardInformation shardInfo = message.getShardInfo();
738
739         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
740                 shardInfo.getShardName());
741
742         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
743
744         if (!shardInfo.isShardInitialized()) {
745             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
746             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
747         } else {
748             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
749             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
750         }
751     }
752
753     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
754         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
755                 status.getName(), status.isInitialSyncDone());
756
757         ShardInformation shardInformation = findShardInformation(status.getName());
758
759         if (shardInformation != null) {
760             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
761
762             shardManagerMBean.setSyncStatus(isInSync());
763         }
764
765     }
766
767     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
768         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
769                 roleChanged.getOldRole(), roleChanged.getNewRole());
770
771         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
772         if (shardInformation != null) {
773             shardInformation.setRole(roleChanged.getNewRole());
774             checkReady();
775             shardManagerMBean.setSyncStatus(isInSync());
776         }
777     }
778
779
780     private ShardInformation findShardInformation(String memberId) {
781         for (ShardInformation info : localShards.values()) {
782             if (info.getShardId().toString().equals(memberId)) {
783                 return info;
784             }
785         }
786
787         return null;
788     }
789
790     private boolean isReadyWithLeaderId() {
791         boolean isReady = true;
792         for (ShardInformation info : localShards.values()) {
793             if (!info.isShardReadyWithLeaderId()) {
794                 isReady = false;
795                 break;
796             }
797         }
798         return isReady;
799     }
800
801     private boolean isInSync() {
802         for (ShardInformation info : localShards.values()) {
803             if (!info.isInSync()) {
804                 return false;
805             }
806         }
807         return true;
808     }
809
810     private void onActorInitialized(Object message) {
811         final ActorRef sender = getSender();
812
813         if (sender == null) {
814             return; //why is a non-actor sending this message? Just ignore.
815         }
816
817         String actorName = sender.path().name();
818         //find shard name from actor name; actor name is stringified shardId
819
820         final ShardIdentifier shardId;
821         try {
822             shardId = ShardIdentifier.fromShardIdString(actorName);
823         } catch (IllegalArgumentException e) {
824             LOG.debug("{}: ignoring actor {}", actorName, e);
825             return;
826         }
827
828         markShardAsInitialized(shardId.getShardName());
829     }
830
831     private void markShardAsInitialized(String shardName) {
832         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
833
834         ShardInformation shardInformation = localShards.get(shardName);
835         if (shardInformation != null) {
836             shardInformation.setActorInitialized();
837
838             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
839         }
840     }
841
842     @Override
843     protected void handleRecover(Object message) throws Exception {
844         if (message instanceof RecoveryCompleted) {
845             onRecoveryCompleted();
846         } else if (message instanceof SnapshotOffer) {
847             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
848         }
849     }
850
851     @SuppressWarnings("checkstyle:IllegalCatch")
852     private void onRecoveryCompleted() {
853         LOG.info("Recovery complete : {}", persistenceId());
854
855         if (currentSnapshot == null && restoreFromSnapshot != null
856                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
857             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
858
859             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
860
861             applyShardManagerSnapshot(snapshot);
862         }
863
864         createLocalShards();
865     }
866
867     private void sendResponse(ShardInformation shardInformation, boolean doWait,
868             boolean wantShardReady, final Supplier<Object> messageSupplier) {
869         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
870             if (doWait) {
871                 final ActorRef sender = getSender();
872                 final ActorRef self = self();
873
874                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
875
876                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
877                     new OnShardInitialized(replyRunnable);
878
879                 shardInformation.addOnShardInitialized(onShardInitialized);
880
881                 FiniteDuration timeout = shardInformation.getDatastoreContext()
882                         .getShardInitializationTimeout().duration();
883                 if (shardInformation.isShardInitialized()) {
884                     // If the shard is already initialized then we'll wait enough time for the shard to
885                     // elect a leader, ie 2 times the election timeout.
886                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
887                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
888                 }
889
890                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
891                         shardInformation.getShardName());
892
893                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
894                         timeout, getSelf(),
895                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
896                         getContext().dispatcher(), getSelf());
897
898                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
899
900             } else if (!shardInformation.isShardInitialized()) {
901                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
902                         shardInformation.getShardName());
903                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
904             } else {
905                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
906                         shardInformation.getShardName());
907                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
908             }
909
910             return;
911         }
912
913         getSender().tell(messageSupplier.get(), getSelf());
914     }
915
916     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
917         return new NoShardLeaderException(null, shardId.toString());
918     }
919
920     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
921         return new NotInitializedException(String.format(
922                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
923     }
924
925     @VisibleForTesting
926     static MemberName memberToName(final Member member) {
927         return MemberName.forName(member.roles().iterator().next());
928     }
929
930     private void memberRemoved(ClusterEvent.MemberRemoved message) {
931         MemberName memberName = memberToName(message.member());
932
933         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
934                 message.member().address());
935
936         peerAddressResolver.removePeerAddress(memberName);
937
938         for (ShardInformation info : localShards.values()) {
939             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
940         }
941     }
942
943     private void memberExited(ClusterEvent.MemberExited message) {
944         MemberName memberName = memberToName(message.member());
945
946         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
947                 message.member().address());
948
949         peerAddressResolver.removePeerAddress(memberName);
950
951         for (ShardInformation info : localShards.values()) {
952             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
953         }
954     }
955
956     private void memberUp(ClusterEvent.MemberUp message) {
957         MemberName memberName = memberToName(message.member());
958
959         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
960                 message.member().address());
961
962         memberUp(memberName, message.member().address());
963     }
964
965     private void memberUp(MemberName memberName, Address address) {
966         addPeerAddress(memberName, address);
967         checkReady();
968     }
969
970     private void memberWeaklyUp(MemberWeaklyUp message) {
971         MemberName memberName = memberToName(message.member());
972
973         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
974                 message.member().address());
975
976         memberUp(memberName, message.member().address());
977     }
978
979     private void addPeerAddress(MemberName memberName, Address address) {
980         peerAddressResolver.addPeerAddress(memberName, address);
981
982         for (ShardInformation info : localShards.values()) {
983             String shardName = info.getShardName();
984             String peerId = getShardIdentifier(memberName, shardName).toString();
985             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
986
987             info.peerUp(memberName, peerId, getSelf());
988         }
989     }
990
991     private void memberReachable(ClusterEvent.ReachableMember message) {
992         MemberName memberName = memberToName(message.member());
993         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
994
995         addPeerAddress(memberName, message.member().address());
996
997         markMemberAvailable(memberName);
998     }
999
1000     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
1001         MemberName memberName = memberToName(message.member());
1002         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
1003
1004         markMemberUnavailable(memberName);
1005     }
1006
1007     private void markMemberUnavailable(final MemberName memberName) {
1008         final String memberStr = memberName.getName();
1009         for (ShardInformation info : localShards.values()) {
1010             String leaderId = info.getLeaderId();
1011             // XXX: why are we using String#contains() here?
1012             if (leaderId != null && leaderId.contains(memberStr)) {
1013                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
1014                 info.setLeaderAvailable(false);
1015
1016                 primaryShardInfoCache.remove(info.getShardName());
1017             }
1018
1019             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
1020         }
1021     }
1022
1023     private void markMemberAvailable(final MemberName memberName) {
1024         final String memberStr = memberName.getName();
1025         for (ShardInformation info : localShards.values()) {
1026             String leaderId = info.getLeaderId();
1027             // XXX: why are we using String#contains() here?
1028             if (leaderId != null && leaderId.contains(memberStr)) {
1029                 LOG.debug("Marking Leader {} as available.", leaderId);
1030                 info.setLeaderAvailable(true);
1031             }
1032
1033             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
1034         }
1035     }
1036
1037     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
1038         datastoreContextFactory = factory;
1039         for (ShardInformation info : localShards.values()) {
1040             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
1041         }
1042     }
1043
1044     private void onGetLocalShardIds() {
1045         final List<String> response = new ArrayList<>(localShards.size());
1046
1047         for (ShardInformation info : localShards.values()) {
1048             response.add(info.getShardId().toString());
1049         }
1050
1051         getSender().tell(new Status.Success(response), getSelf());
1052     }
1053
1054     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
1055         final ShardIdentifier identifier = message.getShardId();
1056
1057         if (identifier != null) {
1058             final ShardInformation info = localShards.get(identifier.getShardName());
1059             if (info == null) {
1060                 getSender().tell(new Status.Failure(
1061                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
1062                 return;
1063             }
1064
1065             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1066         } else {
1067             for (ShardInformation info : localShards.values()) {
1068                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1069             }
1070         }
1071
1072         getSender().tell(new Status.Success(null), getSelf());
1073     }
1074
1075     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
1076         final ActorRef actor = info.getActor();
1077         if (actor != null) {
1078             actor.tell(switchBehavior, getSelf());
1079         } else {
1080             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
1081                 info.getShardName(), switchBehavior.getNewState());
1082         }
1083     }
1084
1085     /**
1086      * Notifies all the local shards of a change in the schema context.
1087      *
1088      * @param message the message to send
1089      */
1090     private void updateSchemaContext(final Object message) {
1091         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
1092
1093         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
1094
1095         for (ShardInformation info : localShards.values()) {
1096             if (info.getActor() == null) {
1097                 LOG.debug("Creating Shard {}", info.getShardId());
1098                 info.setActor(newShardActor(schemaContext, info));
1099             } else {
1100                 info.getActor().tell(message, getSelf());
1101             }
1102         }
1103     }
1104
1105     @VisibleForTesting
1106     protected ClusterWrapper getCluster() {
1107         return cluster;
1108     }
1109
1110     @VisibleForTesting
1111     protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
1112         return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
1113                 info.getShardId().toString());
1114     }
1115
1116     private void findPrimary(FindPrimary message) {
1117         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
1118
1119         final String shardName = message.getShardName();
1120         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
1121
1122         // First see if the there is a local replica for the shard
1123         final ShardInformation info = localShards.get(shardName);
1124         if (info != null && info.isActiveMember()) {
1125             sendResponse(info, message.isWaitUntilReady(), true, () -> {
1126                 String primaryPath = info.getSerializedLeaderActor();
1127                 Object found = canReturnLocalShardState && info.isLeader()
1128                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
1129                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
1130
1131                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
1132                 return found;
1133             });
1134
1135             return;
1136         }
1137
1138         final Collection<String> visitedAddresses;
1139         if (message instanceof RemoteFindPrimary) {
1140             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
1141         } else {
1142             visitedAddresses = new ArrayList<>(1);
1143         }
1144
1145         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
1146
1147         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
1148             if (visitedAddresses.contains(address)) {
1149                 continue;
1150             }
1151
1152             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
1153                     persistenceId(), shardName, address, visitedAddresses);
1154
1155             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
1156                     message.isWaitUntilReady(), visitedAddresses), getContext());
1157             return;
1158         }
1159
1160         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
1161
1162         getSender().tell(new PrimaryNotFoundException(
1163                 String.format("No primary shard found for %s.", shardName)), getSelf());
1164     }
1165
1166     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1167         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1168                 .getShardInitializationTimeout().duration().$times(2));
1169
1170         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1171         futureObj.onComplete(new OnComplete<Object>() {
1172             @Override
1173             public void onComplete(Throwable failure, Object response) {
1174                 if (failure != null) {
1175                     handler.onFailure(failure);
1176                 } else {
1177                     if (response instanceof RemotePrimaryShardFound) {
1178                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1179                     } else if (response instanceof LocalPrimaryShardFound) {
1180                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1181                     } else {
1182                         handler.onUnknownResponse(response);
1183                     }
1184                 }
1185             }
1186         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1187     }
1188
1189     /**
1190      * Construct the name of the shard actor given the name of the member on
1191      * which the shard resides and the name of the shard.
1192      *
1193      * @param memberName the member name
1194      * @param shardName the shard name
1195      * @return a b
1196      */
1197     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
1198         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1199     }
1200
1201     /**
1202      * Create shards that are local to the member on which the ShardManager runs.
1203      */
1204     private void createLocalShards() {
1205         MemberName memberName = this.cluster.getCurrentMemberName();
1206         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
1207
1208         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1209         if (restoreFromSnapshot != null) {
1210             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1211                 shardSnapshots.put(snapshot.getName(), snapshot);
1212             }
1213         }
1214
1215         restoreFromSnapshot = null; // null out to GC
1216
1217         for (String shardName : memberShardNames) {
1218             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1219
1220             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1221
1222             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1223             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
1224                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
1225                         shardSnapshots.get(shardName)), peerAddressResolver));
1226         }
1227     }
1228
1229     /**
1230      * Given the name of the shard find the addresses of all it's peers.
1231      *
1232      * @param shardName the shard name
1233      */
1234     private Map<String, String> getPeerAddresses(String shardName) {
1235         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1236         return getPeerAddresses(shardName, members);
1237     }
1238
1239     private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
1240         Map<String, String> peerAddresses = new HashMap<>();
1241         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1242
1243         for (MemberName memberName : members) {
1244             if (!currentMemberName.equals(memberName)) {
1245                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1246                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1247                 peerAddresses.put(shardId.toString(), address);
1248             }
1249         }
1250         return peerAddresses;
1251     }
1252
1253     @Override
1254     public SupervisorStrategy supervisorStrategy() {
1255
1256         return new OneForOneStrategy(10, Duration.create("1 minute"),
1257                 (Function<Throwable, Directive>) t -> {
1258                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1259                     return SupervisorStrategy.resume();
1260                 });
1261     }
1262
1263     @Override
1264     public String persistenceId() {
1265         return persistenceId;
1266     }
1267
1268     @VisibleForTesting
1269     ShardManagerInfoMBean getMBean() {
1270         return shardManagerMBean;
1271     }
1272
1273     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1274         if (shardReplicaOperationsInProgress.contains(shardName)) {
1275             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1276             LOG.debug("{}: {}", persistenceId(), msg);
1277             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1278             return true;
1279         }
1280
1281         return false;
1282     }
1283
1284     private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
1285         LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
1286
1287         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
1288                 ClusterUtils.getCleanShardName(message.getShardPrefix()));
1289         final String shardName = shardId.getShardName();
1290
1291         // Create the localShard
1292         if (schemaContext == null) {
1293             String msg = String.format(
1294                     "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1295             LOG.debug("{}: {}", persistenceId(), msg);
1296             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1297             return;
1298         }
1299
1300         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1301                 getSelf()) {
1302             @Override
1303             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1304                 final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
1305                         message.getShardPrefix(), response, getSender());
1306                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
1307                     getSelf().tell(runnable, getTargetActor());
1308                 }
1309             }
1310
1311             @Override
1312             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1313                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1314             }
1315         });
1316     }
1317
1318     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
1319         final String shardName = shardReplicaMsg.getShardName();
1320
1321         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1322
1323         // verify the shard with the specified name is present in the cluster configuration
1324         if (!this.configuration.isShardConfigured(shardName)) {
1325             String msg = String.format("No module configuration exists for shard %s", shardName);
1326             LOG.debug("{}: {}", persistenceId(), msg);
1327             getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
1328             return;
1329         }
1330
1331         // Create the localShard
1332         if (schemaContext == null) {
1333             String msg = String.format(
1334                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1335             LOG.debug("{}: {}", persistenceId(), msg);
1336             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1337             return;
1338         }
1339
1340         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1341                 getSelf()) {
1342             @Override
1343             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1344                 final RunnableMessage runnable = (RunnableMessage) () ->
1345                     addShard(getShardName(), response, getSender());
1346                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
1347                     getSelf().tell(runnable, getTargetActor());
1348                 }
1349             }
1350
1351             @Override
1352             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1353                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1354             }
1355         });
1356     }
1357
1358     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1359         String msg = String.format("Local shard %s already exists", shardName);
1360         LOG.debug("{}: {}", persistenceId(), msg);
1361         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1362     }
1363
1364     private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
1365                                 final RemotePrimaryShardFound response, final ActorRef sender) {
1366         if (isShardReplicaOperationInProgress(shardName, sender)) {
1367             return;
1368         }
1369
1370         shardReplicaOperationsInProgress.add(shardName);
1371
1372         final ShardInformation shardInfo;
1373         final boolean removeShardOnFailure;
1374         ShardInformation existingShardInfo = localShards.get(shardName);
1375         if (existingShardInfo == null) {
1376             removeShardOnFailure = true;
1377             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1378
1379             final Builder builder = newShardDatastoreContextBuilder(shardName);
1380             builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1381
1382             DatastoreContext datastoreContext = builder.build();
1383
1384             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1385                     Shard.builder(), peerAddressResolver);
1386             shardInfo.setActiveMember(false);
1387             localShards.put(shardName, shardInfo);
1388             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1389         } else {
1390             removeShardOnFailure = false;
1391             shardInfo = existingShardInfo;
1392         }
1393
1394         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1395     }
1396
1397     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1398         if (isShardReplicaOperationInProgress(shardName, sender)) {
1399             return;
1400         }
1401
1402         shardReplicaOperationsInProgress.add(shardName);
1403
1404         final ShardInformation shardInfo;
1405         final boolean removeShardOnFailure;
1406         ShardInformation existingShardInfo = localShards.get(shardName);
1407         if (existingShardInfo == null) {
1408             removeShardOnFailure = true;
1409             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1410
1411             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1412                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1413
1414             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1415                     Shard.builder(), peerAddressResolver);
1416             shardInfo.setActiveMember(false);
1417             localShards.put(shardName, shardInfo);
1418             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1419         } else {
1420             removeShardOnFailure = false;
1421             shardInfo = existingShardInfo;
1422         }
1423
1424         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1425     }
1426
1427     private void execAddShard(final String shardName,
1428                               final ShardInformation shardInfo,
1429                               final RemotePrimaryShardFound response,
1430                               final boolean removeShardOnFailure,
1431                               final ActorRef sender) {
1432
1433         final String localShardAddress =
1434                 peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1435
1436         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1437         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1438                 response.getPrimaryPath(), shardInfo.getShardId());
1439
1440         final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1441                 .getShardLeaderElectionTimeout().duration());
1442         final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1443                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1444
1445         futureObj.onComplete(new OnComplete<Object>() {
1446             @Override
1447             public void onComplete(Throwable failure, Object addServerResponse) {
1448                 if (failure != null) {
1449                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1450                             response.getPrimaryPath(), shardName, failure);
1451
1452                     final String msg = String.format("AddServer request to leader %s for shard %s failed",
1453                             response.getPrimaryPath(), shardName);
1454                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1455                 } else {
1456                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1457                             response.getPrimaryPath(), removeShardOnFailure), sender);
1458                 }
1459             }
1460         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1461     }
1462
1463     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1464             boolean removeShardOnFailure) {
1465         shardReplicaOperationsInProgress.remove(shardName);
1466
1467         if (removeShardOnFailure) {
1468             ShardInformation shardInfo = localShards.remove(shardName);
1469             if (shardInfo.getActor() != null) {
1470                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1471             }
1472         }
1473
1474         sender.tell(new Status.Failure(message == null ? failure :
1475             new RuntimeException(message, failure)), getSelf());
1476     }
1477
1478     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1479             String leaderPath, boolean removeShardOnFailure) {
1480         String shardName = shardInfo.getShardName();
1481         shardReplicaOperationsInProgress.remove(shardName);
1482
1483         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1484
1485         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1486             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1487
1488             // Make the local shard voting capable
1489             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1490             shardInfo.setActiveMember(true);
1491             persistShardList();
1492
1493             sender.tell(new Status.Success(null), getSelf());
1494         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1495             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1496         } else {
1497             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1498                     persistenceId(), shardName, replyMsg.getStatus());
1499
1500             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1501                     shardInfo.getShardId());
1502
1503             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1504         }
1505     }
1506
1507     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1508                                                String leaderPath, ShardIdentifier shardId) {
1509         Exception failure;
1510         switch (serverChangeStatus) {
1511             case TIMEOUT:
1512                 failure = new TimeoutException(String.format(
1513                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
1514                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
1515                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
1516                 break;
1517             case NO_LEADER:
1518                 failure = createNoShardLeaderException(shardId);
1519                 break;
1520             case NOT_SUPPORTED:
1521                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1522                         serverChange.getSimpleName(), shardId.getShardName()));
1523                 break;
1524             default :
1525                 failure = new RuntimeException(String.format(
1526                         "%s request to leader %s for shard %s failed with status %s",
1527                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1528         }
1529         return failure;
1530     }
1531
1532     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
1533         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1534
1535         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1536                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1537             @Override
1538             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1539                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1540             }
1541
1542             @Override
1543             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1544                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1545             }
1546
1547             private void doRemoveShardReplicaAsync(final String primaryPath) {
1548                 getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
1549                         primaryPath, getSender()), getTargetActor());
1550             }
1551         });
1552     }
1553
1554     private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
1555         LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
1556
1557         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
1558                 ClusterUtils.getCleanShardName(message.getShardPrefix()));
1559         final String shardName = shardId.getShardName();
1560
1561         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
1562                 shardName, persistenceId(), getSelf()) {
1563             @Override
1564             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1565                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1566             }
1567
1568             @Override
1569             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1570                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1571             }
1572
1573             private void doRemoveShardReplicaAsync(final String primaryPath) {
1574                 getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
1575                         primaryPath, getSender()), getTargetActor());
1576             }
1577         });
1578     }
1579
1580     private void persistShardList() {
1581         List<String> shardList = new ArrayList<>(localShards.keySet());
1582         for (ShardInformation shardInfo : localShards.values()) {
1583             if (!shardInfo.isActiveMember()) {
1584                 shardList.remove(shardInfo.getShardName());
1585             }
1586         }
1587         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1588         saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
1589     }
1590
1591     private ShardManagerSnapshot updateShardManagerSnapshot(
1592             final List<String> shardList,
1593             final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
1594         currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
1595         return currentSnapshot;
1596     }
1597
1598     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1599         currentSnapshot = snapshot;
1600
1601         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1602
1603         final MemberName currentMember = cluster.getCurrentMemberName();
1604         Set<String> configuredShardList =
1605             new HashSet<>(configuration.getMemberShardNames(currentMember));
1606         for (String shard : currentSnapshot.getShardList()) {
1607             if (!configuredShardList.contains(shard)) {
1608                 // add the current member as a replica for the shard
1609                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1610                 configuration.addMemberReplicaForShard(shard, currentMember);
1611             } else {
1612                 configuredShardList.remove(shard);
1613             }
1614         }
1615         for (String shard : configuredShardList) {
1616             // remove the member as a replica for the shard
1617             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1618             configuration.removeMemberReplicaForShard(shard, currentMember);
1619         }
1620     }
1621
1622     private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
1623         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1624             persistenceId());
1625         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1626             0, 0));
1627     }
1628
1629     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1630         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1631
1632         String shardName = changeMembersVotingStatus.getShardName();
1633         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1634         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1635             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1636                     e.getValue());
1637         }
1638
1639         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1640
1641         findLocalShard(shardName, getSender(),
1642             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1643             localShardFound.getPath(), getSender()));
1644     }
1645
1646     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
1647         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1648
1649         ActorRef sender = getSender();
1650         final String shardName = flipMembersVotingStatus.getShardName();
1651         findLocalShard(shardName, sender, localShardFound -> {
1652             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1653                     Timeout.apply(30, TimeUnit.SECONDS));
1654
1655             future.onComplete(new OnComplete<Object>() {
1656                 @Override
1657                 public void onComplete(Throwable failure, Object response) {
1658                     if (failure != null) {
1659                         sender.tell(new Status.Failure(new RuntimeException(
1660                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1661                         return;
1662                     }
1663
1664                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1665                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1666                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1667                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1668                     }
1669
1670                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1671                             .toString(), !raftState.isVoting());
1672
1673                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1674                             shardName, localShardFound.getPath(), sender);
1675                 }
1676             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1677         });
1678
1679     }
1680
1681     private void findLocalShard(FindLocalShard message) {
1682         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
1683
1684         final ShardInformation shardInformation = localShards.get(message.getShardName());
1685
1686         if (shardInformation == null) {
1687             LOG.debug("{}: Local shard {} not found - shards present: {}",
1688                     persistenceId(), message.getShardName(), localShards.keySet());
1689
1690             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1691             return;
1692         }
1693
1694         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1695             () -> new LocalShardFound(shardInformation.getActor()));
1696     }
1697
1698     private void findLocalShard(final String shardName, final ActorRef sender,
1699             final Consumer<LocalShardFound> onLocalShardFound) {
1700         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1701                 .getShardInitializationTimeout().duration().$times(2));
1702
1703         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1704         futureObj.onComplete(new OnComplete<Object>() {
1705             @Override
1706             public void onComplete(Throwable failure, Object response) {
1707                 if (failure != null) {
1708                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1709                             failure);
1710                     sender.tell(new Status.Failure(new RuntimeException(
1711                             String.format("Failed to find local shard %s", shardName), failure)), self());
1712                 } else {
1713                     if (response instanceof LocalShardFound) {
1714                         getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
1715                                 sender);
1716                     } else if (response instanceof LocalShardNotFound) {
1717                         String msg = String.format("Local shard %s does not exist", shardName);
1718                         LOG.debug("{}: {}", persistenceId, msg);
1719                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
1720                     } else {
1721                         String msg = String.format("Failed to find local shard %s: received response: %s",
1722                                 shardName, response);
1723                         LOG.debug("{}: {}", persistenceId, msg);
1724                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1725                                 new RuntimeException(msg)), self());
1726                     }
1727                 }
1728             }
1729         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1730     }
1731
1732     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
1733             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1734         if (isShardReplicaOperationInProgress(shardName, sender)) {
1735             return;
1736         }
1737
1738         shardReplicaOperationsInProgress.add(shardName);
1739
1740         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1741         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1742
1743         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1744                 changeServersVotingStatus, shardActorRef.path());
1745
1746         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1747         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1748
1749         futureObj.onComplete(new OnComplete<Object>() {
1750             @Override
1751             public void onComplete(Throwable failure, Object response) {
1752                 shardReplicaOperationsInProgress.remove(shardName);
1753                 if (failure != null) {
1754                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1755                             shardActorRef.path());
1756                     LOG.debug("{}: {}", persistenceId(), msg, failure);
1757                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1758                 } else {
1759                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1760
1761                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1762                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1763                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1764                         sender.tell(new Status.Success(null), getSelf());
1765                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1766                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1767                                 "The requested voting state change for shard %s is invalid. At least one member "
1768                                 + "must be voting", shardId.getShardName()))), getSelf());
1769                     } else {
1770                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1771                                 persistenceId(), shardName, replyMsg.getStatus());
1772
1773                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1774                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1775                         sender.tell(new Status.Failure(error), getSelf());
1776                     }
1777                 }
1778             }
1779         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1780     }
1781
1782     private static final class ForwardedAddServerReply {
1783         ShardInformation shardInfo;
1784         AddServerReply addServerReply;
1785         String leaderPath;
1786         boolean removeShardOnFailure;
1787
1788         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1789                 boolean removeShardOnFailure) {
1790             this.shardInfo = shardInfo;
1791             this.addServerReply = addServerReply;
1792             this.leaderPath = leaderPath;
1793             this.removeShardOnFailure = removeShardOnFailure;
1794         }
1795     }
1796
1797     private static final class ForwardedAddServerFailure {
1798         String shardName;
1799         String failureMessage;
1800         Throwable failure;
1801         boolean removeShardOnFailure;
1802
1803         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1804                 boolean removeShardOnFailure) {
1805             this.shardName = shardName;
1806             this.failureMessage = failureMessage;
1807             this.failure = failure;
1808             this.removeShardOnFailure = removeShardOnFailure;
1809         }
1810     }
1811
1812     static class OnShardInitialized {
1813         private final Runnable replyRunnable;
1814         private Cancellable timeoutSchedule;
1815
1816         OnShardInitialized(Runnable replyRunnable) {
1817             this.replyRunnable = replyRunnable;
1818         }
1819
1820         Runnable getReplyRunnable() {
1821             return replyRunnable;
1822         }
1823
1824         Cancellable getTimeoutSchedule() {
1825             return timeoutSchedule;
1826         }
1827
1828         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1829             this.timeoutSchedule = timeoutSchedule;
1830         }
1831     }
1832
1833     static class OnShardReady extends OnShardInitialized {
1834         OnShardReady(Runnable replyRunnable) {
1835             super(replyRunnable);
1836         }
1837     }
1838
1839     private interface RunnableMessage extends Runnable {
1840     }
1841
1842     /**
1843      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1844      * a remote or local find primary message is processed.
1845      */
1846     private interface FindPrimaryResponseHandler {
1847         /**
1848          * Invoked when a Failure message is received as a response.
1849          *
1850          * @param failure the failure exception
1851          */
1852         void onFailure(Throwable failure);
1853
1854         /**
1855          * Invoked when a RemotePrimaryShardFound response is received.
1856          *
1857          * @param response the response
1858          */
1859         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1860
1861         /**
1862          * Invoked when a LocalPrimaryShardFound response is received.
1863          *
1864          * @param response the response
1865          */
1866         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1867
1868         /**
1869          * Invoked when an unknown response is received. This is another type of failure.
1870          *
1871          * @param response the response
1872          */
1873         void onUnknownResponse(Object response);
1874     }
1875
1876     /**
1877      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1878      * replica and sends a wrapped Failure response to some targetActor.
1879      */
1880     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1881         private final ActorRef targetActor;
1882         private final String shardName;
1883         private final String persistenceId;
1884         private final ActorRef shardManagerActor;
1885
1886         /**
1887          * Constructs an instance.
1888          *
1889          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1890          * @param shardName The name of the shard for which the primary replica had to be found
1891          * @param persistenceId The persistenceId for the ShardManager
1892          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1893          */
1894         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
1895                 ActorRef shardManagerActor) {
1896             this.targetActor = Preconditions.checkNotNull(targetActor);
1897             this.shardName = Preconditions.checkNotNull(shardName);
1898             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1899             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1900         }
1901
1902         public ActorRef getTargetActor() {
1903             return targetActor;
1904         }
1905
1906         public String getShardName() {
1907             return shardName;
1908         }
1909
1910         @Override
1911         public void onFailure(Throwable failure) {
1912             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1913             targetActor.tell(new Status.Failure(new RuntimeException(
1914                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1915         }
1916
1917         @Override
1918         public void onUnknownResponse(Object response) {
1919             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1920                     shardName, response);
1921             LOG.debug("{}: {}", persistenceId, msg);
1922             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1923                     new RuntimeException(msg)), shardManagerActor);
1924         }
1925     }
1926
1927     /**
1928      * The WrappedShardResponse class wraps a response from a Shard.
1929      */
1930     private static final class WrappedShardResponse {
1931         private final ShardIdentifier shardId;
1932         private final Object response;
1933         private final String leaderPath;
1934
1935         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1936             this.shardId = shardId;
1937             this.response = response;
1938             this.leaderPath = leaderPath;
1939         }
1940
1941         ShardIdentifier getShardId() {
1942             return shardId;
1943         }
1944
1945         Object getResponse() {
1946             return response;
1947         }
1948
1949         String getLeaderPath() {
1950             return leaderPath;
1951         }
1952     }
1953
1954     private static final class ShardNotInitializedTimeout {
1955         private final ActorRef sender;
1956         private final ShardInformation shardInfo;
1957         private final OnShardInitialized onShardInitialized;
1958
1959         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1960             this.sender = sender;
1961             this.shardInfo = shardInfo;
1962             this.onShardInitialized = onShardInitialized;
1963         }
1964
1965         ActorRef getSender() {
1966             return sender;
1967         }
1968
1969         ShardInformation getShardInfo() {
1970             return shardInfo;
1971         }
1972
1973         OnShardInitialized getOnShardInitialized() {
1974             return onShardInitialized;
1975         }
1976     }
1977 }
1978
1979
1980