Remove DOMDataTreeProducer-related classes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12 import static java.util.Objects.requireNonNull;
13
14 import akka.actor.ActorRef;
15 import akka.actor.Address;
16 import akka.actor.Cancellable;
17 import akka.actor.OneForOneStrategy;
18 import akka.actor.PoisonPill;
19 import akka.actor.Status;
20 import akka.actor.SupervisorStrategy;
21 import akka.actor.SupervisorStrategy.Directive;
22 import akka.cluster.ClusterEvent;
23 import akka.cluster.ClusterEvent.MemberWeaklyUp;
24 import akka.cluster.Member;
25 import akka.dispatch.Futures;
26 import akka.dispatch.OnComplete;
27 import akka.japi.Function;
28 import akka.pattern.Patterns;
29 import akka.persistence.DeleteSnapshotsFailure;
30 import akka.persistence.DeleteSnapshotsSuccess;
31 import akka.persistence.RecoveryCompleted;
32 import akka.persistence.SaveSnapshotFailure;
33 import akka.persistence.SaveSnapshotSuccess;
34 import akka.persistence.SnapshotOffer;
35 import akka.persistence.SnapshotSelectionCriteria;
36 import akka.util.Timeout;
37 import com.google.common.annotations.VisibleForTesting;
38 import com.google.common.util.concurrent.SettableFuture;
39 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.HashMap;
44 import java.util.HashSet;
45 import java.util.List;
46 import java.util.Map;
47 import java.util.Map.Entry;
48 import java.util.Set;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.TimeoutException;
51 import java.util.function.Consumer;
52 import java.util.function.Supplier;
53 import org.opendaylight.controller.cluster.access.concepts.MemberName;
54 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
55 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
56 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
57 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
58 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
59 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
60 import org.opendaylight.controller.cluster.datastore.Shard;
61 import org.opendaylight.controller.cluster.datastore.config.Configuration;
62 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
63 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
64 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
65 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
66 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
67 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
68 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
69 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
70 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
71 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
72 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
73 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
74 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
75 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
76 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
77 import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
78 import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
79 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
80 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
81 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
82 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
83 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
84 import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
85 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
86 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
87 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
88 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
89 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
90 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
91 import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
92 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
93 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
94 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
95 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
96 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
97 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
98 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
99 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
100 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
101 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
102 import org.opendaylight.controller.cluster.raft.messages.AddServer;
103 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
104 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
105 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
106 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
107 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
108 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
109 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
110 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
111 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
112 import org.opendaylight.yangtools.concepts.Registration;
113 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
114 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
115 import org.slf4j.Logger;
116 import org.slf4j.LoggerFactory;
117 import scala.concurrent.ExecutionContext;
118 import scala.concurrent.Future;
119 import scala.concurrent.duration.FiniteDuration;
120
121 /**
122  * Manages the shards for a data store. The ShardManager has the following jobs:
123  * <ul>
124  * <li> Create all the local shard replicas that belong on this cluster member
125  * <li> Find the address of the local shard
126  * <li> Find the primary replica for any given shard
127  * <li> Monitor the cluster members and store their addresses
128  * </ul>
129  */
130 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);

    // Stores a mapping between a shard name and its corresponding information.
    // Shard names look like inventory, topology etc. and are as specified in
    // configuration.
    @VisibleForTesting
    final Map<String, ShardInformation> localShards = new HashMap<>();

    // The type of a ShardManager reflects the type of the datastore itself.
    // A data store could be of type config/operational.
    private final String type;

    private final ClusterWrapper cluster;

    private final Configuration configuration;

    // Dispatcher path looked up for DispatcherType.Shard in the constructor.
    @VisibleForTesting
    final String shardDispatcherPath;

    // JMX bean registered in the constructor and unregistered in postStop().
    private final ShardManagerInfo shardManagerMBean;

    private DatastoreContextFactory datastoreContextFactory;

    private final SettableFuture<Void> readinessFuture;

    private final PrimaryShardInfoFutureCache primaryShardInfoCache;

    @VisibleForTesting
    final ShardPeerAddressResolver peerAddressResolver;

    private EffectiveModelContext schemaContext;

    private DatastoreSnapshot restoreFromSnapshot;

    private ShardManagerSnapshot currentSnapshot;

    // Names of shards that currently have a replica operation (e.g. RemoveServer) outstanding.
    private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();

    // Per shard name, the completion callback of a graceful stop that has been issued but has not finished yet.
    private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();

    // Callbacks registered through RegisterForShardAvailabilityChanges messages.
    private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();

    // Either the persistence id configured in the base datastore context or "shard-manager-" + type.
    private final String persistenceId;
174
175     ShardManager(final AbstractShardManagerCreator<?> builder) {
176         this.cluster = builder.getCluster();
177         this.configuration = builder.getConfiguration();
178         this.datastoreContextFactory = builder.getDatastoreContextFactory();
179         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
180         this.shardDispatcherPath =
181                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
182         this.readinessFuture = builder.getReadinessFuture();
183         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
184         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
185
186         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
187         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
188
189         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
190
191         // Subscribe this actor to cluster member events
192         cluster.subscribeToMemberEvents(getSelf());
193
194         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
195                 "shard-manager-" + this.type,
196                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
197         shardManagerMBean.registerMBean();
198     }
199
    /**
     * Logs that this ShardManager is starting.
     */
    @Override
    public void preStart() {
        // NOTE(review): reads the persistenceId field directly, whereas the other log statements in this class
        // go through persistenceId().
        LOG.info("Starting ShardManager {}", persistenceId);
    }
204
    /**
     * Logs that this ShardManager is stopping and unregisters the MBean that the constructor registered.
     */
    @Override
    public void postStop() {
        LOG.info("Stopping ShardManager {}", persistenceId());

        shardManagerMBean.unregisterMBean();
    }
