bb289bc82e365177e55d320ffbbbe92d1cd8f404
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.actor.SupervisorStrategy.Directive;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberWeaklyUp;
23 import akka.cluster.Member;
24 import akka.dispatch.Futures;
25 import akka.dispatch.OnComplete;
26 import akka.japi.Function;
27 import akka.pattern.Patterns;
28 import akka.persistence.RecoveryCompleted;
29 import akka.persistence.SaveSnapshotFailure;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.persistence.SnapshotOffer;
32 import akka.persistence.SnapshotSelectionCriteria;
33 import akka.util.Timeout;
34 import com.google.common.annotations.VisibleForTesting;
35 import com.google.common.base.Preconditions;
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.HashMap;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.Set;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.function.Consumer;
49 import java.util.function.Supplier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
52 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
53 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
57 import org.opendaylight.controller.cluster.datastore.Shard;
58 import org.opendaylight.controller.cluster.datastore.config.Configuration;
59 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
60 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
63 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
65 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
66 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
67 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
68 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
69 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
70 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
71 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
72 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
73 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
77 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
78 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
79 import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
80 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
81 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
82 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
83 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
84 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
85 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
86 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
87 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
88 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
89 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
90 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
91 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
92 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
93 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
94 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
95 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
96 import org.opendaylight.controller.cluster.raft.messages.AddServer;
97 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
98 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
99 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
100 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
101 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
102 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
103 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
104 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
105 import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
106 import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
107 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
108 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
109 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
110 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
111 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
112 import org.opendaylight.yangtools.concepts.ListenerRegistration;
113 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
114 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
115 import org.slf4j.Logger;
116 import org.slf4j.LoggerFactory;
117 import scala.concurrent.ExecutionContext;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.Duration;
120 import scala.concurrent.duration.FiniteDuration;
121
122 /**
123  * Manages the shards for a data store. The ShardManager has the following jobs:
124  * <ul>
125  * <li> Create all the local shard replicas that belong on this cluster member
126  * <li> Find the address of the local shard
127  * <li> Find the primary replica for any given shard
128  * <li> Monitor the cluster members and store their addresses
129  * </ul>
130  */
131 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);

    // Stores a mapping between a shard name and its corresponding information.
    // Shard names look like inventory, topology etc and are as specified in
    // configuration
    private final Map<String, ShardInformation> localShards = new HashMap<>();

    // The type of a ShardManager reflects the type of the datastore itself
    // A data store could be of type config/operational
    private final String type;

    // Supplies the current member name and delivers cluster member events to this actor.
    private final ClusterWrapper cluster;

    // Shard configuration; prefix shard configurations are added to it as prefix shards are created.
    private final Configuration configuration;

    // Dispatcher path resolved for Dispatchers.DispatcherType.Shard.
    private final String shardDispatcherPath;

    // JMX bean registered in the constructor and unregistered in postStop().
    private final ShardManagerInfo shardManagerMBean;

    // Source of the base DatastoreContext; not final - presumably replaced when a new
    // DatastoreContextFactory message is handled (handler is outside this section, confirm there).
    private DatastoreContextFactory datastoreContextFactory;

    private final CountDownLatch waitTillReadyCountdownLatch;

    private final PrimaryShardInfoFutureCache primaryShardInfoCache;

    // Built from the datastore type and the current member name.
    private final ShardPeerAddressResolver peerAddressResolver;

    // Shard actors are only created once this is non-null (see doCreatePrefixShard).
    private SchemaContext schemaContext;

    private DatastoreSnapshot restoreFromSnapshot;

    // Passed to the reply actor that assembles the response to GetSnapshot.
    private ShardManagerSnapshot currentSnapshot;

    // Names of shards that have a replica operation outstanding; a name is added before the
    // request is sent to the leader and removed again when the reply or a failure arrives.
    private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();

    // Shard name -> gracefulStop future of a removed shard whose actor is still stopping.
    // The entry is removed once the stop future completes.
    private final Map<String, Future<Boolean>> shardActorStoppingFutures = new HashMap<>();

    // Either the configured shard manager persistence id or "shard-manager-" + type.
    private final String persistenceId;
    private final AbstractDataStore dataStore;

    // Closed and cleared in postStop() when set.
    private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
    // Created (and any previous instance closed) each time InitConfigListener is handled.
    private PrefixedShardConfigUpdateHandler configUpdateHandler;
