Bug 8385: Fix testMultipleRegistrationsAtOnePrefix failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.actor.SupervisorStrategy.Directive;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberWeaklyUp;
23 import akka.cluster.Member;
24 import akka.dispatch.Futures;
25 import akka.dispatch.OnComplete;
26 import akka.japi.Function;
27 import akka.pattern.Patterns;
28 import akka.persistence.RecoveryCompleted;
29 import akka.persistence.SaveSnapshotFailure;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.persistence.SnapshotOffer;
32 import akka.persistence.SnapshotSelectionCriteria;
33 import akka.util.Timeout;
34 import com.google.common.annotations.VisibleForTesting;
35 import com.google.common.base.Preconditions;
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.HashMap;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.Set;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.function.Consumer;
49 import java.util.function.Supplier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
52 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
53 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
57 import org.opendaylight.controller.cluster.datastore.Shard;
58 import org.opendaylight.controller.cluster.datastore.config.Configuration;
59 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
60 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
63 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
65 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
66 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
67 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
68 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
69 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
70 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
71 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
72 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
73 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
77 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
78 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
79 import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
80 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
81 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
82 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
83 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
84 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
85 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
86 import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
87 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
88 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
89 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
90 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
91 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
92 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
93 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
94 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
95 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
96 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
97 import org.opendaylight.controller.cluster.raft.messages.AddServer;
98 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
99 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
100 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
101 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
102 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
103 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
104 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
105 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
106 import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
107 import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
108 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
109 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
110 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
111 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
112 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
113 import org.opendaylight.yangtools.concepts.ListenerRegistration;
114 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
115 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
116 import org.slf4j.Logger;
117 import org.slf4j.LoggerFactory;
118 import scala.concurrent.ExecutionContext;
119 import scala.concurrent.Future;
120 import scala.concurrent.duration.Duration;
121 import scala.concurrent.duration.FiniteDuration;
122
123 /**
124  * Manages the shards for a data store. The ShardManager has the following jobs:
125  * <ul>
126  * <li> Create all the local shard replicas that belong on this cluster member
127  * <li> Find the address of the local shard
128  * <li> Find the primary replica for any given shard
129  * <li> Monitor the cluster members and store their addresses
130  * </ul>
131  */
132 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);

    // Stores a mapping between a shard name and its corresponding information
    // Shard names look like inventory, topology etc and are as specified in
    // configuration
    private final Map<String, ShardInformation> localShards = new HashMap<>();

    // The type of a ShardManager reflects the type of the datastore itself
    // A data store could be of type config/operational
    // (set in the constructor from the base DatastoreContext's data store name)
    private final String type;

    private final ClusterWrapper cluster;

    private final Configuration configuration;

    // Dispatcher path resolved for Dispatchers.DispatcherType.Shard in the constructor
    private final String shardDispatcherPath;

    // JMX bean registered in the constructor and unregistered in postStop()
    private final ShardManagerInfo shardManagerMBean;

    // Not final: presumably replaced when a new DatastoreContextFactory message arrives - verify in
    // onDatastoreContextFactory (not visible here)
    private DatastoreContextFactory datastoreContextFactory;

    private final CountDownLatch waitTillReadyCountdownLatch;

    private final PrimaryShardInfoFutureCache primaryShardInfoCache;

    private final ShardPeerAddressResolver peerAddressResolver;

    private SchemaContext schemaContext;

    private DatastoreSnapshot restoreFromSnapshot;

    // Handed to ShardManagerGetSnapshotReplyActor when answering GetSnapshot
    private ShardManagerSnapshot currentSnapshot;

    // Names of shards with a replica add/remove operation in flight; checked via
    // isShardReplicaOperationInProgress(). This is a plain (non-thread-safe) HashSet, so it should only be
    // touched from the actor's message-processing thread.
    private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();

    // Shard name -> completion hook for a shard actor currently being stopped by removeShard().
    // isPreviousShardActorStopInProgress() attaches callbacks here to defer messages until the stop completes.
    private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();

    // Configured ShardManager persistence id, or "shard-manager-" + type when none is configured
    private final String persistenceId;
    private final AbstractDataStore dataStore;

    // NOTE(review): not assigned anywhere in the code visible here; postStop() closes it if it is set
    private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
    // Created by onInitConfigListener(), which first closes any previously created instance
    private PrefixedShardConfigUpdateHandler configUpdateHandler;