211
    /**
     * Dispatches an incoming message to the handler that matches its runtime type. The branches are tested in
     * order and the first match wins; a message matching none of them is passed to
     * {@code unknownMessage(message)}.
     *
     * @param message the message received by this actor
     * @throws Exception declared by the signature; propagated from whichever handler is invoked
     */
    @Override
    public void handleCommand(final Object message) throws Exception {
        if (message  instanceof FindPrimary) {
            findPrimary((FindPrimary)message);
        } else if (message instanceof FindLocalShard) {
            findLocalShard((FindLocalShard) message);
        } else if (message instanceof UpdateSchemaContext) {
            updateSchemaContext(message);
        } else if (message instanceof ActorInitialized) {
            onActorInitialized(message);
        } else if (message instanceof ClusterEvent.MemberUp) {
            // Specific cluster member events are handled first; see the MemberEvent catch-all near the end.
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof DatastoreContextFactory) {
            onDatastoreContextFactory((DatastoreContextFactory) message);
        } else if (message instanceof RoleChangeNotification) {
            onRoleChangeNotification((RoleChangeNotification) message);
        } else if (message instanceof FollowerInitialSyncUpStatus) {
            onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
        } else if (message instanceof ShardNotInitializedTimeout) {
            onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
        } else if (message instanceof ShardLeaderStateChanged) {
            onLeaderStateChanged((ShardLeaderStateChanged) message);
        } else if (message instanceof SwitchShardBehavior) {
            onSwitchShardBehavior((SwitchShardBehavior) message);
        } else if (message instanceof CreateShard) {
            onCreateShard((CreateShard)message);
        } else if (message instanceof AddShardReplica) {
            onAddShardReplica((AddShardReplica) message);
        } else if (message instanceof AddPrefixShardReplica) {
            onAddPrefixShardReplica((AddPrefixShardReplica) message);
        } else if (message instanceof ForwardedAddServerReply) {
            // The forwarded message carries everything except the original requester, which is its sender.
            ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
            onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
                    msg.removeShardOnFailure);
        } else if (message instanceof ForwardedAddServerFailure) {
            ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
            onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
        } else if (message instanceof RemoveShardReplica) {
            onRemoveShardReplica((RemoveShardReplica) message);
        } else if (message instanceof RemovePrefixShardReplica) {
            onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
        } else if (message instanceof WrappedShardResponse) {
            onWrappedShardResponse((WrappedShardResponse) message);
        } else if (message instanceof GetSnapshot) {
            onGetSnapshot((GetSnapshot) message);
        } else if (message instanceof ServerRemoved) {
            onShardReplicaRemoved((ServerRemoved) message);
        } else if (message instanceof ChangeShardMembersVotingStatus) {
            onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
        } else if (message instanceof FlipShardMembersVotingStatus) {
            onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
        } else if (message instanceof SaveSnapshotSuccess) {
            onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
        } else if (message instanceof SaveSnapshotFailure) {
            LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
                    ((SaveSnapshotFailure) message).cause());
        } else if (message instanceof Shutdown) {
            onShutDown();
        } else if (message instanceof GetLocalShardIds) {
            onGetLocalShardIds();
        } else if (message instanceof GetShardRole) {
            onGetShardRole((GetShardRole) message);
        } else if (message instanceof RunnableMessage) {
            // Work that other threads hand over so that it executes as part of this actor's message processing.
            ((RunnableMessage)message).run();
        } else if (message instanceof RegisterForShardAvailabilityChanges) {
            onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message);
        } else if (message instanceof DeleteSnapshotsFailure) {
            LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
                    ((DeleteSnapshotsFailure) message).cause());
        } else if (message instanceof DeleteSnapshotsSuccess) {
            LOG.debug("{}: Successfully deleted prior snapshots", persistenceId());
        } else if (message instanceof RegisterRoleChangeListenerReply) {
            LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
        } else if (message instanceof ClusterEvent.MemberEvent) {
            // Catch-all for member events not matched by the specific branches above; only traced.
            LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message);
        } else {
            unknownMessage(message);
        }
    }