174
175     ShardManager(AbstractShardManagerCreator<?> builder) {
176         this.cluster = builder.getCluster();
177         this.configuration = builder.getConfiguration();
178         this.datastoreContextFactory = builder.getDatastoreContextFactory();
179         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
180         this.shardDispatcherPath =
181                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
182         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
183         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
184         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
185
186         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
187         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
188
189         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
190
191         // Subscribe this actor to cluster member events
192         cluster.subscribeToMemberEvents(getSelf());
193
194         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
195                 "shard-manager-" + this.type,
196                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
197         shardManagerMBean.registerMBean();
198
199         dataStore = builder.getDistributedDataStore();
200     }
201
202     @Override
203     public void preStart() {
204         LOG.info("Starting ShardManager {}", persistenceId);
205     }
206
207     @Override
208     public void postStop() {
209         LOG.info("Stopping ShardManager {}", persistenceId());
210
211         shardManagerMBean.unregisterMBean();
212
213         if (configListenerReg != null) {
214             configListenerReg.close();
215             configListenerReg = null;
216         }
217     }
218
219     @Override
220     public void handleCommand(Object message) throws Exception {
221         if (message  instanceof FindPrimary) {
222             findPrimary((FindPrimary)message);
223         } else if (message instanceof FindLocalShard) {
224             findLocalShard((FindLocalShard) message);
225         } else if (message instanceof UpdateSchemaContext) {
226             updateSchemaContext(message);
227         } else if (message instanceof ActorInitialized) {
228             onActorInitialized(message);
229         } else if (message instanceof ClusterEvent.MemberUp) {
230             memberUp((ClusterEvent.MemberUp) message);
231         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
232             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
233         } else if (message instanceof ClusterEvent.MemberExited) {
234             memberExited((ClusterEvent.MemberExited) message);
235         } else if (message instanceof ClusterEvent.MemberRemoved) {
236             memberRemoved((ClusterEvent.MemberRemoved) message);
237         } else if (message instanceof ClusterEvent.UnreachableMember) {
238             memberUnreachable((ClusterEvent.UnreachableMember) message);
239         } else if (message instanceof ClusterEvent.ReachableMember) {
240             memberReachable((ClusterEvent.ReachableMember) message);
241         } else if (message instanceof DatastoreContextFactory) {
242             onDatastoreContextFactory((DatastoreContextFactory) message);
243         } else if (message instanceof RoleChangeNotification) {
244             onRoleChangeNotification((RoleChangeNotification) message);
245         } else if (message instanceof FollowerInitialSyncUpStatus) {
246             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
247         } else if (message instanceof ShardNotInitializedTimeout) {
248             onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
249         } else if (message instanceof ShardLeaderStateChanged) {
250             onLeaderStateChanged((ShardLeaderStateChanged) message);
251         } else if (message instanceof SwitchShardBehavior) {
252             onSwitchShardBehavior((SwitchShardBehavior) message);
253         } else if (message instanceof CreateShard) {
254             onCreateShard((CreateShard)message);
255         } else if (message instanceof AddShardReplica) {
256             onAddShardReplica((AddShardReplica) message);
257         } else if (message instanceof AddPrefixShardReplica) {
258             onAddPrefixShardReplica((AddPrefixShardReplica) message);
259         } else if (message instanceof PrefixShardCreated) {
260             onPrefixShardCreated((PrefixShardCreated) message);
261         } else if (message instanceof PrefixShardRemoved) {
262             onPrefixShardRemoved((PrefixShardRemoved) message);
263         } else if (message instanceof InitConfigListener) {
264             onInitConfigListener();
265         } else if (message instanceof ForwardedAddServerReply) {
266             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
267             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
268                     msg.removeShardOnFailure);
269         } else if (message instanceof ForwardedAddServerFailure) {
270             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
271             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
272         } else if (message instanceof RemoveShardReplica) {
273             onRemoveShardReplica((RemoveShardReplica) message);
274         } else if (message instanceof RemovePrefixShardReplica) {
275             onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
276         } else if (message instanceof WrappedShardResponse) {
277             onWrappedShardResponse((WrappedShardResponse) message);
278         } else if (message instanceof GetSnapshot) {
279             onGetSnapshot();
280         } else if (message instanceof ServerRemoved) {
281             onShardReplicaRemoved((ServerRemoved) message);
282         } else if (message instanceof ChangeShardMembersVotingStatus) {
283             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
284         } else if (message instanceof FlipShardMembersVotingStatus) {
285             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
286         } else if (message instanceof SaveSnapshotSuccess) {
287             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
288         } else if (message instanceof SaveSnapshotFailure) {
289             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
290                     ((SaveSnapshotFailure) message).cause());
291         } else if (message instanceof Shutdown) {
292             onShutDown();
293         } else if (message instanceof GetLocalShardIds) {
294             onGetLocalShardIds();
295         } else if (message instanceof RunnableMessage) {
296             ((RunnableMessage)message).run();
297         } else {
298             unknownMessage(message);
299         }
300     }
301
302     private void onInitConfigListener() {
303         LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
304
305         final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
306                 org.opendaylight.mdsal.common.api.LogicalDatastoreType
307                         .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
308
309         if (configUpdateHandler != null) {
310             configUpdateHandler.close();
311         }
312
313         configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
314         configUpdateHandler.initListener(dataStore, type);
315     }
316
317     private void onShutDown() {
318         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
319         for (ShardInformation info : localShards.values()) {
320             if (info.getActor() != null) {
321                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
322
323                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
324                         .getElectionTimeOutInterval().$times(2);
325                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
326             }
327         }
328
329         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
330
331         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
332                 .getDispatcher(Dispatchers.DispatcherType.Client);
333         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
334
335         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
336             @Override
337             public void onComplete(Throwable failure, Iterable<Boolean> results) {
338                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
339
340                 self().tell(PoisonPill.getInstance(), self());
341
342                 if (failure != null) {
343                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
344                 } else {
345                     int nfailed = 0;
346                     for (Boolean result : results) {
347                         if (!result) {
348                             nfailed++;
349                         }
350                     }
351
352                     if (nfailed > 0) {
353                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
354                     }
355                 }
356             }
357         }, dispatcher);
358     }
359
360     private void onWrappedShardResponse(WrappedShardResponse message) {
361         if (message.getResponse() instanceof RemoveServerReply) {
362             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
363                     message.getLeaderPath());
364         }
365     }
366
367     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
368             String leaderPath) {
369         shardReplicaOperationsInProgress.remove(shardId.getShardName());
370
371         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
372
373         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
374             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
375                     shardId.getShardName());
376             originalSender.tell(new Status.Success(null), getSelf());
377         } else {
378             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
379                     persistenceId(), shardId, replyMsg.getStatus());
380
381             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
382             originalSender.tell(new Status.Failure(failure), getSelf());
383         }
384     }
385
386     private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
387                                           final String primaryPath, final ActorRef sender) {
388         if (isShardReplicaOperationInProgress(shardName, sender)) {
389             return;
390         }
391
392         shardReplicaOperationsInProgress.add(shardName);
393
394         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
395
396         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
397
398         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
399         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
400                 primaryPath, shardId);
401
402         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
403         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
404                 new RemoveServer(shardId.toString()), removeServerTimeout);
405
406         futureObj.onComplete(new OnComplete<Object>() {
407             @Override
408             public void onComplete(Throwable failure, Object response) {
409                 if (failure != null) {
410                     shardReplicaOperationsInProgress.remove(shardName);
411                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
412                             primaryPath, shardName);
413
414                     LOG.debug("{}: {}", persistenceId(), msg, failure);
415
416                     // FAILURE
417                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
418                 } else {
419                     // SUCCESS
420                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
421                 }
422             }
423         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
424     }
425
426     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
427             final ActorRef sender) {
428         if (isShardReplicaOperationInProgress(shardName, sender)) {
429             return;
430         }
431
432         shardReplicaOperationsInProgress.add(shardName);
433
434         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
435
436         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
437
438         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
439         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
440                 primaryPath, shardId);
441
442         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
443         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
444                 new RemoveServer(shardId.toString()), removeServerTimeout);
445
446         futureObj.onComplete(new OnComplete<Object>() {
447             @Override
448             public void onComplete(Throwable failure, Object response) {
449                 if (failure != null) {
450                     shardReplicaOperationsInProgress.remove(shardName);
451                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
452                             primaryPath, shardName);
453
454                     LOG.debug("{}: {}", persistenceId(), msg, failure);
455
456                     // FAILURE
457                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
458                 } else {
459                     // SUCCESS
460                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
461                 }
462             }
463         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
464     }
465
466     private void onShardReplicaRemoved(ServerRemoved message) {
467         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
468     }
469
470     @SuppressWarnings("checkstyle:IllegalCatch")
471     private void removeShard(final ShardIdentifier shardId) {
472         final String shardName = shardId.getShardName();
473         final ShardInformation shardInformation = localShards.remove(shardName);
474         if (shardInformation == null) {
475             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
476             return;
477         }
478
479         final ActorRef shardActor = shardInformation.getActor();
480         if (shardActor != null) {
481             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardActor);
482             FiniteDuration duration = shardInformation.getDatastoreContext().getShardRaftConfig()
483                     .getElectionTimeOutInterval().$times(3);
484             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor, duration, Shutdown.INSTANCE);
485             shardActorStoppingFutures.put(shardName, stopFuture);
486             stopFuture.onComplete(new OnComplete<Boolean>() {
487                 @Override
488                 public void onComplete(Throwable failure, Boolean result) {
489                     if (failure == null) {
490                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
491                     } else {
492                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
493                     }
494
495                     self().tell((RunnableMessage) () -> shardActorStoppingFutures.remove(shardName),
496                             ActorRef.noSender());
497                 }
498             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
499         }
500
501         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
502         persistShardList();
503     }
504
505     private void onGetSnapshot() {
506         LOG.debug("{}: onGetSnapshot", persistenceId());
507
508         List<String> notInitialized = null;
509         for (ShardInformation shardInfo : localShards.values()) {
510             if (!shardInfo.isShardInitialized()) {
511                 if (notInitialized == null) {
512                     notInitialized = new ArrayList<>();
513                 }
514
515                 notInitialized.add(shardInfo.getShardName());
516             }
517         }
518
519         if (notInitialized != null) {
520             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
521                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
522             return;
523         }
524
525         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
526                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
527                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
528
529         for (ShardInformation shardInfo: localShards.values()) {
530             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
531         }
532     }
533
534     @SuppressWarnings("checkstyle:IllegalCatch")
535     private void onCreateShard(CreateShard createShard) {
536         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
537
538         Object reply;
539         try {
540             String shardName = createShard.getModuleShardConfig().getShardName();
541             if (localShards.containsKey(shardName)) {
542                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
543                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
544             } else {
545                 doCreateShard(createShard);
546                 reply = new Status.Success(null);
547             }
548         } catch (Exception e) {
549             LOG.error("{}: onCreateShard failed", persistenceId(), e);
550             reply = new Status.Failure(e);
551         }
552
553         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
554             getSender().tell(reply, getSelf());
555         }
556     }
557
558     private void onPrefixShardCreated(final PrefixShardCreated message) {
559         LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
560
561         final PrefixShardConfiguration config = message.getConfiguration();
562         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
563                 ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
564         final String shardName = shardId.getShardName();
565
566         if (isPreviousShardActorStopInProgress(shardName, message)) {
567             return;
568         }
569
570         if (localShards.containsKey(shardName)) {
571             LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
572             final PrefixShardConfiguration existing =
573                     configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
574
575             if (existing != null && existing.equals(config)) {
576                 // we don't have to do nothing here
577                 return;
578             }
579         }
580
581         doCreatePrefixShard(config, shardId, shardName);
582     }
583
    /**
     * Checks whether a stop of a previous actor for the given shard is still pending and, if so,
     * arranges for the given message to be re-sent to this actor once the stop future completes.
     *
     * @param shardName the name of the shard to check
     * @param messageToDefer the message to re-queue to self after the stop completes
     * @return true if a stop is in progress and the message was deferred, false otherwise
     */
    private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
        final Future<Boolean> stopFuture = shardActorStoppingFutures.get(shardName);
        if (stopFuture == null) {
            return false;
        }

        LOG.debug("{} : Stop is in progress for shard {} - adding Future callback to defer {}", persistenceId(),
                shardName, messageToDefer);
        // Capture the sender up front so the deferred message is re-sent with the same sender.
        final ActorRef sender = getSender();
        stopFuture.onComplete(new OnComplete<Boolean>() {
            @Override
            public void onComplete(Throwable failure, Boolean result) {
                // The failure is not inspected: the message is re-queued whether or not the stop succeeded.
                LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
                self().tell(messageToDefer, sender);
            }
        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));

        return true;
    }