175
176     ShardManager(AbstractShardManagerCreator<?> builder) {
177         this.cluster = builder.getCluster();
178         this.configuration = builder.getConfiguration();
179         this.datastoreContextFactory = builder.getDatastoreContextFactory();
180         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
181         this.shardDispatcherPath =
182                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
183         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
184         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
185         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
186
187         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
188         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
189
190         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
191
192         // Subscribe this actor to cluster member events
193         cluster.subscribeToMemberEvents(getSelf());
194
195         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
196                 "shard-manager-" + this.type,
197                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
198         shardManagerMBean.registerMBean();
199
200         dataStore = builder.getDistributedDataStore();
201     }
202
203     @Override
204     public void preStart() {
205         LOG.info("Starting ShardManager {}", persistenceId);
206     }
207
208     @Override
209     public void postStop() {
210         LOG.info("Stopping ShardManager {}", persistenceId());
211
212         shardManagerMBean.unregisterMBean();
213
214         if (configListenerReg != null) {
215             configListenerReg.close();
216             configListenerReg = null;
217         }
218     }
219
220     @Override
221     public void handleCommand(Object message) throws Exception {
222         if (message  instanceof FindPrimary) {
223             findPrimary((FindPrimary)message);
224         } else if (message instanceof FindLocalShard) {
225             findLocalShard((FindLocalShard) message);
226         } else if (message instanceof UpdateSchemaContext) {
227             updateSchemaContext(message);
228         } else if (message instanceof ActorInitialized) {
229             onActorInitialized(message);
230         } else if (message instanceof ClusterEvent.MemberUp) {
231             memberUp((ClusterEvent.MemberUp) message);
232         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
233             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
234         } else if (message instanceof ClusterEvent.MemberExited) {
235             memberExited((ClusterEvent.MemberExited) message);
236         } else if (message instanceof ClusterEvent.MemberRemoved) {
237             memberRemoved((ClusterEvent.MemberRemoved) message);
238         } else if (message instanceof ClusterEvent.UnreachableMember) {
239             memberUnreachable((ClusterEvent.UnreachableMember) message);
240         } else if (message instanceof ClusterEvent.ReachableMember) {
241             memberReachable((ClusterEvent.ReachableMember) message);
242         } else if (message instanceof DatastoreContextFactory) {
243             onDatastoreContextFactory((DatastoreContextFactory) message);
244         } else if (message instanceof RoleChangeNotification) {
245             onRoleChangeNotification((RoleChangeNotification) message);
246         } else if (message instanceof FollowerInitialSyncUpStatus) {
247             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
248         } else if (message instanceof ShardNotInitializedTimeout) {
249             onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
250         } else if (message instanceof ShardLeaderStateChanged) {
251             onLeaderStateChanged((ShardLeaderStateChanged) message);
252         } else if (message instanceof SwitchShardBehavior) {
253             onSwitchShardBehavior((SwitchShardBehavior) message);
254         } else if (message instanceof CreateShard) {
255             onCreateShard((CreateShard)message);
256         } else if (message instanceof AddShardReplica) {
257             onAddShardReplica((AddShardReplica) message);
258         } else if (message instanceof AddPrefixShardReplica) {
259             onAddPrefixShardReplica((AddPrefixShardReplica) message);
260         } else if (message instanceof PrefixShardCreated) {
261             onPrefixShardCreated((PrefixShardCreated) message);
262         } else if (message instanceof PrefixShardRemoved) {
263             onPrefixShardRemoved((PrefixShardRemoved) message);
264         } else if (message instanceof InitConfigListener) {
265             onInitConfigListener();
266         } else if (message instanceof ForwardedAddServerReply) {
267             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
268             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
269                     msg.removeShardOnFailure);
270         } else if (message instanceof ForwardedAddServerFailure) {
271             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
272             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
273         } else if (message instanceof RemoveShardReplica) {
274             onRemoveShardReplica((RemoveShardReplica) message);
275         } else if (message instanceof RemovePrefixShardReplica) {
276             onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
277         } else if (message instanceof WrappedShardResponse) {
278             onWrappedShardResponse((WrappedShardResponse) message);
279         } else if (message instanceof GetSnapshot) {
280             onGetSnapshot();
281         } else if (message instanceof ServerRemoved) {
282             onShardReplicaRemoved((ServerRemoved) message);
283         } else if (message instanceof ChangeShardMembersVotingStatus) {
284             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
285         } else if (message instanceof FlipShardMembersVotingStatus) {
286             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
287         } else if (message instanceof SaveSnapshotSuccess) {
288             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
289         } else if (message instanceof SaveSnapshotFailure) {
290             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
291                     ((SaveSnapshotFailure) message).cause());
292         } else if (message instanceof Shutdown) {
293             onShutDown();
294         } else if (message instanceof GetLocalShardIds) {
295             onGetLocalShardIds();
296         } else if (message instanceof RunnableMessage) {
297             ((RunnableMessage)message).run();
298         } else {
299             unknownMessage(message);
300         }
301     }
302
303     private void onInitConfigListener() {
304         LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
305
306         final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
307                 org.opendaylight.mdsal.common.api.LogicalDatastoreType
308                         .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
309
310         if (configUpdateHandler != null) {
311             configUpdateHandler.close();
312         }
313
314         configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
315         configUpdateHandler.initListener(dataStore, type);
316     }
317
318     private void onShutDown() {
319         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
320         for (ShardInformation info : localShards.values()) {
321             if (info.getActor() != null) {
322                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
323
324                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
325                         .getElectionTimeOutInterval().$times(2);
326                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
327             }
328         }
329
330         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
331
332         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
333                 .getDispatcher(Dispatchers.DispatcherType.Client);
334         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
335
336         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
337             @Override
338             public void onComplete(Throwable failure, Iterable<Boolean> results) {
339                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
340
341                 self().tell(PoisonPill.getInstance(), self());
342
343                 if (failure != null) {
344                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
345                 } else {
346                     int nfailed = 0;
347                     for (Boolean result : results) {
348                         if (!result) {
349                             nfailed++;
350                         }
351                     }
352
353                     if (nfailed > 0) {
354                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
355                     }
356                 }
357             }
358         }, dispatcher);
359     }
360
361     private void onWrappedShardResponse(WrappedShardResponse message) {
362         if (message.getResponse() instanceof RemoveServerReply) {
363             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
364                     message.getLeaderPath());
365         }
366     }
367
368     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
369             String leaderPath) {
370         shardReplicaOperationsInProgress.remove(shardId.getShardName());
371
372         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
373
374         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
375             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
376                     shardId.getShardName());
377             originalSender.tell(new Status.Success(null), getSelf());
378         } else {
379             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
380                     persistenceId(), shardId, replyMsg.getStatus());
381
382             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
383             originalSender.tell(new Status.Failure(failure), getSelf());
384         }
385     }
386
387     private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
388                                           final String primaryPath, final ActorRef sender) {
389         if (isShardReplicaOperationInProgress(shardName, sender)) {
390             return;
391         }
392
393         shardReplicaOperationsInProgress.add(shardName);
394
395         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
396
397         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
398
399         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
400         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
401                 primaryPath, shardId);
402
403         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
404         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
405                 new RemoveServer(shardId.toString()), removeServerTimeout);
406
407         futureObj.onComplete(new OnComplete<Object>() {
408             @Override
409             public void onComplete(Throwable failure, Object response) {
410                 if (failure != null) {
411                     shardReplicaOperationsInProgress.remove(shardName);
412                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
413                             primaryPath, shardName);
414
415                     LOG.debug("{}: {}", persistenceId(), msg, failure);
416
417                     // FAILURE
418                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
419                 } else {
420                     // SUCCESS
421                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
422                 }
423             }
424         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
425     }
426
427     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
428             final ActorRef sender) {
429         if (isShardReplicaOperationInProgress(shardName, sender)) {
430             return;
431         }
432
433         shardReplicaOperationsInProgress.add(shardName);
434
435         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
436
437         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
438
439         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
440         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
441                 primaryPath, shardId);
442
443         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
444         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
445                 new RemoveServer(shardId.toString()), removeServerTimeout);
446
447         futureObj.onComplete(new OnComplete<Object>() {
448             @Override
449             public void onComplete(Throwable failure, Object response) {
450                 if (failure != null) {
451                     shardReplicaOperationsInProgress.remove(shardName);
452                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
453                             primaryPath, shardName);
454
455                     LOG.debug("{}: {}", persistenceId(), msg, failure);
456
457                     // FAILURE
458                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
459                 } else {
460                     // SUCCESS
461                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
462                 }
463             }
464         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
465     }
466
467     private void onShardReplicaRemoved(ServerRemoved message) {
468         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
469     }
470
    /**
     * Removes the local replica of a shard: drops its {@link ShardInformation} from {@code localShards}, gracefully
     * stops the shard actor if one exists, and persists the updated shard list.
     *
     * <p>While the actor is stopping, a {@link CompositeOnComplete} is kept in {@code shardActorsStopping} under
     * the shard name. {@code isPreviousShardActorStopInProgress} can then attach callbacks that defer messages for
     * the same shard until the stop has finished.
     *
     * @param shardId identifier of the shard replica to remove; nothing happens (beyond a debug log) if no local
     *                shard with that name is known
     */
    // NOTE(review): there is no catch block in this method, so this suppression appears unnecessary
    @SuppressWarnings("checkstyle:IllegalCatch")
    private void removeShard(final ShardIdentifier shardId) {
        final String shardName = shardId.getShardName();
        final ShardInformation shardInformation = localShards.remove(shardName);
        if (shardInformation == null) {
            LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
            return;
        }

        final ActorRef shardActor = shardInformation.getActor();
        if (shardActor != null) {
            // Stop timeout: three election timeouts, but never less than 10 seconds
            long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
                    .getElectionTimeOutInterval().$times(3).toMillis(), 10000);

            LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
                    timeoutInMS);

            final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
                    FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);

            final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
                @Override
                public void onComplete(final Throwable failure, final Boolean result) {
                    if (failure == null) {
                        LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
                    } else {
                        LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
                    }

                    // This callback runs on the Client dispatcher; hop back onto the actor via a RunnableMessage
                    // before touching shardActorsStopping and running the deferred callbacks.
                    self().tell((RunnableMessage) () -> {
                        shardActorsStopping.remove(shardName);
                        notifyOnCompleteTasks(failure, result);
                    }, ActorRef.noSender());
                }
            };

            // Register the pending stop before attaching the callback so deferrals can find it immediately
            shardActorsStopping.put(shardName, onComplete);
            stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
                    .getDispatcher(Dispatchers.DispatcherType.Client));
        }

        LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
        persistShardList();
    }