301
302     private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) {
303         LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
304
305         final Consumer<String> callback = message.getCallback();
306         shardAvailabilityCallbacks.add(callback);
307
308         getSender().tell(new Status.Success((Registration)
309             () -> executeInSelf(() -> shardAvailabilityCallbacks.remove(callback))), self());
310     }
311
312     private void onGetShardRole(final GetShardRole message) {
313         LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
314
315         final String name = message.getName();
316
317         final ShardInformation shardInformation = localShards.get(name);
318
319         if (shardInformation == null) {
320             LOG.info("{}: no shard information for {} found", persistenceId(), name);
321             getSender().tell(new Status.Failure(
322                     new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
323             return;
324         }
325
326         getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
327     }
328
329     void onShutDown() {
330         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
331         for (ShardInformation info : localShards.values()) {
332             if (info.getActor() != null) {
333                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
334
335                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
336                         .getElectionTimeOutInterval().$times(2);
337                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
338             }
339         }
340
341         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
342
343         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
344                 .getDispatcher(Dispatchers.DispatcherType.Client);
345         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
346
347         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
348             @Override
349             public void onComplete(final Throwable failure, final Iterable<Boolean> results) {
350                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
351
352                 self().tell(PoisonPill.getInstance(), self());
353
354                 if (failure != null) {
355                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
356                 } else {
357                     int nfailed = 0;
358                     for (Boolean result : results) {
359                         if (!result) {
360                             nfailed++;
361                         }
362                     }
363
364                     if (nfailed > 0) {
365                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
366                     }
367                 }
368             }
369         }, dispatcher);
370     }
371
372     private void onWrappedShardResponse(final WrappedShardResponse message) {
373         if (message.getResponse() instanceof RemoveServerReply) {
374             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
375                     message.getLeaderPath());
376         }
377     }
378
379     private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
380             final RemoveServerReply replyMsg, final String leaderPath) {
381         shardReplicaOperationsInProgress.remove(shardId.getShardName());
382
383         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
384
385         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
386             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
387                     shardId.getShardName());
388             originalSender.tell(new Status.Success(null), getSelf());
389         } else {
390             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
391                     persistenceId(), shardId, replyMsg.getStatus());
392
393             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
394             originalSender.tell(new Status.Failure(failure), getSelf());
395         }
396     }
397
398     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
399             justification = "https://github.com/spotbugs/spotbugs/issues/811")
400     private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
401                                           final String primaryPath, final ActorRef sender) {
402         if (isShardReplicaOperationInProgress(shardName, sender)) {
403             return;
404         }
405
406         shardReplicaOperationsInProgress.add(shardName);
407
408         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
409
410         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
411
412         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
413         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
414                 primaryPath, shardId);
415
416         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
417         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
418                 new RemoveServer(shardId.toString()), removeServerTimeout);
419
420         futureObj.onComplete(new OnComplete<>() {
421             @Override
422             public void onComplete(final Throwable failure, final Object response) {
423                 if (failure != null) {
424                     shardReplicaOperationsInProgress.remove(shardName);
425
426                     LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
427                         shardName, failure);
428
429                     // FAILURE
430                     sender.tell(new Status.Failure(new RuntimeException(
431                         String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
432                         failure)), self());
433                 } else {
434                     // SUCCESS
435                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
436                 }
437             }
438         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
439     }
440
441     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
442             justification = "https://github.com/spotbugs/spotbugs/issues/811")
443     private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
444             final String primaryPath, final ActorRef sender) {
445         if (isShardReplicaOperationInProgress(shardName, sender)) {
446             return;
447         }
448
449         shardReplicaOperationsInProgress.add(shardName);
450
451         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
452
453         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
454
455         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
456         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
457                 primaryPath, shardId);
458
459         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
460         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
461                 new RemoveServer(shardId.toString()), removeServerTimeout);
462
463         futureObj.onComplete(new OnComplete<>() {
464             @Override
465             public void onComplete(final Throwable failure, final Object response) {
466                 if (failure != null) {
467                     shardReplicaOperationsInProgress.remove(shardName);
468                     LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
469                         shardName, failure);
470
471                     // FAILURE
472                     sender.tell(new Status.Failure(new RuntimeException(
473                         String.format("RemoveServer request to leader %s for shard %s failed", primaryPath, shardName),
474                         failure)), self());
475                 } else {
476                     // SUCCESS
477                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
478                 }
479             }
480         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
481     }
482
483     private void onShardReplicaRemoved(final ServerRemoved message) {
484         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
485     }
486
487     @SuppressWarnings("checkstyle:IllegalCatch")
488     private void removeShard(final ShardIdentifier shardId) {
489         final String shardName = shardId.getShardName();
490         final ShardInformation shardInformation = localShards.remove(shardName);
491         if (shardInformation == null) {
492             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
493             return;
494         }
495
496         final ActorRef shardActor = shardInformation.getActor();
497         if (shardActor != null) {
498             long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
499                     .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
500
501             LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
502                     timeoutInMS);
503
504             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
505                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
506
507             final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<>() {
508                 @Override
509                 public void onComplete(final Throwable failure, final Boolean result) {
510                     if (failure == null) {
511                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
512                     } else {
513                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
514                     }
515
516                     self().tell((RunnableMessage) () -> {
517                         // At any rate, invalidate primaryShardInfo cache
518                         primaryShardInfoCache.remove(shardName);
519
520                         shardActorsStopping.remove(shardName);
521                         notifyOnCompleteTasks(failure, result);
522                     }, ActorRef.noSender());
523                 }
524             };
525
526             shardActorsStopping.put(shardName, onComplete);
527             stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
528                     .getDispatcher(Dispatchers.DispatcherType.Client));
529         }
530
531         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
532         persistShardList();
533     }
534
535     private void onGetSnapshot(final GetSnapshot getSnapshot) {
536         LOG.debug("{}: onGetSnapshot", persistenceId());
537
538         List<String> notInitialized = null;
539         for (ShardInformation shardInfo : localShards.values()) {
540             if (!shardInfo.isShardInitialized()) {
541                 if (notInitialized == null) {
542                     notInitialized = new ArrayList<>();
543                 }
544
545                 notInitialized.add(shardInfo.getShardName());
546             }
547         }
548
549         if (notInitialized != null) {
550             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
551                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
552             return;
553         }
554
555         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
556                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
557                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
558
559         for (ShardInformation shardInfo: localShards.values()) {
560             shardInfo.getActor().tell(getSnapshot, replyActor);
561         }
562     }
563
564     @SuppressWarnings("checkstyle:IllegalCatch")
565     private void onCreateShard(final CreateShard createShard) {
566         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
567
568         Object reply;
569         try {
570             String shardName = createShard.getModuleShardConfig().getShardName();
571             if (localShards.containsKey(shardName)) {
572                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
573                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
574             } else {
575                 doCreateShard(createShard);
576                 reply = new Status.Success(null);
577             }
578         } catch (Exception e) {
579             LOG.error("{}: onCreateShard failed", persistenceId(), e);
580             reply = new Status.Failure(e);
581         }
582
583         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
584             getSender().tell(reply, getSelf());
585         }
586     }
587
588     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
589         justification = "https://github.com/spotbugs/spotbugs/issues/811")
590     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
591         final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
592         if (stopOnComplete == null) {
593             return false;
594         }
595
596         LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
597                 shardName, messageToDefer);
598         final ActorRef sender = getSender();
599         stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
600             @Override
601             public void onComplete(final Throwable failure, final Boolean result) {
602                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
603                 self().tell(messageToDefer, sender);
604             }
605         });
606
607         return true;
608     }
609
610     private void doCreateShard(final CreateShard createShard) {
611         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
612         final String shardName = moduleShardConfig.getShardName();
613
614         configuration.addModuleShardConfiguration(moduleShardConfig);
615
616         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
617         if (shardDatastoreContext == null) {
618             shardDatastoreContext = newShardDatastoreContext(shardName);
619         } else {
620             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
621                     peerAddressResolver).build();
622         }
623
624         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
625
626         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
627                 && currentSnapshot.getShardList().contains(shardName);
628
629         Map<String, String> peerAddresses;
630         boolean isActiveMember;
631         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
632                 .contains(cluster.getCurrentMemberName())) {
633             peerAddresses = getPeerAddresses(shardName);
634             isActiveMember = true;
635         } else {
636             // The local member is not in the static shard member configuration and the shard did not
637             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
638             // the shard with no peers and with elections disabled so it stays as follower. A
639             // subsequent AddServer request will be needed to make it an active member.
640             isActiveMember = false;
641             peerAddresses = Collections.emptyMap();
642             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
643                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
644         }
645
646         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
647                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
648                 isActiveMember);
649
650         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
651                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
652         info.setActiveMember(isActiveMember);
653         localShards.put(info.getShardName(), info);
654
655         if (schemaContext != null) {
656             info.setSchemaContext(schemaContext);
657             info.setActor(newShardActor(info));
658         }
659     }
660
661     private DatastoreContext.Builder newShardDatastoreContextBuilder(final String shardName) {
662         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
663                 .shardPeerAddressResolver(peerAddressResolver);
664     }
665
666     private DatastoreContext newShardDatastoreContext(final String shardName) {
667         return newShardDatastoreContextBuilder(shardName).build();
668     }
669
670     private void checkReady() {
671         if (isReadyWithLeaderId()) {
672             LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
673             readinessFuture.set(null);
674         }
675     }
676
677     private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
678         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
679
680         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
681         if (shardInformation != null) {
682             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
683             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
684             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
685                 primaryShardInfoCache.remove(shardInformation.getShardName());
686
687                 notifyShardAvailabilityCallbacks(shardInformation);
688             }
689
690             checkReady();
691         } else {
692             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
693         }
694     }
695
696     private void notifyShardAvailabilityCallbacks(final ShardInformation shardInformation) {
697         shardAvailabilityCallbacks.forEach(callback -> callback.accept(shardInformation.getShardName()));
698     }
699
700     private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
701         ShardInformation shardInfo = message.getShardInfo();
702
703         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
704                 shardInfo.getShardName());
705
706         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
707
708         if (!shardInfo.isShardInitialized()) {
709             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
710             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
711         } else {
712             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
713             message.getSender().tell(new NoShardLeaderException(shardInfo.getShardId()), getSelf());
714         }
715     }
716
717     private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) {
718         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
719                 status.getName(), status.isInitialSyncDone());
720
721         ShardInformation shardInformation = findShardInformation(status.getName());
722
723         if (shardInformation != null) {
724             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
725
726             shardManagerMBean.setSyncStatus(isInSync());
727         }
728
729     }
730
731     private void onRoleChangeNotification(final RoleChangeNotification roleChanged) {
732         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
733                 roleChanged.getOldRole(), roleChanged.getNewRole());
734
735         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
736         if (shardInformation != null) {
737             shardInformation.setRole(roleChanged.getNewRole());
738             checkReady();
739             shardManagerMBean.setSyncStatus(isInSync());
740         }
741     }
742
743
744     private ShardInformation findShardInformation(final String memberId) {
745         for (ShardInformation info : localShards.values()) {
746             if (info.getShardId().toString().equals(memberId)) {
747                 return info;
748             }
749         }
750
751         return null;
752     }
753
754     private boolean isReadyWithLeaderId() {
755         boolean isReady = true;
756         for (ShardInformation info : localShards.values()) {
757             if (!info.isShardReadyWithLeaderId()) {
758                 isReady = false;
759                 break;
760             }
761         }
762         return isReady;
763     }
764
765     private boolean isInSync() {
766         for (ShardInformation info : localShards.values()) {
767             if (!info.isInSync()) {
768                 return false;
769             }
770         }
771         return true;
772     }
773
774     private void onActorInitialized(final Object message) {
775         final ActorRef sender = getSender();
776
777         if (sender == null) {
778             // why is a non-actor sending this message? Just ignore.
779             return;
780         }
781
782         String actorName = sender.path().name();
783         //find shard name from actor name; actor name is stringified shardId
784
785         final ShardIdentifier shardId;
786         try {
787             shardId = ShardIdentifier.fromShardIdString(actorName);
788         } catch (IllegalArgumentException e) {
789             LOG.debug("{}: ignoring actor {}", persistenceId, actorName, e);
790             return;
791         }
792
793         markShardAsInitialized(shardId.getShardName());
794     }
795
796     private void markShardAsInitialized(final String shardName) {
797         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
798
799         ShardInformation shardInformation = localShards.get(shardName);
800         if (shardInformation != null) {
801             shardInformation.setActorInitialized();
802
803             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
804         }
805     }
806
807     @Override
808     protected void handleRecover(final Object message) throws Exception {
809         if (message instanceof RecoveryCompleted) {
810             onRecoveryCompleted();
811         } else if (message instanceof SnapshotOffer) {
812             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
813         }
814     }
815
816     @SuppressWarnings("checkstyle:IllegalCatch")
817     private void onRecoveryCompleted() {
818         LOG.info("Recovery complete : {}", persistenceId());
819
820         if (currentSnapshot == null && restoreFromSnapshot != null
821                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
822             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
823
824             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
825
826             applyShardManagerSnapshot(snapshot);
827         }
828
829         createLocalShards();
830     }
831
832     private void sendResponse(final ShardInformation shardInformation, final boolean doWait,
833             final boolean wantShardReady, final Supplier<Object> messageSupplier) {
834         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
835             if (doWait) {
836                 final ActorRef sender = getSender();
837                 final ActorRef self = self();
838
839                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
840
841                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
842                     new OnShardInitialized(replyRunnable);
843
844                 shardInformation.addOnShardInitialized(onShardInitialized);
845
846                 FiniteDuration timeout = shardInformation.getDatastoreContext()
847                         .getShardInitializationTimeout().duration();
848                 if (shardInformation.isShardInitialized()) {
849                     // If the shard is already initialized then we'll wait enough time for the shard to
850                     // elect a leader, ie 2 times the election timeout.
851                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
852                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
853                 }
854
855                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
856                         shardInformation);
857
858                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
859                         timeout, getSelf(),
860                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
861                         getContext().dispatcher(), getSelf());
862
863                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
864
865             } else if (!shardInformation.isShardInitialized()) {
866                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
867                         shardInformation.getShardName());
868                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
869             } else {
870                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
871                         shardInformation.getShardName());
872                 getSender().tell(new NoShardLeaderException(shardInformation.getShardId()), getSelf());
873             }
874
875             return;
876         }
877
878         getSender().tell(messageSupplier.get(), getSelf());
879     }
880
881     private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
882         return new NotInitializedException(String.format(
883                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
884     }
885
886     @VisibleForTesting
887     static MemberName memberToName(final Member member) {
888         return MemberName.forName(member.roles().iterator().next());
889     }
890
891     private void memberRemoved(final ClusterEvent.MemberRemoved message) {
892         MemberName memberName = memberToName(message.member());
893
894         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
895                 message.member().address());
896
897         peerAddressResolver.removePeerAddress(memberName);
898
899         for (ShardInformation info : localShards.values()) {
900             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
901         }
902     }
903
904     private void memberExited(final ClusterEvent.MemberExited message) {
905         MemberName memberName = memberToName(message.member());
906
907         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
908                 message.member().address());
909
910         peerAddressResolver.removePeerAddress(memberName);
911
912         for (ShardInformation info : localShards.values()) {
913             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
914         }
915     }
916
917     private void memberUp(final ClusterEvent.MemberUp message) {
918         MemberName memberName = memberToName(message.member());
919
920         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
921                 message.member().address());
922
923         memberUp(memberName, message.member().address());
924     }
925
926     private void memberUp(final MemberName memberName, final Address address) {
927         addPeerAddress(memberName, address);
928         checkReady();
929     }
930
931     private void memberWeaklyUp(final MemberWeaklyUp message) {
932         MemberName memberName = memberToName(message.member());
933
934         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
935                 message.member().address());
936
937         memberUp(memberName, message.member().address());
938     }
939
940     private void addPeerAddress(final MemberName memberName, final Address address) {
941         peerAddressResolver.addPeerAddress(memberName, address);
942
943         for (ShardInformation info : localShards.values()) {
944             String shardName = info.getShardName();
945             String peerId = getShardIdentifier(memberName, shardName).toString();
946             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
947
948             info.peerUp(memberName, peerId, getSelf());
949         }
950     }
951
952     private void memberReachable(final ClusterEvent.ReachableMember message) {
953         MemberName memberName = memberToName(message.member());
954         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
955
956         addPeerAddress(memberName, message.member().address());
957
958         markMemberAvailable(memberName);
959     }
960
961     private void memberUnreachable(final ClusterEvent.UnreachableMember message) {
962         MemberName memberName = memberToName(message.member());
963         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
964
965         markMemberUnavailable(memberName);
966     }
967
968     private void markMemberUnavailable(final MemberName memberName) {
969         for (ShardInformation info : localShards.values()) {
970             String leaderId = info.getLeaderId();
971             if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
972                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
973                 info.setLeaderAvailable(false);
974
975                 primaryShardInfoCache.remove(info.getShardName());
976
977                 notifyShardAvailabilityCallbacks(info);
978             }
979
980             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
981         }
982     }
983
984     private void markMemberAvailable(final MemberName memberName) {
985         for (ShardInformation info : localShards.values()) {
986             String leaderId = info.getLeaderId();
987             if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
988                 LOG.debug("Marking Leader {} as available.", leaderId);
989                 info.setLeaderAvailable(true);
990             }
991
992             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
993         }
994     }
995
996     private void onDatastoreContextFactory(final DatastoreContextFactory factory) {
997         datastoreContextFactory = factory;
998         for (ShardInformation info : localShards.values()) {
999             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
1000         }
1001     }
1002
1003     private void onGetLocalShardIds() {
1004         final List<String> response = new ArrayList<>(localShards.size());
1005
1006         for (ShardInformation info : localShards.values()) {
1007             response.add(info.getShardId().toString());
1008         }
1009
1010         getSender().tell(new Status.Success(response), getSelf());
1011     }
1012
1013     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
1014         final ShardIdentifier identifier = message.getShardId();
1015
1016         if (identifier != null) {
1017             final ShardInformation info = localShards.get(identifier.getShardName());
1018             if (info == null) {
1019                 getSender().tell(new Status.Failure(
1020                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
1021                 return;
1022             }
1023
1024             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1025         } else {
1026             for (ShardInformation info : localShards.values()) {
1027                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1028             }
1029         }
1030
1031         getSender().tell(new Status.Success(null), getSelf());
1032     }
1033
1034     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
1035         final ActorRef actor = info.getActor();
1036         if (actor != null) {
1037             actor.tell(switchBehavior, getSelf());
1038         } else {
1039             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
1040                 info.getShardName(), switchBehavior.getNewState());
1041         }
1042     }
1043
1044     /**
1045      * Notifies all the local shards of a change in the schema context.
1046      *
1047      * @param message the message to send
1048      */
1049     private void updateSchemaContext(final Object message) {
1050         schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext();
1051
1052         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
1053
1054         for (ShardInformation info : localShards.values()) {
1055             info.setSchemaContext(schemaContext);
1056
1057             if (info.getActor() == null) {
1058                 LOG.debug("Creating Shard {}", info.getShardId());
1059                 info.setActor(newShardActor(info));
1060                 // Update peer address for every existing peer memeber to avoid missing sending
1061                 // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp.
1062                 String shardName = info.getShardName();
1063                 for (MemberName memberName : peerAddressResolver.getPeerMembers()) {
1064                     String peerId = getShardIdentifier(memberName, shardName).toString() ;
1065                     String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
1066                     info.updatePeerAddress(peerId, peerAddress, getSelf());
1067                     info.peerUp(memberName, peerId, getSelf());
1068                     LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
1069                             persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
1070                 }
1071             } else {
1072                 info.getActor().tell(message, getSelf());
1073             }
1074         }
1075     }
1076
1077     @VisibleForTesting
1078     protected ClusterWrapper getCluster() {
1079         return cluster;
1080     }
1081
1082     @VisibleForTesting
1083     protected ActorRef newShardActor(final ShardInformation info) {
1084         return getContext().actorOf(info.newProps().withDispatcher(shardDispatcherPath),
1085                 info.getShardId().toString());
1086     }
1087
1088     private void findPrimary(final FindPrimary message) {
1089         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
1090
1091         final String shardName = message.getShardName();
1092         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
1093
1094         // First see if the there is a local replica for the shard
1095         final ShardInformation info = localShards.get(shardName);
1096         if (info != null && info.isActiveMember()) {
1097             sendResponse(info, message.isWaitUntilReady(), true, () -> {
1098                 String primaryPath = info.getSerializedLeaderActor();
1099                 Object found = canReturnLocalShardState && info.isLeader()
1100                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
1101                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
1102
1103                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
1104                 return found;
1105             });
1106
1107             return;
1108         }
1109
1110         final Collection<String> visitedAddresses;
1111         if (message instanceof RemoteFindPrimary) {
1112             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
1113         } else {
1114             visitedAddresses = new ArrayList<>(1);
1115         }
1116
1117         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
1118
1119         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
1120             if (visitedAddresses.contains(address)) {
1121                 continue;
1122             }
1123
1124             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
1125                     persistenceId(), shardName, address, visitedAddresses);
1126
1127             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
1128                     message.isWaitUntilReady(), visitedAddresses), getContext());
1129             return;
1130         }
1131
1132         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
1133
1134         getSender().tell(new PrimaryNotFoundException(
1135                 String.format("No primary shard found for %s.", shardName)), getSelf());
1136     }
1137
1138     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1139         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1140                 .getShardInitializationTimeout().duration().$times(2));
1141
1142         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1143         futureObj.onComplete(new OnComplete<>() {
1144             @Override
1145             public void onComplete(final Throwable failure, final Object response) {
1146                 if (failure != null) {
1147                     handler.onFailure(failure);
1148                 } else {
1149                     if (response instanceof RemotePrimaryShardFound) {
1150                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1151                     } else if (response instanceof LocalPrimaryShardFound) {
1152                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1153                     } else {
1154                         handler.onUnknownResponse(response);
1155                     }
1156                 }
1157             }
1158         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1159     }
1160
1161     /**
1162      * Construct the name of the shard actor given the name of the member on
1163      * which the shard resides and the name of the shard.
1164      *
1165      * @param memberName the member name
1166      * @param shardName the shard name
1167      * @return a b
1168      */
1169     private ShardIdentifier getShardIdentifier(final MemberName memberName, final String shardName) {
1170         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1171     }
1172
1173     /**
1174      * Create shards that are local to the member on which the ShardManager runs.
1175      */
1176     private void createLocalShards() {
1177         MemberName memberName = this.cluster.getCurrentMemberName();
1178         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
1179
1180         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1181         if (restoreFromSnapshot != null) {
1182             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1183                 shardSnapshots.put(snapshot.getName(), snapshot);
1184             }
1185         }
1186
1187         // null out to GC
1188         restoreFromSnapshot = null;
1189
1190         for (String shardName : memberShardNames) {
1191             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1192
1193             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1194
1195             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1196             localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
1197                     newShardDatastoreContext(shardName), shardSnapshots));
1198         }
1199     }
1200
1201     @VisibleForTesting
1202     ShardInformation createShardInfoFor(final String shardName, final ShardIdentifier shardId,
1203                                         final Map<String, String> peerAddresses,
1204                                         final DatastoreContext datastoreContext,
1205                                         final Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots) {
1206         return new ShardInformation(shardName, shardId, peerAddresses,
1207                 datastoreContext, Shard.builder().restoreFromSnapshot(shardSnapshots.get(shardName)),
1208                 peerAddressResolver);
1209     }
1210
1211     /**
1212      * Given the name of the shard find the addresses of all it's peers.
1213      *
1214      * @param shardName the shard name
1215      */
1216     Map<String, String> getPeerAddresses(final String shardName) {
1217         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1218         return getPeerAddresses(shardName, members);
1219     }
1220
1221     private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
1222         Map<String, String> peerAddresses = new HashMap<>();
1223         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1224
1225         for (MemberName memberName : members) {
1226             if (!currentMemberName.equals(memberName)) {
1227                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1228                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1229                 peerAddresses.put(shardId.toString(), address);
1230             }
1231         }
1232         return peerAddresses;
1233     }
1234
1235     @Override
1236     public SupervisorStrategy supervisorStrategy() {
1237
1238         return new OneForOneStrategy(10, FiniteDuration.create(1, TimeUnit.MINUTES),
1239                 (Function<Throwable, Directive>) t -> {
1240                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1241                     return SupervisorStrategy.resume();
1242                 });
1243     }
1244
1245     @Override
1246     public String persistenceId() {
1247         return persistenceId;
1248     }
1249
1250     @VisibleForTesting
1251     ShardManagerInfoMBean getMBean() {
1252         return shardManagerMBean;
1253     }
1254
1255     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1256         if (shardReplicaOperationsInProgress.contains(shardName)) {
1257             LOG.debug("{}: A shard replica operation for {} is already in progress", persistenceId(), shardName);
1258             sender.tell(new Status.Failure(new IllegalStateException(
1259                 String.format("A shard replica operation for %s is already in progress", shardName))), getSelf());
1260             return true;
1261         }
1262
1263         return false;
1264     }
1265
1266     private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
1267         LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);
1268
1269         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
1270                 ClusterUtils.getCleanShardName(message.getShardPrefix()));
1271         final String shardName = shardId.getShardName();
1272
1273         // Create the localShard
1274         if (schemaContext == null) {
1275             LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
1276                 persistenceId(), shardName);
1277             getSender().tell(new Status.Failure(new IllegalStateException(
1278                 "No SchemaContext is available in order to create a local shard instance for " + shardName)),
1279                 getSelf());
1280             return;
1281         }
1282
1283         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1284                 getSelf()) {
1285             @Override
1286             public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1287                 final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
1288                         message.getShardPrefix(), response, getSender());
1289                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
1290                     getSelf().tell(runnable, getTargetActor());
1291                 }
1292             }
1293
1294             @Override
1295             public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
1296                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1297             }
1298         });
1299     }
1300
1301     private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
1302         final String shardName = shardReplicaMsg.getShardName();
1303
1304         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1305
1306         // verify the shard with the specified name is present in the cluster configuration
1307         if (!this.configuration.isShardConfigured(shardName)) {
1308             LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName);
1309             getSender().tell(new Status.Failure(new IllegalArgumentException(
1310                 "No module configuration exists for shard " + shardName)), getSelf());
1311             return;
1312         }
1313
1314         // Create the localShard
1315         if (schemaContext == null) {
1316             LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
1317                 persistenceId(), shardName);
1318             getSender().tell(new Status.Failure(new IllegalStateException(
1319                 "No SchemaContext is available in order to create a local shard instance for " + shardName)),
1320                 getSelf());
1321             return;
1322         }
1323
1324         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
1325                 getSelf()) {
1326             @Override
1327             public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1328                 final RunnableMessage runnable = (RunnableMessage) () ->
1329                     addShard(getShardName(), response, getSender());
1330                 if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
1331                     getSelf().tell(runnable, getTargetActor());
1332                 }
1333             }
1334
1335             @Override
1336             public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
1337                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1338             }
1339         });
1340     }
1341
1342     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
1343             justification = "https://github.com/spotbugs/spotbugs/issues/811")
1344     private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
1345         LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
1346         sender.tell(new Status.Failure(new AlreadyExistsException(
1347             String.format("Local shard %s already exists", shardName))), getSelf());
1348     }
1349
1350     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
1351             justification = "https://github.com/spotbugs/spotbugs/issues/811")
1352     private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
1353                                 final RemotePrimaryShardFound response, final ActorRef sender) {
1354         if (isShardReplicaOperationInProgress(shardName, sender)) {
1355             return;
1356         }
1357
1358         shardReplicaOperationsInProgress.add(shardName);
1359
1360         final ShardInformation shardInfo;
1361         final boolean removeShardOnFailure;
1362         ShardInformation existingShardInfo = localShards.get(shardName);
1363         if (existingShardInfo == null) {
1364             removeShardOnFailure = true;
1365             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1366
1367             final Builder builder = newShardDatastoreContextBuilder(shardName);
1368             builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1369
1370             DatastoreContext datastoreContext = builder.build();
1371
1372             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1373                     Shard.builder(), peerAddressResolver);
1374             shardInfo.setActiveMember(false);
1375             shardInfo.setSchemaContext(schemaContext);
1376             localShards.put(shardName, shardInfo);
1377             shardInfo.setActor(newShardActor(shardInfo));
1378         } else {
1379             removeShardOnFailure = false;
1380             shardInfo = existingShardInfo;
1381         }
1382
1383         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1384     }
1385
1386     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
1387             justification = "https://github.com/spotbugs/spotbugs/issues/811")
1388     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1389         if (isShardReplicaOperationInProgress(shardName, sender)) {
1390             return;
1391         }
1392
1393         shardReplicaOperationsInProgress.add(shardName);
1394
1395         final ShardInformation shardInfo;
1396         final boolean removeShardOnFailure;
1397         ShardInformation existingShardInfo = localShards.get(shardName);
1398         if (existingShardInfo == null) {
1399             removeShardOnFailure = true;
1400             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1401
1402             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1403                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1404
1405             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1406                     Shard.builder(), peerAddressResolver);
1407             shardInfo.setActiveMember(false);
1408             shardInfo.setSchemaContext(schemaContext);
1409             localShards.put(shardName, shardInfo);
1410             shardInfo.setActor(newShardActor(shardInfo));
1411         } else {
1412             removeShardOnFailure = false;
1413             shardInfo = existingShardInfo;
1414         }
1415
1416         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1417     }
1418
1419     private void execAddShard(final String shardName,
1420                               final ShardInformation shardInfo,
1421                               final RemotePrimaryShardFound response,
1422                               final boolean removeShardOnFailure,
1423                               final ActorRef sender) {
1424
1425         final String localShardAddress =
1426                 peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1427
1428         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1429         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1430                 response.getPrimaryPath(), shardInfo.getShardId());
1431
1432         final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1433                 .getShardLeaderElectionTimeout().duration());
1434         final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1435                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1436
1437         futureObj.onComplete(new OnComplete<>() {
1438             @Override
1439             public void onComplete(final Throwable failure, final Object addServerResponse) {
1440                 if (failure != null) {
1441                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1442                             response.getPrimaryPath(), shardName, failure);
1443
1444                     final String msg = String.format("AddServer request to leader %s for shard %s failed",
1445                             response.getPrimaryPath(), shardName);
1446                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1447                 } else {
1448                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1449                             response.getPrimaryPath(), removeShardOnFailure), sender);
1450                 }
1451             }
1452         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1453     }
1454
1455     private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
1456             final ActorRef sender, final boolean removeShardOnFailure) {
1457         shardReplicaOperationsInProgress.remove(shardName);
1458
1459         if (removeShardOnFailure) {
1460             ShardInformation shardInfo = localShards.remove(shardName);
1461             if (shardInfo.getActor() != null) {
1462                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1463             }
1464         }
1465
1466         sender.tell(new Status.Failure(message == null ? failure :
1467             new RuntimeException(message, failure)), getSelf());
1468     }
1469
1470     private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg,
1471             final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) {
1472         String shardName = shardInfo.getShardName();
1473         shardReplicaOperationsInProgress.remove(shardName);
1474
1475         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1476
1477         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1478             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1479
1480             // Make the local shard voting capable
1481             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1482             shardInfo.setActiveMember(true);
1483             persistShardList();
1484
1485             sender.tell(new Status.Success(null), getSelf());
1486         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1487             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1488         } else {
1489             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1490                     persistenceId(), shardName, replyMsg.getStatus());
1491
1492             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1493                     shardInfo.getShardId());
1494
1495             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1496         }
1497     }
1498
1499     private static Exception getServerChangeException(final Class<?> serverChange,
1500             final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) {
1501         switch (serverChangeStatus) {
1502             case TIMEOUT:
1503                 return new TimeoutException(String.format(
1504                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
1505                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
1506                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
1507             case NO_LEADER:
1508                 return new NoShardLeaderException(shardId);
1509             case NOT_SUPPORTED:
1510                 return new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1511                         serverChange.getSimpleName(), shardId.getShardName()));
1512             default :
1513                 return new RuntimeException(String.format("%s request to leader %s for shard %s failed with status %s",
1514                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1515         }
1516     }
1517
1518     private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
1519         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1520
1521         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1522                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1523             @Override
1524             public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1525                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1526             }
1527
1528             @Override
1529             public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
1530                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1531             }
1532
1533             private void doRemoveShardReplicaAsync(final String primaryPath) {
1534                 getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
1535                         primaryPath, getSender()), getTargetActor());
1536             }
1537         });
1538     }
1539
1540     private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
1541         LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);
1542
1543         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
1544                 ClusterUtils.getCleanShardName(message.getShardPrefix()));
1545         final String shardName = shardId.getShardName();
1546
1547         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
1548                 shardName, persistenceId(), getSelf()) {
1549             @Override
1550             public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
1551                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1552             }
1553
1554             @Override
1555             public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
1556                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1557             }
1558
1559             private void doRemoveShardReplicaAsync(final String primaryPath) {
1560                 getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
1561                         primaryPath, getSender()), getTargetActor());
1562             }
1563         });
1564     }
1565
1566     private void persistShardList() {
1567         List<String> shardList = new ArrayList<>(localShards.keySet());
1568         for (ShardInformation shardInfo : localShards.values()) {
1569             if (!shardInfo.isActiveMember()) {
1570                 shardList.remove(shardInfo.getShardName());
1571             }
1572         }
1573         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1574         saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
1575     }
1576
1577     private ShardManagerSnapshot updateShardManagerSnapshot(
1578             final List<String> shardList,
1579             final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
1580         currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
1581         return currentSnapshot;
1582     }
1583
1584     private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) {
1585         currentSnapshot = snapshot;
1586
1587         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1588
1589         final MemberName currentMember = cluster.getCurrentMemberName();
1590         Set<String> configuredShardList =
1591             new HashSet<>(configuration.getMemberShardNames(currentMember));
1592         for (String shard : currentSnapshot.getShardList()) {
1593             if (!configuredShardList.contains(shard)) {
1594                 // add the current member as a replica for the shard
1595                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1596                 configuration.addMemberReplicaForShard(shard, currentMember);
1597             } else {
1598                 configuredShardList.remove(shard);
1599             }
1600         }
1601         for (String shard : configuredShardList) {
1602             // remove the member as a replica for the shard
1603             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1604             configuration.removeMemberReplicaForShard(shard, currentMember);
1605         }
1606     }
1607
1608     private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) {
1609         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1610             persistenceId());
1611         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1612             0, 0));
1613     }
1614
1615     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1616         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1617
1618         String shardName = changeMembersVotingStatus.getShardName();
1619         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1620         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1621             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1622                     e.getValue());
1623         }
1624
1625         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1626
1627         findLocalShard(shardName, getSender(),
1628             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1629             localShardFound.getPath(), getSender()));
1630     }
1631
1632     private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
1633         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1634
1635         ActorRef sender = getSender();
1636         final String shardName = flipMembersVotingStatus.getShardName();
1637         findLocalShard(shardName, sender, localShardFound -> {
1638             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1639                     Timeout.apply(30, TimeUnit.SECONDS));
1640
1641             future.onComplete(new OnComplete<>() {
1642                 @Override
1643                 public void onComplete(final Throwable failure, final Object response) {
1644                     if (failure != null) {
1645                         sender.tell(new Status.Failure(new RuntimeException(
1646                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1647                         return;
1648                     }
1649
1650                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1651                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1652                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1653                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1654                     }
1655
1656                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1657                             .toString(), !raftState.isVoting());
1658
1659                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1660                             shardName, localShardFound.getPath(), sender);
1661                 }
1662             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1663         });
1664
1665     }
1666
1667     private void findLocalShard(final FindLocalShard message) {
1668         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
1669
1670         final ShardInformation shardInformation = localShards.get(message.getShardName());
1671
1672         if (shardInformation == null) {
1673             LOG.debug("{}: Local shard {} not found - shards present: {}",
1674                     persistenceId(), message.getShardName(), localShards.keySet());
1675
1676             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1677             return;
1678         }
1679
1680         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1681             () -> new LocalShardFound(shardInformation.getActor()));
1682     }
1683
1684     private void findLocalShard(final String shardName, final ActorRef sender,
1685             final Consumer<LocalShardFound> onLocalShardFound) {
1686         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1687                 .getShardInitializationTimeout().duration().$times(2));
1688
1689         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1690         futureObj.onComplete(new OnComplete<>() {
1691             @Override
1692             public void onComplete(final Throwable failure, final Object response) {
1693                 if (failure != null) {
1694                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1695                             failure);
1696                     sender.tell(new Status.Failure(new RuntimeException(
1697                             String.format("Failed to find local shard %s", shardName), failure)), self());
1698                 } else {
1699                     if (response instanceof LocalShardFound) {
1700                         getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
1701                                 sender);
1702                     } else if (response instanceof LocalShardNotFound) {
1703                         LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName);
1704                         sender.tell(new Status.Failure(new IllegalArgumentException(
1705                             String.format("Local shard %s does not exist", shardName))), self());
1706                     } else {
1707                         LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName,
1708                             response);
1709                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
1710                                 : new RuntimeException(
1711                                     String.format("Failed to find local shard %s: received response: %s", shardName,
1712                                         response))), self());
1713                     }
1714                 }
1715             }
1716         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1717     }
1718
1719     private void changeShardMembersVotingStatus(final ChangeServersVotingStatus changeServersVotingStatus,
1720             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1721         if (isShardReplicaOperationInProgress(shardName, sender)) {
1722             return;
1723         }
1724
1725         shardReplicaOperationsInProgress.add(shardName);
1726
1727         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1728         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1729
1730         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1731                 changeServersVotingStatus, shardActorRef.path());
1732
1733         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1734         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1735
1736         futureObj.onComplete(new OnComplete<>() {
1737             @Override
1738             public void onComplete(final Throwable failure, final Object response) {
1739                 shardReplicaOperationsInProgress.remove(shardName);
1740                 if (failure != null) {
1741                     LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", persistenceId(),
1742                         shardActorRef.path(), failure);
1743                     sender.tell(new Status.Failure(new RuntimeException(
1744                         String.format("ChangeServersVotingStatus request to local shard %s failed",
1745                             shardActorRef.path()), failure)), self());
1746                 } else {
1747                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1748
1749                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1750                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1751                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1752                         sender.tell(new Status.Success(null), getSelf());
1753                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1754                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1755                                 "The requested voting state change for shard %s is invalid. At least one member "
1756                                 + "must be voting", shardId.getShardName()))), getSelf());
1757                     } else {
1758                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1759                                 persistenceId(), shardName, replyMsg.getStatus());
1760
1761                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1762                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1763                         sender.tell(new Status.Failure(error), getSelf());
1764                     }
1765                 }
1766             }
1767         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1768     }
1769
1770     private static final class ForwardedAddServerReply {
1771         ShardInformation shardInfo;
1772         AddServerReply addServerReply;
1773         String leaderPath;
1774         boolean removeShardOnFailure;
1775
1776         ForwardedAddServerReply(final ShardInformation shardInfo, final AddServerReply addServerReply,
1777             final String leaderPath, final boolean removeShardOnFailure) {
1778             this.shardInfo = shardInfo;
1779             this.addServerReply = addServerReply;
1780             this.leaderPath = leaderPath;
1781             this.removeShardOnFailure = removeShardOnFailure;
1782         }
1783     }
1784
1785     private static final class ForwardedAddServerFailure {
1786         String shardName;
1787         String failureMessage;
1788         Throwable failure;
1789         boolean removeShardOnFailure;
1790
1791         ForwardedAddServerFailure(final String shardName, final String failureMessage, final Throwable failure,
1792                 final boolean removeShardOnFailure) {
1793             this.shardName = shardName;
1794             this.failureMessage = failureMessage;
1795             this.failure = failure;
1796             this.removeShardOnFailure = removeShardOnFailure;
1797         }
1798     }
1799
1800     static class OnShardInitialized {
1801         private final Runnable replyRunnable;
1802         private Cancellable timeoutSchedule;
1803
1804         OnShardInitialized(final Runnable replyRunnable) {
1805             this.replyRunnable = replyRunnable;
1806         }
1807
1808         Runnable getReplyRunnable() {
1809             return replyRunnable;
1810         }
1811
1812         Cancellable getTimeoutSchedule() {
1813             return timeoutSchedule;
1814         }
1815
1816         void setTimeoutSchedule(final Cancellable timeoutSchedule) {
1817             this.timeoutSchedule = timeoutSchedule;
1818         }
1819     }
1820
1821     static class OnShardReady extends OnShardInitialized {
1822         OnShardReady(final Runnable replyRunnable) {
1823             super(replyRunnable);
1824         }
1825     }
1826
1827     private interface RunnableMessage extends Runnable {
1828     }
1829
1830     /**
1831      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1832      * a remote or local find primary message is processed.
1833      */
1834     private interface FindPrimaryResponseHandler {
1835         /**
1836          * Invoked when a Failure message is received as a response.
1837          *
1838          * @param failure the failure exception
1839          */
1840         void onFailure(Throwable failure);
1841
1842         /**
1843          * Invoked when a RemotePrimaryShardFound response is received.
1844          *
1845          * @param response the response
1846          */
1847         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1848
1849         /**
1850          * Invoked when a LocalPrimaryShardFound response is received.
1851          *
1852          * @param response the response
1853          */
1854         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1855
1856         /**
1857          * Invoked when an unknown response is received. This is another type of failure.
1858          *
1859          * @param response the response
1860          */
1861         void onUnknownResponse(Object response);
1862     }
1863
1864     /**
1865      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1866      * replica and sends a wrapped Failure response to some targetActor.
1867      */
1868     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1869         private final ActorRef targetActor;
1870         private final String shardName;
1871         private final String persistenceId;
1872         private final ActorRef shardManagerActor;
1873
1874         /**
1875          * Constructs an instance.
1876          *
1877          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1878          * @param shardName The name of the shard for which the primary replica had to be found
1879          * @param persistenceId The persistenceId for the ShardManager
1880          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1881          */
1882         protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
1883                 final String persistenceId, final ActorRef shardManagerActor) {
1884             this.targetActor = requireNonNull(targetActor);
1885             this.shardName = requireNonNull(shardName);
1886             this.persistenceId = requireNonNull(persistenceId);
1887             this.shardManagerActor = requireNonNull(shardManagerActor);
1888         }
1889
1890         public ActorRef getTargetActor() {
1891             return targetActor;
1892         }
1893
1894         public String getShardName() {
1895             return shardName;
1896         }
1897
1898         @Override
1899         public void onFailure(final Throwable failure) {
1900             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1901             targetActor.tell(new Status.Failure(new RuntimeException(
1902                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1903         }
1904
1905         @Override
1906         public void onUnknownResponse(final Object response) {
1907             LOG.debug("{}: Failed to find leader for shard {}: received response: {}", persistenceId, shardName,
1908                 response);
1909             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
1910                     : new RuntimeException(String.format("Failed to find leader for shard %s: received response: %s",
1911                         shardName, response))), shardManagerActor);
1912         }
1913     }
1914
1915     /**
1916      * The WrappedShardResponse class wraps a response from a Shard.
1917      */
1918     private static final class WrappedShardResponse {
1919         private final ShardIdentifier shardId;
1920         private final Object response;
1921         private final String leaderPath;
1922
1923         WrappedShardResponse(final ShardIdentifier shardId, final Object response, final String leaderPath) {
1924             this.shardId = shardId;
1925             this.response = response;
1926             this.leaderPath = leaderPath;
1927         }
1928
1929         ShardIdentifier getShardId() {
1930             return shardId;
1931         }
1932
1933         Object getResponse() {
1934             return response;
1935         }
1936
1937         String getLeaderPath() {
1938             return leaderPath;
1939         }
1940     }
1941
1942     private static final class ShardNotInitializedTimeout {
1943         private final ActorRef sender;
1944         private final ShardInformation shardInfo;
1945         private final OnShardInitialized onShardInitialized;
1946
1947         ShardNotInitializedTimeout(final ShardInformation shardInfo, final OnShardInitialized onShardInitialized,
1948             final ActorRef sender) {
1949             this.sender = sender;
1950             this.shardInfo = shardInfo;
1951             this.onShardInitialized = onShardInitialized;
1952         }
1953
1954         ActorRef getSender() {
1955             return sender;
1956         }
1957
1958         ShardInformation getShardInfo() {
1959             return shardInfo;
1960         }
1961
1962         OnShardInitialized getOnShardInitialized() {
1963             return onShardInitialized;
1964         }
1965     }
1966 }
1967
1968
1969