603
604     private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
605         configuration.addPrefixShardConfiguration(config);
606
607         final Builder builder = newShardDatastoreContextBuilder(shardName);
608         builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
609                 .storeRoot(config.getPrefix().getRootIdentifier());
610         DatastoreContext shardDatastoreContext = builder.build();
611
612         final Map<String, String> peerAddresses = getPeerAddresses(shardName);
613         final boolean isActiveMember = true;
614
615         LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
616                 persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
617
618         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
619                 shardDatastoreContext, Shard.builder(), peerAddressResolver);
620         info.setActiveMember(isActiveMember);
621         localShards.put(info.getShardName(), info);
622
623         if (schemaContext != null) {
624             info.setActor(newShardActor(schemaContext, info));
625         }
626     }
627
628     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
629         LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
630
631         final DOMDataTreeIdentifier prefix = message.getPrefix();
632         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
633                 ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
634
635         configuration.removePrefixShardConfiguration(prefix);
636         removeShard(shardId);
637     }
638
639     private void doCreateShard(final CreateShard createShard) {
640         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
641         final String shardName = moduleShardConfig.getShardName();
642
643         configuration.addModuleShardConfiguration(moduleShardConfig);
644
645         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
646         if (shardDatastoreContext == null) {
647             shardDatastoreContext = newShardDatastoreContext(shardName);
648         } else {
649             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
650                     peerAddressResolver).build();
651         }
652
653         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
654
655         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
656                 && currentSnapshot.getShardList().contains(shardName);
657
658         Map<String, String> peerAddresses;
659         boolean isActiveMember;
660         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
661                 .contains(cluster.getCurrentMemberName())) {
662             peerAddresses = getPeerAddresses(shardName);
663             isActiveMember = true;
664         } else {
665             // The local member is not in the static shard member configuration and the shard did not
666             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
667             // the shard with no peers and with elections disabled so it stays as follower. A
668             // subsequent AddServer request will be needed to make it an active member.
669             isActiveMember = false;
670             peerAddresses = Collections.emptyMap();
671             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
672                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
673         }
674
675         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
676                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
677                 isActiveMember);
678
679         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
680                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
681         info.setActiveMember(isActiveMember);
682         localShards.put(info.getShardName(), info);
683
684         if (schemaContext != null) {
685             info.setActor(newShardActor(schemaContext, info));
686         }
687     }
688
689     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
690         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
691                 .shardPeerAddressResolver(peerAddressResolver);
692     }
693
694     private DatastoreContext newShardDatastoreContext(String shardName) {
695         return newShardDatastoreContextBuilder(shardName).build();
696     }
697
698     private void checkReady() {
699         if (isReadyWithLeaderId()) {
700             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
701                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
702
703             waitTillReadyCountdownLatch.countDown();
704         }
705     }
706
707     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
708         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
709
710         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
711         if (shardInformation != null) {
712             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
713             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
714             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
715                 primaryShardInfoCache.remove(shardInformation.getShardName());
716             }
717
718             checkReady();
719         } else {
720             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
721         }
722     }
723
724     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
725         ShardInformation shardInfo = message.getShardInfo();
726
727         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
728                 shardInfo.getShardName());
729
730         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
731
732         if (!shardInfo.isShardInitialized()) {
733             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
734             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
735         } else {
736             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
737             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
738         }
739     }
740
741     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
742         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
743                 status.getName(), status.isInitialSyncDone());
744
745         ShardInformation shardInformation = findShardInformation(status.getName());
746
747         if (shardInformation != null) {
748             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
749
750             shardManagerMBean.setSyncStatus(isInSync());
751         }
752
753     }
754
755     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
756         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
757                 roleChanged.getOldRole(), roleChanged.getNewRole());
758
759         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
760         if (shardInformation != null) {
761             shardInformation.setRole(roleChanged.getNewRole());
762             checkReady();
763             shardManagerMBean.setSyncStatus(isInSync());
764         }
765     }
766
767
768     private ShardInformation findShardInformation(String memberId) {
769         for (ShardInformation info : localShards.values()) {
770             if (info.getShardId().toString().equals(memberId)) {
771                 return info;
772             }
773         }
774
775         return null;
776     }
777
778     private boolean isReadyWithLeaderId() {
779         boolean isReady = true;
780         for (ShardInformation info : localShards.values()) {
781             if (!info.isShardReadyWithLeaderId()) {
782                 isReady = false;
783                 break;
784             }
785         }
786         return isReady;
787     }
788
789     private boolean isInSync() {
790         for (ShardInformation info : localShards.values()) {
791             if (!info.isInSync()) {
792                 return false;
793             }
794         }
795         return true;
796     }
797
798     private void onActorInitialized(Object message) {
799         final ActorRef sender = getSender();
800
801         if (sender == null) {
802             return; //why is a non-actor sending this message? Just ignore.
803         }
804
805         String actorName = sender.path().name();
806         //find shard name from actor name; actor name is stringified shardId
807
808         final ShardIdentifier shardId;
809         try {
810             shardId = ShardIdentifier.fromShardIdString(actorName);
811         } catch (IllegalArgumentException e) {
812             LOG.debug("{}: ignoring actor {}", actorName, e);
813             return;
814         }
815
816         markShardAsInitialized(shardId.getShardName());
817     }
818
819     private void markShardAsInitialized(String shardName) {
820         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
821
822         ShardInformation shardInformation = localShards.get(shardName);
823         if (shardInformation != null) {
824             shardInformation.setActorInitialized();
825
826             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
827         }
828     }
829
830     @Override
831     protected void handleRecover(Object message) throws Exception {
832         if (message instanceof RecoveryCompleted) {
833             onRecoveryCompleted();
834         } else if (message instanceof SnapshotOffer) {
835             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
836         }
837     }
838
839     @SuppressWarnings("checkstyle:IllegalCatch")
840     private void onRecoveryCompleted() {
841         LOG.info("Recovery complete : {}", persistenceId());
842
843         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
844         // journal on upgrade from Helium.
845         deleteMessages(lastSequenceNr());
846
847         if (currentSnapshot == null && restoreFromSnapshot != null
848                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
849             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
850
851             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
852
853             applyShardManagerSnapshot(snapshot);
854         }
855
856         createLocalShards();
857     }
858
859     private void sendResponse(ShardInformation shardInformation, boolean doWait,
860             boolean wantShardReady, final Supplier<Object> messageSupplier) {
861         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
862             if (doWait) {
863                 final ActorRef sender = getSender();
864                 final ActorRef self = self();
865
866                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
867
868                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
869                     new OnShardInitialized(replyRunnable);
870
871                 shardInformation.addOnShardInitialized(onShardInitialized);
872
873                 FiniteDuration timeout = shardInformation.getDatastoreContext()
874                         .getShardInitializationTimeout().duration();
875                 if (shardInformation.isShardInitialized()) {
876                     // If the shard is already initialized then we'll wait enough time for the shard to
877                     // elect a leader, ie 2 times the election timeout.
878                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
879                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
880                 }
881
882                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
883                         shardInformation.getShardName());
884
885                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
886                         timeout, getSelf(),
887                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
888                         getContext().dispatcher(), getSelf());
889
890                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
891
892             } else if (!shardInformation.isShardInitialized()) {
893                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
894                         shardInformation.getShardName());
895                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
896             } else {
897                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
898                         shardInformation.getShardName());
899                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
900             }
901
902             return;
903         }
904
905         getSender().tell(messageSupplier.get(), getSelf());
906     }
907
908     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
909         return new NoShardLeaderException(null, shardId.toString());
910     }
911
912     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
913         return new NotInitializedException(String.format(
914                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
915     }
916
917     @VisibleForTesting
918     static MemberName memberToName(final Member member) {
919         return MemberName.forName(member.roles().iterator().next());
920     }
921
922     private void memberRemoved(ClusterEvent.MemberRemoved message) {
923         MemberName memberName = memberToName(message.member());
924
925         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
926                 message.member().address());
927
928         peerAddressResolver.removePeerAddress(memberName);
929
930         for (ShardInformation info : localShards.values()) {
931             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
932         }
933     }
934
935     private void memberExited(ClusterEvent.MemberExited message) {
936         MemberName memberName = memberToName(message.member());
937
938         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
939                 message.member().address());
940
941         peerAddressResolver.removePeerAddress(memberName);
942
943         for (ShardInformation info : localShards.values()) {
944             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
945         }
946     }
947
948     private void memberUp(ClusterEvent.MemberUp message) {
949         MemberName memberName = memberToName(message.member());
950
951         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
952                 message.member().address());
953
954         memberUp(memberName, message.member().address());
955     }
956
957     private void memberUp(MemberName memberName, Address address) {
958         addPeerAddress(memberName, address);
959         checkReady();
960     }
961
962     private void memberWeaklyUp(MemberWeaklyUp message) {
963         MemberName memberName = memberToName(message.member());
964
965         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
966                 message.member().address());
967
968         memberUp(memberName, message.member().address());
969     }
970
971     private void addPeerAddress(MemberName memberName, Address address) {
972         peerAddressResolver.addPeerAddress(memberName, address);
973
974         for (ShardInformation info : localShards.values()) {
975             String shardName = info.getShardName();
976             String peerId = getShardIdentifier(memberName, shardName).toString();
977             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
978
979             info.peerUp(memberName, peerId, getSelf());
980         }
981     }
982
983     private void memberReachable(ClusterEvent.ReachableMember message) {
984         MemberName memberName = memberToName(message.member());
985         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
986
987         addPeerAddress(memberName, message.member().address());
988
989         markMemberAvailable(memberName);
990     }
991
992     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
993         MemberName memberName = memberToName(message.member());
994         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
995
996         markMemberUnavailable(memberName);
997     }
998
999     private void markMemberUnavailable(final MemberName memberName) {
1000         final String memberStr = memberName.getName();
1001         for (ShardInformation info : localShards.values()) {
1002             String leaderId = info.getLeaderId();
1003             // XXX: why are we using String#contains() here?
1004             if (leaderId != null && leaderId.contains(memberStr)) {
1005                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
1006                 info.setLeaderAvailable(false);
1007
1008                 primaryShardInfoCache.remove(info.getShardName());
1009             }
1010
1011             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
1012         }
1013     }
1014
1015     private void markMemberAvailable(final MemberName memberName) {
1016         final String memberStr = memberName.getName();
1017         for (ShardInformation info : localShards.values()) {
1018             String leaderId = info.getLeaderId();
1019             // XXX: why are we using String#contains() here?
1020             if (leaderId != null && leaderId.contains(memberStr)) {
1021                 LOG.debug("Marking Leader {} as available.", leaderId);
1022                 info.setLeaderAvailable(true);
1023             }
1024
1025             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
1026         }
1027     }
1028
1029     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
1030         datastoreContextFactory = factory;
1031         for (ShardInformation info : localShards.values()) {
1032             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
1033         }
1034     }
1035
1036     private void onGetLocalShardIds() {
1037         final List<String> response = new ArrayList<>(localShards.size());
1038
1039         for (ShardInformation info : localShards.values()) {
1040             response.add(info.getShardId().toString());
1041         }
1042
1043         getSender().tell(new Status.Success(response), getSelf());
1044     }
1045
1046     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
1047         final ShardIdentifier identifier = message.getShardId();
1048
1049         if (identifier != null) {
1050             final ShardInformation info = localShards.get(identifier.getShardName());
1051             if (info == null) {
1052                 getSender().tell(new Status.Failure(
1053                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
1054                 return;
1055             }
1056
1057             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1058         } else {
1059             for (ShardInformation info : localShards.values()) {
1060                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1061             }
1062         }
1063
1064         getSender().tell(new Status.Success(null), getSelf());
1065     }
1066
1067     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
1068         final ActorRef actor = info.getActor();
1069         if (actor != null) {
1070             actor.tell(switchBehavior, getSelf());
1071         } else {
1072             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
1073                 info.getShardName(), switchBehavior.getNewState());
1074         }
1075     }
1076
1077     /**
1078      * Notifies all the local shards of a change in the schema context.
1079      *
1080      * @param message the message to send
1081      */
1082     private void updateSchemaContext(final Object message) {
1083         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
1084
1085         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
1086
1087         for (ShardInformation info : localShards.values()) {
1088             if (info.getActor() == null) {
1089                 LOG.debug("Creating Shard {}", info.getShardId());
1090                 info.setActor(newShardActor(schemaContext, info));
1091             } else {
1092                 info.getActor().tell(message, getSelf());
1093             }
1094         }
1095     }
1096
1097     @VisibleForTesting
1098     protected ClusterWrapper getCluster() {
1099         return cluster;
1100     }
1101
1102     @VisibleForTesting
1103     protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
1104         return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
1105                 info.getShardId().toString());
1106     }
1107
1108     private void findPrimary(FindPrimary message) {
1109         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
1110
1111         final String shardName = message.getShardName();
1112         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
1113
1114         // First see if the there is a local replica for the shard
1115         final ShardInformation info = localShards.get(shardName);
1116         if (info != null && info.isActiveMember()) {
1117             sendResponse(info, message.isWaitUntilReady(), true, () -> {
1118                 String primaryPath = info.getSerializedLeaderActor();
1119                 Object found = canReturnLocalShardState && info.isLeader()
1120                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
1121                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
1122
1123                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
1124                 return found;
1125             });
1126
1127             return;
1128         }
1129
1130         final Collection<String> visitedAddresses;
1131         if (message instanceof RemoteFindPrimary) {
1132             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
1133         } else {
1134             visitedAddresses = new ArrayList<>(1);
1135         }
1136
1137         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
1138
1139         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
1140             if (visitedAddresses.contains(address)) {
1141                 continue;
1142             }
1143
1144             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
1145                     persistenceId(), shardName, address, visitedAddresses);
1146
1147             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
1148                     message.isWaitUntilReady(), visitedAddresses), getContext());
1149             return;
1150         }
1151
1152         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
1153
1154         getSender().tell(new PrimaryNotFoundException(
1155                 String.format("No primary shard found for %s.", shardName)), getSelf());
1156     }
1157
1158     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1159         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1160                 .getShardInitializationTimeout().duration().$times(2));
1161
1162         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1163         futureObj.onComplete(new OnComplete<Object>() {
1164             @Override
1165             public void onComplete(Throwable failure, Object response) {
1166                 if (failure != null) {
1167                     handler.onFailure(failure);
1168                 } else {
1169                     if (response instanceof RemotePrimaryShardFound) {
1170                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1171                     } else if (response instanceof LocalPrimaryShardFound) {
1172                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1173                     } else {
1174                         handler.onUnknownResponse(response);
1175                     }
1176                 }
1177             }
1178         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1179     }
1180
1181     /**
1182      * Construct the name of the shard actor given the name of the member on
1183      * which the shard resides and the name of the shard.
1184      *
1185      * @param memberName the member name
1186      * @param shardName the shard name
1187      * @return a b
1188      */
1189     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
1190         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1191     }
1192
1193     /**
1194      * Create shards that are local to the member on which the ShardManager runs.
1195      */
1196     private void createLocalShards() {
1197         MemberName memberName = this.cluster.getCurrentMemberName();
1198         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
1199
1200         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1201         if (restoreFromSnapshot != null) {
1202             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1203                 shardSnapshots.put(snapshot.getName(), snapshot);
1204             }
1205         }
1206
1207         restoreFromSnapshot = null; // null out to GC
1208
1209         for (String shardName : memberShardNames) {
1210             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1211
1212             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1213
1214             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1215             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
1216                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
1217                         shardSnapshots.get(shardName)), peerAddressResolver));
1218         }
1219     }
1220
1221     /**
1222      * Given the name of the shard find the addresses of all it's peers.
1223      *
1224      * @param shardName the shard name
1225      */
1226     private Map<String, String> getPeerAddresses(String shardName) {
1227         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1228         return getPeerAddresses(shardName, members);
1229     }
1230
1231     private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
1232         Map<String, String> peerAddresses = new HashMap<>();
1233         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1234
1235         for (MemberName memberName : members) {
1236             if (!currentMemberName.equals(memberName)) {
1237                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1238                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1239                 peerAddresses.put(shardId.toString(), address);
1240             }
1241         }
1242         return peerAddresses;
1243     }
1244
1245     @Override
1246     public SupervisorStrategy supervisorStrategy() {
1247
1248         return new OneForOneStrategy(10, Duration.create("1 minute"),
1249                 (Function<Throwable, Directive>) t -> {
1250                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1251                     return SupervisorStrategy.resume();
1252                 });
1253     }
1254
1255     @Override
1256     public String persistenceId() {
1257         return persistenceId;
1258     }
1259
1260     @VisibleForTesting
1261     ShardManagerInfoMBean getMBean() {
1262         return shardManagerMBean;
1263     }
1264
1265     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1266         if (shardReplicaOperationsInProgress.contains(shardName)) {
1267             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1268             LOG.debug("{}: {}", persistenceId(), msg);
1269             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1270             return true;
1271         }
1272
1273         return false;
1274     }
1275
1276     private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
1277         LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
1278
1279         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
1280                 ClusterUtils.getCleanShardName(message.getShardPrefix()));
1281         final String shardName = shardId.getShardName();
1282
1283         // Create the localShard
1284         if (schemaContext == null) {
1285             String msg = String.format(
1286                     "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1287             LOG.debug("{}: {}", persistenceId(), msg);
1288             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1289             return;
1290         }
1291
1292         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1293                 getSelf()) {
1294             @Override
1295             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1296                 final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
1297                         message.getShardPrefix(), response, getSender());
1298                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
1299                     getSelf().tell(runnable, getTargetActor());
1300                 }
1301             }
1302
1303             @Override
1304             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1305                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1306             }
1307         });
1308     }
1309
1310     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
1311         final String shardName = shardReplicaMsg.getShardName();
1312
1313         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1314
1315         // verify the shard with the specified name is present in the cluster configuration
1316         if (!this.configuration.isShardConfigured(shardName)) {
1317             String msg = String.format("No module configuration exists for shard %s", shardName);
1318             LOG.debug("{}: {}", persistenceId(), msg);
1319             getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
1320             return;
1321         }
1322
1323         // Create the localShard
1324         if (schemaContext == null) {
1325             String msg = String.format(
1326                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1327             LOG.debug("{}: {}", persistenceId(), msg);
1328             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1329             return;
1330         }
1331
1332         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1333                 getSelf()) {
1334             @Override
1335             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1336                 final RunnableMessage runnable = (RunnableMessage) () ->
1337                     addShard(getShardName(), response, getSender());
1338                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
1339                     getSelf().tell(runnable, getTargetActor());
1340                 }
1341             }
1342
1343             @Override
1344             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1345                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1346             }
1347         });
1348     }
1349
1350     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1351         String msg = String.format("Local shard %s already exists", shardName);
1352         LOG.debug("{}: {}", persistenceId(), msg);
1353         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1354     }
1355
1356     private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
1357                                 final RemotePrimaryShardFound response, final ActorRef sender) {
1358         if (isShardReplicaOperationInProgress(shardName, sender)) {
1359             return;
1360         }
1361
1362         shardReplicaOperationsInProgress.add(shardName);
1363
1364         final ShardInformation shardInfo;
1365         final boolean removeShardOnFailure;
1366         ShardInformation existingShardInfo = localShards.get(shardName);
1367         if (existingShardInfo == null) {
1368             removeShardOnFailure = true;
1369             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1370
1371             final Builder builder = newShardDatastoreContextBuilder(shardName);
1372             builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1373
1374             DatastoreContext datastoreContext = builder.build();
1375
1376             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1377                     Shard.builder(), peerAddressResolver);
1378             shardInfo.setActiveMember(false);
1379             localShards.put(shardName, shardInfo);
1380             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1381         } else {
1382             removeShardOnFailure = false;
1383             shardInfo = existingShardInfo;
1384         }
1385
1386         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1387     }
1388
1389     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1390         if (isShardReplicaOperationInProgress(shardName, sender)) {
1391             return;
1392         }
1393
1394         shardReplicaOperationsInProgress.add(shardName);
1395
1396         final ShardInformation shardInfo;
1397         final boolean removeShardOnFailure;
1398         ShardInformation existingShardInfo = localShards.get(shardName);
1399         if (existingShardInfo == null) {
1400             removeShardOnFailure = true;
1401             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1402
1403             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1404                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1405
1406             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1407                     Shard.builder(), peerAddressResolver);
1408             shardInfo.setActiveMember(false);
1409             localShards.put(shardName, shardInfo);
1410             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1411         } else {
1412             removeShardOnFailure = false;
1413             shardInfo = existingShardInfo;
1414         }
1415
1416         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1417     }
1418
1419     private void execAddShard(final String shardName,
1420                               final ShardInformation shardInfo,
1421                               final RemotePrimaryShardFound response,
1422                               final boolean removeShardOnFailure,
1423                               final ActorRef sender) {
1424
1425         final String localShardAddress =
1426                 peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1427
1428         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1429         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1430                 response.getPrimaryPath(), shardInfo.getShardId());
1431
1432         final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1433                 .getShardLeaderElectionTimeout().duration());
1434         final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1435                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1436
1437         futureObj.onComplete(new OnComplete<Object>() {
1438             @Override
1439             public void onComplete(Throwable failure, Object addServerResponse) {
1440                 if (failure != null) {
1441                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1442                             response.getPrimaryPath(), shardName, failure);
1443
1444                     final String msg = String.format("AddServer request to leader %s for shard %s failed",
1445                             response.getPrimaryPath(), shardName);
1446                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1447                 } else {
1448                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1449                             response.getPrimaryPath(), removeShardOnFailure), sender);
1450                 }
1451             }
1452         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1453     }
1454
1455     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1456             boolean removeShardOnFailure) {
1457         shardReplicaOperationsInProgress.remove(shardName);
1458
1459         if (removeShardOnFailure) {
1460             ShardInformation shardInfo = localShards.remove(shardName);
1461             if (shardInfo.getActor() != null) {
1462                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1463             }
1464         }
1465
1466         sender.tell(new Status.Failure(message == null ? failure :
1467             new RuntimeException(message, failure)), getSelf());
1468     }
1469
1470     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1471             String leaderPath, boolean removeShardOnFailure) {
1472         String shardName = shardInfo.getShardName();
1473         shardReplicaOperationsInProgress.remove(shardName);
1474
1475         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1476
1477         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1478             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1479
1480             // Make the local shard voting capable
1481             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1482             shardInfo.setActiveMember(true);
1483             persistShardList();
1484
1485             sender.tell(new Status.Success(null), getSelf());
1486         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1487             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1488         } else {
1489             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1490                     persistenceId(), shardName, replyMsg.getStatus());
1491
1492             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1493                     shardInfo.getShardId());
1494
1495             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1496         }
1497     }
1498
1499     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1500                                                String leaderPath, ShardIdentifier shardId) {
1501         Exception failure;
1502         switch (serverChangeStatus) {
1503             case TIMEOUT:
1504                 failure = new TimeoutException(String.format(
1505                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
1506                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
1507                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
1508                 break;
1509             case NO_LEADER:
1510                 failure = createNoShardLeaderException(shardId);
1511                 break;
1512             case NOT_SUPPORTED:
1513                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1514                         serverChange.getSimpleName(), shardId.getShardName()));
1515                 break;
1516             default :
1517                 failure = new RuntimeException(String.format(
1518                         "%s request to leader %s for shard %s failed with status %s",
1519                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1520         }
1521         return failure;
1522     }
1523
1524     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
1525         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1526
1527         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1528                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1529             @Override
1530             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1531                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1532             }
1533
1534             @Override
1535             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1536                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1537             }
1538
1539             private void doRemoveShardReplicaAsync(final String primaryPath) {
1540                 getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
1541                         primaryPath, getSender()), getTargetActor());
1542             }
1543         });
1544     }
1545
1546     private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
1547         LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
1548
1549         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
1550                 ClusterUtils.getCleanShardName(message.getShardPrefix()));
1551         final String shardName = shardId.getShardName();
1552
1553         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
1554                 shardName, persistenceId(), getSelf()) {
1555             @Override
1556             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1557                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1558             }
1559
1560             @Override
1561             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1562                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1563             }
1564
1565             private void doRemoveShardReplicaAsync(final String primaryPath) {
1566                 getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
1567                         primaryPath, getSender()), getTargetActor());
1568             }
1569         });
1570     }
1571
1572     private void persistShardList() {
1573         List<String> shardList = new ArrayList<>(localShards.keySet());
1574         for (ShardInformation shardInfo : localShards.values()) {
1575             if (!shardInfo.isActiveMember()) {
1576                 shardList.remove(shardInfo.getShardName());
1577             }
1578         }
1579         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1580         saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
1581     }
1582
1583     private ShardManagerSnapshot updateShardManagerSnapshot(
1584             final List<String> shardList,
1585             final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
1586         currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
1587         return currentSnapshot;
1588     }
1589
1590     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1591         currentSnapshot = snapshot;
1592
1593         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1594
1595         final MemberName currentMember = cluster.getCurrentMemberName();
1596         Set<String> configuredShardList =
1597             new HashSet<>(configuration.getMemberShardNames(currentMember));
1598         for (String shard : currentSnapshot.getShardList()) {
1599             if (!configuredShardList.contains(shard)) {
1600                 // add the current member as a replica for the shard
1601                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1602                 configuration.addMemberReplicaForShard(shard, currentMember);
1603             } else {
1604                 configuredShardList.remove(shard);
1605             }
1606         }
1607         for (String shard : configuredShardList) {
1608             // remove the member as a replica for the shard
1609             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1610             configuration.removeMemberReplicaForShard(shard, currentMember);
1611         }
1612     }
1613
1614     private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
1615         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1616             persistenceId());
1617         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1618             0, 0));
1619     }
1620
1621     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1622         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1623
1624         String shardName = changeMembersVotingStatus.getShardName();
1625         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1626         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1627             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1628                     e.getValue());
1629         }
1630
1631         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1632
1633         findLocalShard(shardName, getSender(),
1634             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1635             localShardFound.getPath(), getSender()));
1636     }
1637
1638     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
1639         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1640
1641         ActorRef sender = getSender();
1642         final String shardName = flipMembersVotingStatus.getShardName();
1643         findLocalShard(shardName, sender, localShardFound -> {
1644             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1645                     Timeout.apply(30, TimeUnit.SECONDS));
1646
1647             future.onComplete(new OnComplete<Object>() {
1648                 @Override
1649                 public void onComplete(Throwable failure, Object response) {
1650                     if (failure != null) {
1651                         sender.tell(new Status.Failure(new RuntimeException(
1652                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1653                         return;
1654                     }
1655
1656                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1657                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1658                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1659                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1660                     }
1661
1662                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1663                             .toString(), !raftState.isVoting());
1664
1665                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1666                             shardName, localShardFound.getPath(), sender);
1667                 }
1668             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1669         });
1670
1671     }
1672
1673     private void findLocalShard(FindLocalShard message) {
1674         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
1675
1676         final ShardInformation shardInformation = localShards.get(message.getShardName());
1677
1678         if (shardInformation == null) {
1679             LOG.debug("{}: Local shard {} not found - shards present: {}",
1680                     persistenceId(), message.getShardName(), localShards.keySet());
1681
1682             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1683             return;
1684         }
1685
1686         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1687             () -> new LocalShardFound(shardInformation.getActor()));
1688     }
1689
1690     private void findLocalShard(final String shardName, final ActorRef sender,
1691             final Consumer<LocalShardFound> onLocalShardFound) {
1692         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1693                 .getShardInitializationTimeout().duration().$times(2));
1694
1695         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1696         futureObj.onComplete(new OnComplete<Object>() {
1697             @Override
1698             public void onComplete(Throwable failure, Object response) {
1699                 if (failure != null) {
1700                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1701                             failure);
1702                     sender.tell(new Status.Failure(new RuntimeException(
1703                             String.format("Failed to find local shard %s", shardName), failure)), self());
1704                 } else {
1705                     if (response instanceof LocalShardFound) {
1706                         getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
1707                                 sender);
1708                     } else if (response instanceof LocalShardNotFound) {
1709                         String msg = String.format("Local shard %s does not exist", shardName);
1710                         LOG.debug("{}: {}", persistenceId, msg);
1711                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
1712                     } else {
1713                         String msg = String.format("Failed to find local shard %s: received response: %s",
1714                                 shardName, response);
1715                         LOG.debug("{}: {}", persistenceId, msg);
1716                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1717                                 new RuntimeException(msg)), self());
1718                     }
1719                 }
1720             }
1721         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1722     }
1723
1724     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
1725             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1726         if (isShardReplicaOperationInProgress(shardName, sender)) {
1727             return;
1728         }
1729
1730         shardReplicaOperationsInProgress.add(shardName);
1731
1732         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1733         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1734
1735         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1736                 changeServersVotingStatus, shardActorRef.path());
1737
1738         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1739         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1740
1741         futureObj.onComplete(new OnComplete<Object>() {
1742             @Override
1743             public void onComplete(Throwable failure, Object response) {
1744                 shardReplicaOperationsInProgress.remove(shardName);
1745                 if (failure != null) {
1746                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1747                             shardActorRef.path());
1748                     LOG.debug("{}: {}", persistenceId(), msg, failure);
1749                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1750                 } else {
1751                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1752
1753                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1754                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1755                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1756                         sender.tell(new Status.Success(null), getSelf());
1757                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1758                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1759                                 "The requested voting state change for shard %s is invalid. At least one member "
1760                                 + "must be voting", shardId.getShardName()))), getSelf());
1761                     } else {
1762                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1763                                 persistenceId(), shardName, replyMsg.getStatus());
1764
1765                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1766                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1767                         sender.tell(new Status.Failure(error), getSelf());
1768                     }
1769                 }
1770             }
1771         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1772     }
1773
1774     private static final class ForwardedAddServerReply {
1775         ShardInformation shardInfo;
1776         AddServerReply addServerReply;
1777         String leaderPath;
1778         boolean removeShardOnFailure;
1779
1780         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1781                 boolean removeShardOnFailure) {
1782             this.shardInfo = shardInfo;
1783             this.addServerReply = addServerReply;
1784             this.leaderPath = leaderPath;
1785             this.removeShardOnFailure = removeShardOnFailure;
1786         }
1787     }
1788
1789     private static final class ForwardedAddServerFailure {
1790         String shardName;
1791         String failureMessage;
1792         Throwable failure;
1793         boolean removeShardOnFailure;
1794
1795         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1796                 boolean removeShardOnFailure) {
1797             this.shardName = shardName;
1798             this.failureMessage = failureMessage;
1799             this.failure = failure;
1800             this.removeShardOnFailure = removeShardOnFailure;
1801         }
1802     }
1803
1804     static class OnShardInitialized {
1805         private final Runnable replyRunnable;
1806         private Cancellable timeoutSchedule;
1807
1808         OnShardInitialized(Runnable replyRunnable) {
1809             this.replyRunnable = replyRunnable;
1810         }
1811
1812         Runnable getReplyRunnable() {
1813             return replyRunnable;
1814         }
1815
1816         Cancellable getTimeoutSchedule() {
1817             return timeoutSchedule;
1818         }
1819
1820         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1821             this.timeoutSchedule = timeoutSchedule;
1822         }
1823     }
1824
1825     static class OnShardReady extends OnShardInitialized {
1826         OnShardReady(Runnable replyRunnable) {
1827             super(replyRunnable);
1828         }
1829     }
1830
1831     private interface RunnableMessage extends Runnable {
1832     }
1833
1834     /**
1835      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1836      * a remote or local find primary message is processed.
1837      */
1838     private interface FindPrimaryResponseHandler {
1839         /**
1840          * Invoked when a Failure message is received as a response.
1841          *
1842          * @param failure the failure exception
1843          */
1844         void onFailure(Throwable failure);
1845
1846         /**
1847          * Invoked when a RemotePrimaryShardFound response is received.
1848          *
1849          * @param response the response
1850          */
1851         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1852
1853         /**
1854          * Invoked when a LocalPrimaryShardFound response is received.
1855          *
1856          * @param response the response
1857          */
1858         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1859
1860         /**
1861          * Invoked when an unknown response is received. This is another type of failure.
1862          *
1863          * @param response the response
1864          */
1865         void onUnknownResponse(Object response);
1866     }
1867
1868     /**
1869      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1870      * replica and sends a wrapped Failure response to some targetActor.
1871      */
1872     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1873         private final ActorRef targetActor;
1874         private final String shardName;
1875         private final String persistenceId;
1876         private final ActorRef shardManagerActor;
1877
1878         /**
1879          * Constructs an instance.
1880          *
1881          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1882          * @param shardName The name of the shard for which the primary replica had to be found
1883          * @param persistenceId The persistenceId for the ShardManager
1884          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1885          */
1886         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
1887                 ActorRef shardManagerActor) {
1888             this.targetActor = Preconditions.checkNotNull(targetActor);
1889             this.shardName = Preconditions.checkNotNull(shardName);
1890             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1891             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1892         }
1893
1894         public ActorRef getTargetActor() {
1895             return targetActor;
1896         }
1897
1898         public String getShardName() {
1899             return shardName;
1900         }
1901
1902         @Override
1903         public void onFailure(Throwable failure) {
1904             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1905             targetActor.tell(new Status.Failure(new RuntimeException(
1906                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1907         }
1908
1909         @Override
1910         public void onUnknownResponse(Object response) {
1911             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1912                     shardName, response);
1913             LOG.debug("{}: {}", persistenceId, msg);
1914             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1915                     new RuntimeException(msg)), shardManagerActor);
1916         }
1917     }
1918
1919     /**
1920      * The WrappedShardResponse class wraps a response from a Shard.
1921      */
1922     private static final class WrappedShardResponse {
1923         private final ShardIdentifier shardId;
1924         private final Object response;
1925         private final String leaderPath;
1926
1927         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1928             this.shardId = shardId;
1929             this.response = response;
1930             this.leaderPath = leaderPath;
1931         }
1932
1933         ShardIdentifier getShardId() {
1934             return shardId;
1935         }
1936
1937         Object getResponse() {
1938             return response;
1939         }
1940
1941         String getLeaderPath() {
1942             return leaderPath;
1943         }
1944     }
1945
1946     private static final class ShardNotInitializedTimeout {
1947         private final ActorRef sender;
1948         private final ShardInformation shardInfo;
1949         private final OnShardInitialized onShardInitialized;
1950
1951         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1952             this.sender = sender;
1953             this.shardInfo = shardInfo;
1954             this.onShardInitialized = onShardInitialized;
1955         }
1956
1957         ActorRef getSender() {
1958             return sender;
1959         }
1960
1961         ShardInformation getShardInfo() {
1962             return shardInfo;
1963         }
1964
1965         OnShardInitialized getOnShardInitialized() {
1966             return onShardInitialized;
1967         }
1968     }
1969 }
1970
1971
1972