515
516     private void onGetSnapshot() {
517         LOG.debug("{}: onGetSnapshot", persistenceId());
518
519         List<String> notInitialized = null;
520         for (ShardInformation shardInfo : localShards.values()) {
521             if (!shardInfo.isShardInitialized()) {
522                 if (notInitialized == null) {
523                     notInitialized = new ArrayList<>();
524                 }
525
526                 notInitialized.add(shardInfo.getShardName());
527             }
528         }
529
530         if (notInitialized != null) {
531             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
532                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
533             return;
534         }
535
536         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
537                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
538                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
539
540         for (ShardInformation shardInfo: localShards.values()) {
541             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
542         }
543     }
544
545     @SuppressWarnings("checkstyle:IllegalCatch")
546     private void onCreateShard(CreateShard createShard) {
547         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
548
549         Object reply;
550         try {
551             String shardName = createShard.getModuleShardConfig().getShardName();
552             if (localShards.containsKey(shardName)) {
553                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
554                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
555             } else {
556                 doCreateShard(createShard);
557                 reply = new Status.Success(null);
558             }
559         } catch (Exception e) {
560             LOG.error("{}: onCreateShard failed", persistenceId(), e);
561             reply = new Status.Failure(e);
562         }
563
564         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
565             getSender().tell(reply, getSelf());
566         }
567     }
568
569     private void onPrefixShardCreated(final PrefixShardCreated message) {
570         LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
571
572         final PrefixShardConfiguration config = message.getConfiguration();
573         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
574                 ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
575         final String shardName = shardId.getShardName();
576
577         if (isPreviousShardActorStopInProgress(shardName, message)) {
578             return;
579         }
580
581         if (localShards.containsKey(shardName)) {
582             LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
583             final PrefixShardConfiguration existing =
584                     configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
585
586             if (existing != null && existing.equals(config)) {
587                 // we don't have to do nothing here
588                 return;
589             }
590         }
591
592         doCreatePrefixShard(config, shardId, shardName);
593     }
594
595     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
596         final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
597         if (stopOnComplete == null) {
598             return false;
599         }
600
601         LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
602                 shardName, messageToDefer);
603         final ActorRef sender = getSender();
604         stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
605             @Override
606             public void onComplete(Throwable failure, Boolean result) {
607                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
608                 self().tell(messageToDefer, sender);
609             }
610         });
611
612         return true;
613     }
614
615     private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
616         configuration.addPrefixShardConfiguration(config);
617
618         final Builder builder = newShardDatastoreContextBuilder(shardName);
619         builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
620                 .storeRoot(config.getPrefix().getRootIdentifier());
621         DatastoreContext shardDatastoreContext = builder.build();
622
623         final Map<String, String> peerAddresses = getPeerAddresses(shardName);
624         final boolean isActiveMember = true;
625
626         LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
627                 persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
628
629         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
630                 shardDatastoreContext, Shard.builder(), peerAddressResolver);
631         info.setActiveMember(isActiveMember);
632         localShards.put(info.getShardName(), info);
633
634         if (schemaContext != null) {
635             info.setActor(newShardActor(schemaContext, info));
636         }
637     }
638
639     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
640         LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
641
642         final DOMDataTreeIdentifier prefix = message.getPrefix();
643         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
644                 ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
645
646         configuration.removePrefixShardConfiguration(prefix);
647         removeShard(shardId);
648     }
649
650     private void doCreateShard(final CreateShard createShard) {
651         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
652         final String shardName = moduleShardConfig.getShardName();
653
654         configuration.addModuleShardConfiguration(moduleShardConfig);
655
656         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
657         if (shardDatastoreContext == null) {
658             shardDatastoreContext = newShardDatastoreContext(shardName);
659         } else {
660             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
661                     peerAddressResolver).build();
662         }
663
664         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
665
666         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
667                 && currentSnapshot.getShardList().contains(shardName);
668
669         Map<String, String> peerAddresses;
670         boolean isActiveMember;
671         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
672                 .contains(cluster.getCurrentMemberName())) {
673             peerAddresses = getPeerAddresses(shardName);
674             isActiveMember = true;
675         } else {
676             // The local member is not in the static shard member configuration and the shard did not
677             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
678             // the shard with no peers and with elections disabled so it stays as follower. A
679             // subsequent AddServer request will be needed to make it an active member.
680             isActiveMember = false;
681             peerAddresses = Collections.emptyMap();
682             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
683                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
684         }
685
686         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
687                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
688                 isActiveMember);
689
690         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
691                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
692         info.setActiveMember(isActiveMember);
693         localShards.put(info.getShardName(), info);
694
695         if (schemaContext != null) {
696             info.setActor(newShardActor(schemaContext, info));
697         }
698     }
699
700     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
701         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
702                 .shardPeerAddressResolver(peerAddressResolver);
703     }
704
705     private DatastoreContext newShardDatastoreContext(String shardName) {
706         return newShardDatastoreContextBuilder(shardName).build();
707     }
708
709     private void checkReady() {
710         if (isReadyWithLeaderId()) {
711             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
712                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
713
714             waitTillReadyCountdownLatch.countDown();
715         }
716     }
717
718     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
719         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
720
721         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
722         if (shardInformation != null) {
723             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
724             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
725             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
726                 primaryShardInfoCache.remove(shardInformation.getShardName());
727             }
728
729             checkReady();
730         } else {
731             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
732         }
733     }
734
735     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
736         ShardInformation shardInfo = message.getShardInfo();
737
738         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
739                 shardInfo.getShardName());
740
741         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
742
743         if (!shardInfo.isShardInitialized()) {
744             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
745             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
746         } else {
747             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
748             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
749         }
750     }
751
752     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
753         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
754                 status.getName(), status.isInitialSyncDone());
755
756         ShardInformation shardInformation = findShardInformation(status.getName());
757
758         if (shardInformation != null) {
759             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
760
761             shardManagerMBean.setSyncStatus(isInSync());
762         }
763
764     }
765
766     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
767         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
768                 roleChanged.getOldRole(), roleChanged.getNewRole());
769
770         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
771         if (shardInformation != null) {
772             shardInformation.setRole(roleChanged.getNewRole());
773             checkReady();
774             shardManagerMBean.setSyncStatus(isInSync());
775         }
776     }
777
778
779     private ShardInformation findShardInformation(String memberId) {
780         for (ShardInformation info : localShards.values()) {
781             if (info.getShardId().toString().equals(memberId)) {
782                 return info;
783             }
784         }
785
786         return null;
787     }
788
789     private boolean isReadyWithLeaderId() {
790         boolean isReady = true;
791         for (ShardInformation info : localShards.values()) {
792             if (!info.isShardReadyWithLeaderId()) {
793                 isReady = false;
794                 break;
795             }
796         }
797         return isReady;
798     }
799
800     private boolean isInSync() {
801         for (ShardInformation info : localShards.values()) {
802             if (!info.isInSync()) {
803                 return false;
804             }
805         }
806         return true;
807     }
808
809     private void onActorInitialized(Object message) {
810         final ActorRef sender = getSender();
811
812         if (sender == null) {
813             return; //why is a non-actor sending this message? Just ignore.
814         }
815
816         String actorName = sender.path().name();
817         //find shard name from actor name; actor name is stringified shardId
818
819         final ShardIdentifier shardId;
820         try {
821             shardId = ShardIdentifier.fromShardIdString(actorName);
822         } catch (IllegalArgumentException e) {
823             LOG.debug("{}: ignoring actor {}", actorName, e);
824             return;
825         }
826
827         markShardAsInitialized(shardId.getShardName());
828     }
829
830     private void markShardAsInitialized(String shardName) {
831         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
832
833         ShardInformation shardInformation = localShards.get(shardName);
834         if (shardInformation != null) {
835             shardInformation.setActorInitialized();
836
837             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
838         }
839     }
840
841     @Override
842     protected void handleRecover(Object message) throws Exception {
843         if (message instanceof RecoveryCompleted) {
844             onRecoveryCompleted();
845         } else if (message instanceof SnapshotOffer) {
846             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
847         }
848     }
849
850     @SuppressWarnings("checkstyle:IllegalCatch")
851     private void onRecoveryCompleted() {
852         LOG.info("Recovery complete : {}", persistenceId());
853
854         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
855         // journal on upgrade from Helium.
856         deleteMessages(lastSequenceNr());
857
858         if (currentSnapshot == null && restoreFromSnapshot != null
859                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
860             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
861
862             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
863
864             applyShardManagerSnapshot(snapshot);
865         }
866
867         createLocalShards();
868     }
869
    /**
     * Replies to the current sender with the message produced by {@code messageSupplier}, provided the shard
     * is initialized and, when {@code wantShardReady} is set, also ready with a leader id.
     *
     * <p>If the shard is not yet in the required state and {@code doWait} is true, the reply is deferred: an
     * OnShardReady/OnShardInitialized callback is registered on the shard, and a ShardNotInitializedTimeout is
     * scheduled to this actor. If {@code doWait} is false, a NotInitializedException (shard not initialized) or
     * a NoShardLeaderException (initialized, but no leader) is sent immediately instead.
     *
     * @param shardInformation the target shard
     * @param doWait whether to wait for the shard to reach the required state rather than fail fast
     * @param wantShardReady whether the shard must also have a leader, not merely be initialized
     * @param messageSupplier produces the success reply; only invoked when the reply is actually sent
     */
    private void sendResponse(ShardInformation shardInformation, boolean doWait,
            boolean wantShardReady, final Supplier<Object> messageSupplier) {
        if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
            if (doWait) {
                // Capture sender/self now; the deferred reply runs later, when getSender() no longer
                // refers to this request.
                final ActorRef sender = getSender();
                final ActorRef self = self();

                Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);

                OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
                    new OnShardInitialized(replyRunnable);

                shardInformation.addOnShardInitialized(onShardInitialized);

                FiniteDuration timeout = shardInformation.getDatastoreContext()
                        .getShardInitializationTimeout().duration();
                if (shardInformation.isShardInitialized()) {
                    // If the shard is already initialized then we'll wait enough time for the shard to
                    // elect a leader, ie 2 times the election timeout.
                    timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
                }

                LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
                        shardInformation.getShardName());

                // On expiry, onShardNotInitializedTimeout unregisters the callback and sends the error reply.
                Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                        timeout, getSelf(),
                        new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                        getContext().dispatcher(), getSelf());

                onShardInitialized.setTimeoutSchedule(timeoutSchedule);

            } else if (!shardInformation.isShardInitialized()) {
                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
                        shardInformation.getShardName());
                getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
            } else {
                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
                        shardInformation.getShardName());
                getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
            }

            return;
        }

        // Shard is in the required state: reply right away.
        getSender().tell(messageSupplier.get(), getSelf());
    }
918
919     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
920         return new NoShardLeaderException(null, shardId.toString());
921     }
922
923     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
924         return new NotInitializedException(String.format(
925                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
926     }
927
928     @VisibleForTesting
929     static MemberName memberToName(final Member member) {
930         return MemberName.forName(member.roles().iterator().next());
931     }
932
933     private void memberRemoved(ClusterEvent.MemberRemoved message) {
934         MemberName memberName = memberToName(message.member());
935
936         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
937                 message.member().address());
938
939         peerAddressResolver.removePeerAddress(memberName);
940
941         for (ShardInformation info : localShards.values()) {
942             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
943         }
944     }
945
946     private void memberExited(ClusterEvent.MemberExited message) {
947         MemberName memberName = memberToName(message.member());
948
949         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
950                 message.member().address());
951
952         peerAddressResolver.removePeerAddress(memberName);
953
954         for (ShardInformation info : localShards.values()) {
955             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
956         }
957     }
958
959     private void memberUp(ClusterEvent.MemberUp message) {
960         MemberName memberName = memberToName(message.member());
961
962         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
963                 message.member().address());
964
965         memberUp(memberName, message.member().address());
966     }
967
968     private void memberUp(MemberName memberName, Address address) {
969         addPeerAddress(memberName, address);
970         checkReady();
971     }
972
973     private void memberWeaklyUp(MemberWeaklyUp message) {
974         MemberName memberName = memberToName(message.member());
975
976         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
977                 message.member().address());
978
979         memberUp(memberName, message.member().address());
980     }
981
982     private void addPeerAddress(MemberName memberName, Address address) {
983         peerAddressResolver.addPeerAddress(memberName, address);
984
985         for (ShardInformation info : localShards.values()) {
986             String shardName = info.getShardName();
987             String peerId = getShardIdentifier(memberName, shardName).toString();
988             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
989
990             info.peerUp(memberName, peerId, getSelf());
991         }
992     }
993
994     private void memberReachable(ClusterEvent.ReachableMember message) {
995         MemberName memberName = memberToName(message.member());
996         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
997
998         addPeerAddress(memberName, message.member().address());
999
1000         markMemberAvailable(memberName);
1001     }
1002
1003     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
1004         MemberName memberName = memberToName(message.member());
1005         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
1006
1007         markMemberUnavailable(memberName);
1008     }
1009
1010     private void markMemberUnavailable(final MemberName memberName) {
1011         final String memberStr = memberName.getName();
1012         for (ShardInformation info : localShards.values()) {
1013             String leaderId = info.getLeaderId();
1014             // XXX: why are we using String#contains() here?
1015             if (leaderId != null && leaderId.contains(memberStr)) {
1016                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
1017                 info.setLeaderAvailable(false);
1018
1019                 primaryShardInfoCache.remove(info.getShardName());
1020             }
1021
1022             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
1023         }
1024     }
1025
1026     private void markMemberAvailable(final MemberName memberName) {
1027         final String memberStr = memberName.getName();
1028         for (ShardInformation info : localShards.values()) {
1029             String leaderId = info.getLeaderId();
1030             // XXX: why are we using String#contains() here?
1031             if (leaderId != null && leaderId.contains(memberStr)) {
1032                 LOG.debug("Marking Leader {} as available.", leaderId);
1033                 info.setLeaderAvailable(true);
1034             }
1035
1036             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
1037         }
1038     }
1039
1040     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
1041         datastoreContextFactory = factory;
1042         for (ShardInformation info : localShards.values()) {
1043             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
1044         }
1045     }
1046
1047     private void onGetLocalShardIds() {
1048         final List<String> response = new ArrayList<>(localShards.size());
1049
1050         for (ShardInformation info : localShards.values()) {
1051             response.add(info.getShardId().toString());
1052         }
1053
1054         getSender().tell(new Status.Success(response), getSelf());
1055     }
1056
1057     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
1058         final ShardIdentifier identifier = message.getShardId();
1059
1060         if (identifier != null) {
1061             final ShardInformation info = localShards.get(identifier.getShardName());
1062             if (info == null) {
1063                 getSender().tell(new Status.Failure(
1064                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
1065                 return;
1066             }
1067
1068             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1069         } else {
1070             for (ShardInformation info : localShards.values()) {
1071                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1072             }
1073         }
1074
1075         getSender().tell(new Status.Success(null), getSelf());
1076     }
1077
1078     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
1079         final ActorRef actor = info.getActor();
1080         if (actor != null) {
1081             actor.tell(switchBehavior, getSelf());
1082         } else {
1083             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
1084                 info.getShardName(), switchBehavior.getNewState());
1085         }
1086     }
1087
1088     /**
1089      * Notifies all the local shards of a change in the schema context.
1090      *
1091      * @param message the message to send
1092      */
1093     private void updateSchemaContext(final Object message) {
1094         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
1095
1096         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
1097
1098         for (ShardInformation info : localShards.values()) {
1099             if (info.getActor() == null) {
1100                 LOG.debug("Creating Shard {}", info.getShardId());
1101                 info.setActor(newShardActor(schemaContext, info));
1102             } else {
1103                 info.getActor().tell(message, getSelf());
1104             }
1105         }
1106     }
1107
1108     @VisibleForTesting
1109     protected ClusterWrapper getCluster() {
1110         return cluster;
1111     }
1112
1113     @VisibleForTesting
1114     protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
1115         return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
1116                 info.getShardId().toString());
1117     }
1118
    /**
     * Locates the primary (leader) of a shard and replies to the sender.
     *
     * <p>If an active local replica exists, the reply is produced through sendResponse. It is a
     * LocalPrimaryShardFound when the local replica is the leader and the request did not come from a remote
     * ShardManager; otherwise it is a RemotePrimaryShardFound. Without an active local replica, the request
     * is forwarded as a RemoteFindPrimary to the first peer ShardManager not yet visited. If every peer has
     * been visited, a PrimaryNotFoundException is sent.
     *
     * @param message the FindPrimary (or RemoteFindPrimary) request
     */
    private void findPrimary(FindPrimary message) {
        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);

        final String shardName = message.getShardName();
        // Local shard state (the data tree) is only handed back to requesters in this ShardManager.
        final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);

        // First see if the there is a local replica for the shard
        final ShardInformation info = localShards.get(shardName);
        if (info != null && info.isActiveMember()) {
            sendResponse(info, message.isWaitUntilReady(), true, () -> {
                String primaryPath = info.getSerializedLeaderActor();
                Object found = canReturnLocalShardState && info.isLeader()
                        ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
                            new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());

                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
                return found;
            });

            return;
        }

        // Track which ShardManagers have already been asked, so forwarding does not loop between peers.
        final Collection<String> visitedAddresses;
        if (message instanceof RemoteFindPrimary) {
            visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
        } else {
            visitedAddresses = new ArrayList<>(1);
        }

        visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());

        // Forward to the first unvisited peer only; that peer continues the search and replies to the
        // original sender (forward preserves it).
        for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
            if (visitedAddresses.contains(address)) {
                continue;
            }

            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
                    persistenceId(), shardName, address, visitedAddresses);

            getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
                    message.isWaitUntilReady(), visitedAddresses), getContext());
            return;
        }

        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);

        getSender().tell(new PrimaryNotFoundException(
                String.format("No primary shard found for %s.", shardName)), getSelf());
    }
1168
1169     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1170         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1171                 .getShardInitializationTimeout().duration().$times(2));
1172
1173         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1174         futureObj.onComplete(new OnComplete<Object>() {
1175             @Override
1176             public void onComplete(Throwable failure, Object response) {
1177                 if (failure != null) {
1178                     handler.onFailure(failure);
1179                 } else {
1180                     if (response instanceof RemotePrimaryShardFound) {
1181                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1182                     } else if (response instanceof LocalPrimaryShardFound) {
1183                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1184                     } else {
1185                         handler.onUnknownResponse(response);
1186                     }
1187                 }
1188             }
1189         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1190     }
1191
1192     /**
1193      * Construct the name of the shard actor given the name of the member on
1194      * which the shard resides and the name of the shard.
1195      *
1196      * @param memberName the member name
1197      * @param shardName the shard name
1198      * @return a b
1199      */
1200     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
1201         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1202     }
1203
1204     /**
1205      * Create shards that are local to the member on which the ShardManager runs.
1206      */
1207     private void createLocalShards() {
1208         MemberName memberName = this.cluster.getCurrentMemberName();
1209         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
1210
1211         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1212         if (restoreFromSnapshot != null) {
1213             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1214                 shardSnapshots.put(snapshot.getName(), snapshot);
1215             }
1216         }
1217
1218         restoreFromSnapshot = null; // null out to GC
1219
1220         for (String shardName : memberShardNames) {
1221             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1222
1223             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1224
1225             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1226             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
1227                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
1228                         shardSnapshots.get(shardName)), peerAddressResolver));
1229         }
1230     }
1231
1232     /**
1233      * Given the name of the shard find the addresses of all it's peers.
1234      *
1235      * @param shardName the shard name
1236      */
1237     private Map<String, String> getPeerAddresses(String shardName) {
1238         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1239         return getPeerAddresses(shardName, members);
1240     }
1241
1242     private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
1243         Map<String, String> peerAddresses = new HashMap<>();
1244         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1245
1246         for (MemberName memberName : members) {
1247             if (!currentMemberName.equals(memberName)) {
1248                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1249                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1250                 peerAddresses.put(shardId.toString(), address);
1251             }
1252         }
1253         return peerAddresses;
1254     }
1255
1256     @Override
1257     public SupervisorStrategy supervisorStrategy() {
1258
1259         return new OneForOneStrategy(10, Duration.create("1 minute"),
1260                 (Function<Throwable, Directive>) t -> {
1261                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1262                     return SupervisorStrategy.resume();
1263                 });
1264     }
1265
1266     @Override
1267     public String persistenceId() {
1268         return persistenceId;
1269     }
1270
1271     @VisibleForTesting
1272     ShardManagerInfoMBean getMBean() {
1273         return shardManagerMBean;
1274     }
1275
1276     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1277         if (shardReplicaOperationsInProgress.contains(shardName)) {
1278             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1279             LOG.debug("{}: {}", persistenceId(), msg);
1280             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1281             return true;
1282         }
1283
1284         return false;
1285     }
1286
    /**
     * Handles an {@code AddPrefixShardReplica} request.
     *
     * <p>The shard name is derived from the message's prefix via {@code ClusterUtils.getCleanShardName}. The
     * request fails immediately with an {@link IllegalStateException} if no SchemaContext is available yet.
     * Otherwise the shard's primary is located:
     * <ul>
     *   <li>remote primary: {@code addPrefixShard} is scheduled to run on this actor;</li>
     *   <li>local primary: the requester is told the local replica already exists.</li>
     * </ul>
     * FindPrimary failures are reported to the requester by {@code AutoFindPrimaryFailureResponseHandler}.
     *
     * @param message the request carrying the shard prefix
     */
    private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
        LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);

        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
                ClusterUtils.getCleanShardName(message.getShardPrefix()));
        final String shardName = shardId.getShardName();

        // A local shard instance cannot be created without a SchemaContext, so fail fast.
        if (schemaContext == null) {
            String msg = String.format(
                    "No SchemaContext is available in order to create a local shard instance for %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
            return;
        }

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                // getSender() is evaluated when the runnable runs. The runnable is sent to this actor with
                // getTargetActor() as sender, so at that point it resolves to the original requester.
                final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
                        message.getShardPrefix(), response, getSender());
                // NOTE(review): when a previous shard actor stop is in progress the runnable is presumably
                // queued and re-dispatched later by isPreviousShardActorStopInProgress - confirm.
                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
                    getSelf().tell(runnable, getTargetActor());
                }
            }

            // Note: this parameter shadows the outer 'message' and is unused.
            @Override
            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
            }
        });
    }
1320
    /**
     * Handles an {@code AddShardReplica} request for a module-configured shard.
     *
     * <p>The request fails with an {@link IllegalArgumentException} if the shard is not configured, and with an
     * {@link IllegalStateException} if no SchemaContext is available yet. Otherwise the shard's primary is
     * located:
     * <ul>
     *   <li>remote primary: {@code addShard} is scheduled to run on this actor;</li>
     *   <li>local primary: the requester is told the local replica already exists.</li>
     * </ul>
     *
     * @param shardReplicaMsg the request naming the shard
     */
    private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
        final String shardName = shardReplicaMsg.getShardName();

        LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);

        // verify the shard with the specified name is present in the cluster configuration
        if (!this.configuration.isShardConfigured(shardName)) {
            String msg = String.format("No module configuration exists for shard %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
            return;
        }

        // A local shard instance cannot be created without a SchemaContext, so fail fast.
        if (schemaContext == null) {
            String msg = String.format(
                  "No SchemaContext is available in order to create a local shard instance for %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
            return;
        }

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                // getSender() is evaluated when the runnable runs. The runnable is sent to this actor with
                // getTargetActor() as sender, so at that point it resolves to the original requester.
                final RunnableMessage runnable = (RunnableMessage) () ->
                    addShard(getShardName(), response, getSender());
                // NOTE(review): when a previous shard actor stop is in progress the runnable is presumably
                // queued and re-dispatched later by isPreviousShardActorStopInProgress - confirm.
                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
                    getSelf().tell(runnable, getTargetActor());
                }
            }

            @Override
            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
            }
        });
    }
1360
1361     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1362         String msg = String.format("Local shard %s already exists", shardName);
1363         LOG.debug("{}: {}", persistenceId(), msg);
1364         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1365     }
1366
1367     private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
1368                                 final RemotePrimaryShardFound response, final ActorRef sender) {
1369         if (isShardReplicaOperationInProgress(shardName, sender)) {
1370             return;
1371         }
1372
1373         shardReplicaOperationsInProgress.add(shardName);
1374
1375         final ShardInformation shardInfo;
1376         final boolean removeShardOnFailure;
1377         ShardInformation existingShardInfo = localShards.get(shardName);
1378         if (existingShardInfo == null) {
1379             removeShardOnFailure = true;
1380             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1381
1382             final Builder builder = newShardDatastoreContextBuilder(shardName);
1383             builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1384
1385             DatastoreContext datastoreContext = builder.build();
1386
1387             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1388                     Shard.builder(), peerAddressResolver);
1389             shardInfo.setActiveMember(false);
1390             localShards.put(shardName, shardInfo);
1391             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1392         } else {
1393             removeShardOnFailure = false;
1394             shardInfo = existingShardInfo;
1395         }
1396
1397         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1398     }
1399
1400     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1401         if (isShardReplicaOperationInProgress(shardName, sender)) {
1402             return;
1403         }
1404
1405         shardReplicaOperationsInProgress.add(shardName);
1406
1407         final ShardInformation shardInfo;
1408         final boolean removeShardOnFailure;
1409         ShardInformation existingShardInfo = localShards.get(shardName);
1410         if (existingShardInfo == null) {
1411             removeShardOnFailure = true;
1412             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1413
1414             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1415                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1416
1417             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1418                     Shard.builder(), peerAddressResolver);
1419             shardInfo.setActiveMember(false);
1420             localShards.put(shardName, shardInfo);
1421             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1422         } else {
1423             removeShardOnFailure = false;
1424             shardInfo = existingShardInfo;
1425         }
1426
1427         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1428     }
1429
    /**
     * Sends an {@code AddServer} request for the local replica to the shard leader at
     * {@code response.getPrimaryPath()}.
     *
     * <p>The ask times out after the shard leader election timeout from the replica's datastore context. The
     * outcome is not handled in the callback. It is forwarded back to this actor as a
     * {@code ForwardedAddServerFailure} or {@code ForwardedAddServerReply} message, with {@code sender} as the
     * message sender.
     *
     * @param shardName the shard name
     * @param shardInfo the local replica being added
     * @param response the FindPrimary result identifying the leader
     * @param removeShardOnFailure whether the local replica should be removed if the request fails
     * @param sender the original requester
     */
    private void execAddShard(final String shardName,
                              final ShardInformation shardInfo,
                              final RemotePrimaryShardFound response,
                              final boolean removeShardOnFailure,
                              final ActorRef sender) {

        final String localShardAddress =
                peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());

        //inform ShardLeader to add this shard as a replica by sending an AddServer message
        LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
                response.getPrimaryPath(), shardInfo.getShardId());

        final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
                .getShardLeaderElectionTimeout().duration());
        // NOTE(review): the trailing 'true' is presumably the voting-member flag of AddServer - confirm.
        final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
                new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);

        futureObj.onComplete(new OnComplete<Object>() {
            // Runs on the Client dispatcher rather than the actor thread, so only immutable captured values are
            // used here and the result is handed back to the actor as a message.
            @Override
            public void onComplete(Throwable failure, Object addServerResponse) {
                if (failure != null) {
                    LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
                            response.getPrimaryPath(), shardName, failure);

                    final String msg = String.format("AddServer request to leader %s for shard %s failed",
                            response.getPrimaryPath(), shardName);
                    self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
                } else {
                    self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
                            response.getPrimaryPath(), removeShardOnFailure), sender);
                }
            }
        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
    }
1465
1466     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1467             boolean removeShardOnFailure) {
1468         shardReplicaOperationsInProgress.remove(shardName);
1469
1470         if (removeShardOnFailure) {
1471             ShardInformation shardInfo = localShards.remove(shardName);
1472             if (shardInfo.getActor() != null) {
1473                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1474             }
1475         }
1476
1477         sender.tell(new Status.Failure(message == null ? failure :
1478             new RuntimeException(message, failure)), getSelf());
1479     }
1480
1481     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1482             String leaderPath, boolean removeShardOnFailure) {
1483         String shardName = shardInfo.getShardName();
1484         shardReplicaOperationsInProgress.remove(shardName);
1485
1486         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1487
1488         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1489             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1490
1491             // Make the local shard voting capable
1492             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1493             shardInfo.setActiveMember(true);
1494             persistShardList();
1495
1496             sender.tell(new Status.Success(null), getSelf());
1497         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1498             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1499         } else {
1500             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1501                     persistenceId(), shardName, replyMsg.getStatus());
1502
1503             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1504                     shardInfo.getShardId());
1505
1506             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1507         }
1508     }
1509
1510     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1511                                                String leaderPath, ShardIdentifier shardId) {
1512         Exception failure;
1513         switch (serverChangeStatus) {
1514             case TIMEOUT:
1515                 failure = new TimeoutException(String.format(
1516                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
1517                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
1518                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
1519                 break;
1520             case NO_LEADER:
1521                 failure = createNoShardLeaderException(shardId);
1522                 break;
1523             case NOT_SUPPORTED:
1524                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1525                         serverChange.getSimpleName(), shardId.getShardName()));
1526                 break;
1527             default :
1528                 failure = new RuntimeException(String.format(
1529                         "%s request to leader %s for shard %s failed with status %s",
1530                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1531         }
1532         return failure;
1533     }
1534
    /**
     * Handles a {@code RemoveShardReplica} request.
     *
     * <p>Locates the shard's primary (local or remote) and then schedules {@code removeShardReplica} to run on
     * this actor with the primary's path. FindPrimary failures are reported to the requester by
     * {@code AutoFindPrimaryFailureResponseHandler}.
     *
     * @param shardReplicaMsg the request naming the shard
     */
    private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
        LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);

        findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            @Override
            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            // Hands the removal to this actor. getSender() is evaluated when the runnable runs; since it is sent
            // with getTargetActor() as sender, it then resolves to the original requester.
            private void doRemoveShardReplicaAsync(final String primaryPath) {
                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
                        primaryPath, getSender()), getTargetActor());
            }
        });
    }
1556
    /**
     * Handles a {@code RemovePrefixShardReplica} request.
     *
     * <p>The shard name is derived from the prefix via {@code ClusterUtils.getCleanShardName}. The shard's primary
     * (local or remote) is located, and {@code removePrefixShardReplica} is then scheduled to run on this actor
     * with the primary's path.
     *
     * @param message the request carrying the shard prefix
     */
    private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
        LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);

        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
                ClusterUtils.getCleanShardName(message.getShardPrefix()));
        final String shardName = shardId.getShardName();

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
                shardName, persistenceId(), getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            @Override
            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            // Hands the removal to this actor. getSender() is evaluated when the runnable runs; since it is sent
            // with getTargetActor() as sender, it then resolves to the original requester.
            private void doRemoveShardReplicaAsync(final String primaryPath) {
                getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
                        primaryPath, getSender()), getTargetActor());
            }
        });
    }
1582
1583     private void persistShardList() {
1584         List<String> shardList = new ArrayList<>(localShards.keySet());
1585         for (ShardInformation shardInfo : localShards.values()) {
1586             if (!shardInfo.isActiveMember()) {
1587                 shardList.remove(shardInfo.getShardName());
1588             }
1589         }
1590         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1591         saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
1592     }
1593
1594     private ShardManagerSnapshot updateShardManagerSnapshot(
1595             final List<String> shardList,
1596             final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
1597         currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
1598         return currentSnapshot;
1599     }
1600
1601     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1602         currentSnapshot = snapshot;
1603
1604         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1605
1606         final MemberName currentMember = cluster.getCurrentMemberName();
1607         Set<String> configuredShardList =
1608             new HashSet<>(configuration.getMemberShardNames(currentMember));
1609         for (String shard : currentSnapshot.getShardList()) {
1610             if (!configuredShardList.contains(shard)) {
1611                 // add the current member as a replica for the shard
1612                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1613                 configuration.addMemberReplicaForShard(shard, currentMember);
1614             } else {
1615                 configuredShardList.remove(shard);
1616             }
1617         }
1618         for (String shard : configuredShardList) {
1619             // remove the member as a replica for the shard
1620             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1621             configuration.removeMemberReplicaForShard(shard, currentMember);
1622         }
1623     }
1624
1625     private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
1626         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1627             persistenceId());
1628         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1629             0, 0));
1630     }
1631
1632     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1633         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1634
1635         String shardName = changeMembersVotingStatus.getShardName();
1636         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1637         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1638             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1639                     e.getValue());
1640         }
1641
1642         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1643
1644         findLocalShard(shardName, getSender(),
1645             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1646             localShardFound.getPath(), getSender()));
1647     }
1648
1649     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
1650         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1651
1652         ActorRef sender = getSender();
1653         final String shardName = flipMembersVotingStatus.getShardName();
1654         findLocalShard(shardName, sender, localShardFound -> {
1655             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1656                     Timeout.apply(30, TimeUnit.SECONDS));
1657
1658             future.onComplete(new OnComplete<Object>() {
1659                 @Override
1660                 public void onComplete(Throwable failure, Object response) {
1661                     if (failure != null) {
1662                         sender.tell(new Status.Failure(new RuntimeException(
1663                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1664                         return;
1665                     }
1666
1667                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1668                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1669                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1670                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1671                     }
1672
1673                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1674                             .toString(), !raftState.isVoting());
1675
1676                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1677                             shardName, localShardFound.getPath(), sender);
1678                 }
1679             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1680         });
1681
1682     }
1683
1684     private void findLocalShard(FindLocalShard message) {
1685         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
1686
1687         final ShardInformation shardInformation = localShards.get(message.getShardName());
1688
1689         if (shardInformation == null) {
1690             LOG.debug("{}: Local shard {} not found - shards present: {}",
1691                     persistenceId(), message.getShardName(), localShards.keySet());
1692
1693             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1694             return;
1695         }
1696
1697         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1698             () -> new LocalShardFound(shardInformation.getActor()));
1699     }
1700
    /**
     * Asynchronously looks up a local shard by asking this actor with a {@code FindLocalShard} message.
     *
     * <p>The ask uses twice the base shard initialization timeout. NOTE(review): the {@code true} constructor
     * argument is presumably the wait-until-initialized flag - confirm.
     *
     * <p>If the shard is found, {@code onLocalShardFound} is executed on this actor via a {@code RunnableMessage}
     * sent with {@code sender} as sender. On timeout, not-found, or an unexpected response, {@code sender}
     * receives a {@link Status.Failure}.
     *
     * @param shardName the shard to look up
     * @param sender the requester to notify on failure
     * @param onLocalShardFound the continuation to run on the actor when the shard is found
     */
    private void findLocalShard(final String shardName, final ActorRef sender,
            final Consumer<LocalShardFound> onLocalShardFound) {
        Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
                .getShardInitializationTimeout().duration().$times(2));

        Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
        futureObj.onComplete(new OnComplete<Object>() {
            // Runs on the Client dispatcher; the success path hops back onto the actor before touching state.
            @Override
            public void onComplete(Throwable failure, Object response) {
                if (failure != null) {
                    LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
                            failure);
                    sender.tell(new Status.Failure(new RuntimeException(
                            String.format("Failed to find local shard %s", shardName), failure)), self());
                } else {
                    if (response instanceof LocalShardFound) {
                        getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
                                sender);
                    } else if (response instanceof LocalShardNotFound) {
                        String msg = String.format("Local shard %s does not exist", shardName);
                        LOG.debug("{}: {}", persistenceId, msg);
                        sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
                    } else {
                        // A Throwable response is forwarded as-is; anything else is wrapped in a RuntimeException.
                        String msg = String.format("Failed to find local shard %s: received response: %s",
                                shardName, response);
                        LOG.debug("{}: {}", persistenceId, msg);
                        sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
                                new RuntimeException(msg)), self());
                    }
                }
            }
        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
    }
1734
1735     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
1736             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1737         if (isShardReplicaOperationInProgress(shardName, sender)) {
1738             return;
1739         }
1740
1741         shardReplicaOperationsInProgress.add(shardName);
1742
1743         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1744         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1745
1746         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1747                 changeServersVotingStatus, shardActorRef.path());
1748
1749         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1750         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1751
1752         futureObj.onComplete(new OnComplete<Object>() {
1753             @Override
1754             public void onComplete(Throwable failure, Object response) {
1755                 shardReplicaOperationsInProgress.remove(shardName);
1756                 if (failure != null) {
1757                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1758                             shardActorRef.path());
1759                     LOG.debug("{}: {}", persistenceId(), msg, failure);
1760                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1761                 } else {
1762                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1763
1764                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1765                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1766                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1767                         sender.tell(new Status.Success(null), getSelf());
1768                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1769                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1770                                 "The requested voting state change for shard %s is invalid. At least one member "
1771                                 + "must be voting", shardId.getShardName()))), getSelf());
1772                     } else {
1773                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1774                                 persistenceId(), shardName, replyMsg.getStatus());
1775
1776                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1777                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1778                         sender.tell(new Status.Failure(error), getSelf());
1779                     }
1780                 }
1781             }
1782         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1783     }
1784
1785     private static final class ForwardedAddServerReply {
1786         ShardInformation shardInfo;
1787         AddServerReply addServerReply;
1788         String leaderPath;
1789         boolean removeShardOnFailure;
1790
1791         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1792                 boolean removeShardOnFailure) {
1793             this.shardInfo = shardInfo;
1794             this.addServerReply = addServerReply;
1795             this.leaderPath = leaderPath;
1796             this.removeShardOnFailure = removeShardOnFailure;
1797         }
1798     }
1799
1800     private static final class ForwardedAddServerFailure {
1801         String shardName;
1802         String failureMessage;
1803         Throwable failure;
1804         boolean removeShardOnFailure;
1805
1806         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1807                 boolean removeShardOnFailure) {
1808             this.shardName = shardName;
1809             this.failureMessage = failureMessage;
1810             this.failure = failure;
1811             this.removeShardOnFailure = removeShardOnFailure;
1812         }
1813     }
1814
1815     static class OnShardInitialized {
1816         private final Runnable replyRunnable;
1817         private Cancellable timeoutSchedule;
1818
1819         OnShardInitialized(Runnable replyRunnable) {
1820             this.replyRunnable = replyRunnable;
1821         }
1822
1823         Runnable getReplyRunnable() {
1824             return replyRunnable;
1825         }
1826
1827         Cancellable getTimeoutSchedule() {
1828             return timeoutSchedule;
1829         }
1830
1831         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1832             this.timeoutSchedule = timeoutSchedule;
1833         }
1834     }
1835
    /**
     * Variant of {@link OnShardInitialized} with identical state. It is presumably distinguished by type to wait for
     * the shard to become ready rather than merely initialized - NOTE(review): confirm against its users.
     */
    static class OnShardReady extends OnShardInitialized {
        OnShardReady(Runnable replyRunnable) {
            super(replyRunnable);
        }
    }
1841
1842     private interface RunnableMessage extends Runnable {
1843     }
1844
1845     /**
1846      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1847      * a remote or local find primary message is processed.
1848      */
1849     private interface FindPrimaryResponseHandler {
1850         /**
1851          * Invoked when a Failure message is received as a response.
1852          *
1853          * @param failure the failure exception
1854          */
1855         void onFailure(Throwable failure);
1856
1857         /**
1858          * Invoked when a RemotePrimaryShardFound response is received.
1859          *
1860          * @param response the response
1861          */
1862         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1863
1864         /**
1865          * Invoked when a LocalPrimaryShardFound response is received.
1866          *
1867          * @param response the response
1868          */
1869         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1870
1871         /**
1872          * Invoked when an unknown response is received. This is another type of failure.
1873          *
1874          * @param response the response
1875          */
1876         void onUnknownResponse(Object response);
1877     }
1878
1879     /**
1880      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1881      * replica and sends a wrapped Failure response to some targetActor.
1882      */
1883     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1884         private final ActorRef targetActor;
1885         private final String shardName;
1886         private final String persistenceId;
1887         private final ActorRef shardManagerActor;
1888
1889         /**
1890          * Constructs an instance.
1891          *
1892          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1893          * @param shardName The name of the shard for which the primary replica had to be found
1894          * @param persistenceId The persistenceId for the ShardManager
1895          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1896          */
1897         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
1898                 ActorRef shardManagerActor) {
1899             this.targetActor = Preconditions.checkNotNull(targetActor);
1900             this.shardName = Preconditions.checkNotNull(shardName);
1901             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1902             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1903         }
1904
1905         public ActorRef getTargetActor() {
1906             return targetActor;
1907         }
1908
1909         public String getShardName() {
1910             return shardName;
1911         }
1912
1913         @Override
1914         public void onFailure(Throwable failure) {
1915             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1916             targetActor.tell(new Status.Failure(new RuntimeException(
1917                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1918         }
1919
1920         @Override
1921         public void onUnknownResponse(Object response) {
1922             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1923                     shardName, response);
1924             LOG.debug("{}: {}", persistenceId, msg);
1925             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1926                     new RuntimeException(msg)), shardManagerActor);
1927         }
1928     }
1929
1930     /**
1931      * The WrappedShardResponse class wraps a response from a Shard.
1932      */
1933     private static final class WrappedShardResponse {
1934         private final ShardIdentifier shardId;
1935         private final Object response;
1936         private final String leaderPath;
1937
1938         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1939             this.shardId = shardId;
1940             this.response = response;
1941             this.leaderPath = leaderPath;
1942         }
1943
1944         ShardIdentifier getShardId() {
1945             return shardId;
1946         }
1947
1948         Object getResponse() {
1949             return response;
1950         }
1951
1952         String getLeaderPath() {
1953             return leaderPath;
1954         }
1955     }
1956
1957     private static final class ShardNotInitializedTimeout {
1958         private final ActorRef sender;
1959         private final ShardInformation shardInfo;
1960         private final OnShardInitialized onShardInitialized;
1961
1962         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1963             this.sender = sender;
1964             this.shardInfo = shardInfo;
1965             this.onShardInitialized = onShardInitialized;
1966         }
1967
1968         ActorRef getSender() {
1969             return sender;
1970         }
1971
1972         ShardInformation getShardInfo() {
1973             return shardInfo;
1974         }
1975
1976         OnShardInitialized getOnShardInitialized() {
1977             return onShardInitialized;
1978         }
1979     }
1980 }
1981
1982
1983