BUG 8447: Add shard getRole rpcs
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.actor.SupervisorStrategy.Directive;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberWeaklyUp;
23 import akka.cluster.Member;
24 import akka.dispatch.Futures;
25 import akka.dispatch.OnComplete;
26 import akka.japi.Function;
27 import akka.pattern.Patterns;
28 import akka.persistence.RecoveryCompleted;
29 import akka.persistence.SaveSnapshotFailure;
30 import akka.persistence.SaveSnapshotSuccess;
31 import akka.persistence.SnapshotOffer;
32 import akka.persistence.SnapshotSelectionCriteria;
33 import akka.util.Timeout;
34 import com.google.common.annotations.VisibleForTesting;
35 import com.google.common.base.Preconditions;
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.HashMap;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.Set;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.function.Consumer;
49 import java.util.function.Supplier;
50 import org.opendaylight.controller.cluster.access.concepts.MemberName;
51 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
52 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
53 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
57 import org.opendaylight.controller.cluster.datastore.Shard;
58 import org.opendaylight.controller.cluster.datastore.config.Configuration;
59 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
60 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
61 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
63 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
65 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
66 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
67 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
68 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
69 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
70 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
71 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
72 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
73 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
74 import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
75 import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
77 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
78 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
79 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
80 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
81 import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
82 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
83 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
84 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
85 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
86 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
87 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
88 import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
89 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
90 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
91 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
92 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
93 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
94 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
95 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
96 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
97 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
98 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
99 import org.opendaylight.controller.cluster.raft.messages.AddServer;
100 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
101 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
102 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
103 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
104 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
105 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
106 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
107 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
108 import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
109 import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
110 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
111 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
112 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
113 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
114 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
115 import org.opendaylight.yangtools.concepts.ListenerRegistration;
116 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
117 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
118 import org.slf4j.Logger;
119 import org.slf4j.LoggerFactory;
120 import scala.concurrent.ExecutionContext;
121 import scala.concurrent.Future;
122 import scala.concurrent.duration.Duration;
123 import scala.concurrent.duration.FiniteDuration;
124
125 /**
126  * Manages the shards for a data store. The ShardManager has the following jobs:
127  * <ul>
128  * <li> Create all the local shard replicas that belong on this cluster member
129  * <li> Find the address of the local shard
130  * <li> Find the primary replica for any given shard
131  * <li> Monitor the cluster members and store their addresses
132  * </ul>
133  */
134 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
135     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
136
137     // Stores a mapping between a shard name and it's corresponding information
138     // Shard names look like inventory, topology etc and are as specified in
139     // configuration
140     private final Map<String, ShardInformation> localShards = new HashMap<>();
141
142     // The type of a ShardManager reflects the type of the datastore itself
143     // A data store could be of type config/operational
144     private final String type;
145
146     private final ClusterWrapper cluster;
147
148     private final Configuration configuration;
149
150     private final String shardDispatcherPath;
151
152     private final ShardManagerInfo shardManagerMBean;
153
154     private DatastoreContextFactory datastoreContextFactory;
155
156     private final CountDownLatch waitTillReadyCountdownLatch;
157
158     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
159
160     private final ShardPeerAddressResolver peerAddressResolver;
161
162     private SchemaContext schemaContext;
163
164     private DatastoreSnapshot restoreFromSnapshot;
165
166     private ShardManagerSnapshot currentSnapshot;
167
168     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
169
170     private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
171
172     private final String persistenceId;
173     private final AbstractDataStore dataStore;
174
175     private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
176     private PrefixedShardConfigUpdateHandler configUpdateHandler;
177
178     ShardManager(AbstractShardManagerCreator<?> builder) {
179         this.cluster = builder.getCluster();
180         this.configuration = builder.getConfiguration();
181         this.datastoreContextFactory = builder.getDatastoreContextFactory();
182         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
183         this.shardDispatcherPath =
184                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
185         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
186         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
187         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
188
189         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
190         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
191
192         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
193
194         // Subscribe this actor to cluster member events
195         cluster.subscribeToMemberEvents(getSelf());
196
197         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
198                 "shard-manager-" + this.type,
199                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
200         shardManagerMBean.registerMBean();
201
202         dataStore = builder.getDistributedDataStore();
203     }
204
205     @Override
206     public void preStart() {
207         LOG.info("Starting ShardManager {}", persistenceId);
208     }
209
210     @Override
211     public void postStop() {
212         LOG.info("Stopping ShardManager {}", persistenceId());
213
214         shardManagerMBean.unregisterMBean();
215
216         if (configListenerReg != null) {
217             configListenerReg.close();
218             configListenerReg = null;
219         }
220     }
221
222     @Override
223     public void handleCommand(Object message) throws Exception {
224         if (message  instanceof FindPrimary) {
225             findPrimary((FindPrimary)message);
226         } else if (message instanceof FindLocalShard) {
227             findLocalShard((FindLocalShard) message);
228         } else if (message instanceof UpdateSchemaContext) {
229             updateSchemaContext(message);
230         } else if (message instanceof ActorInitialized) {
231             onActorInitialized(message);
232         } else if (message instanceof ClusterEvent.MemberUp) {
233             memberUp((ClusterEvent.MemberUp) message);
234         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
235             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
236         } else if (message instanceof ClusterEvent.MemberExited) {
237             memberExited((ClusterEvent.MemberExited) message);
238         } else if (message instanceof ClusterEvent.MemberRemoved) {
239             memberRemoved((ClusterEvent.MemberRemoved) message);
240         } else if (message instanceof ClusterEvent.UnreachableMember) {
241             memberUnreachable((ClusterEvent.UnreachableMember) message);
242         } else if (message instanceof ClusterEvent.ReachableMember) {
243             memberReachable((ClusterEvent.ReachableMember) message);
244         } else if (message instanceof DatastoreContextFactory) {
245             onDatastoreContextFactory((DatastoreContextFactory) message);
246         } else if (message instanceof RoleChangeNotification) {
247             onRoleChangeNotification((RoleChangeNotification) message);
248         } else if (message instanceof FollowerInitialSyncUpStatus) {
249             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
250         } else if (message instanceof ShardNotInitializedTimeout) {
251             onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
252         } else if (message instanceof ShardLeaderStateChanged) {
253             onLeaderStateChanged((ShardLeaderStateChanged) message);
254         } else if (message instanceof SwitchShardBehavior) {
255             onSwitchShardBehavior((SwitchShardBehavior) message);
256         } else if (message instanceof CreateShard) {
257             onCreateShard((CreateShard)message);
258         } else if (message instanceof AddShardReplica) {
259             onAddShardReplica((AddShardReplica) message);
260         } else if (message instanceof AddPrefixShardReplica) {
261             onAddPrefixShardReplica((AddPrefixShardReplica) message);
262         } else if (message instanceof PrefixShardCreated) {
263             onPrefixShardCreated((PrefixShardCreated) message);
264         } else if (message instanceof PrefixShardRemoved) {
265             onPrefixShardRemoved((PrefixShardRemoved) message);
266         } else if (message instanceof InitConfigListener) {
267             onInitConfigListener();
268         } else if (message instanceof ForwardedAddServerReply) {
269             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
270             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
271                     msg.removeShardOnFailure);
272         } else if (message instanceof ForwardedAddServerFailure) {
273             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
274             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
275         } else if (message instanceof RemoveShardReplica) {
276             onRemoveShardReplica((RemoveShardReplica) message);
277         } else if (message instanceof RemovePrefixShardReplica) {
278             onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
279         } else if (message instanceof WrappedShardResponse) {
280             onWrappedShardResponse((WrappedShardResponse) message);
281         } else if (message instanceof GetSnapshot) {
282             onGetSnapshot();
283         } else if (message instanceof ServerRemoved) {
284             onShardReplicaRemoved((ServerRemoved) message);
285         } else if (message instanceof ChangeShardMembersVotingStatus) {
286             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
287         } else if (message instanceof FlipShardMembersVotingStatus) {
288             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
289         } else if (message instanceof SaveSnapshotSuccess) {
290             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
291         } else if (message instanceof SaveSnapshotFailure) {
292             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
293                     ((SaveSnapshotFailure) message).cause());
294         } else if (message instanceof Shutdown) {
295             onShutDown();
296         } else if (message instanceof GetLocalShardIds) {
297             onGetLocalShardIds();
298         } else if (message instanceof GetShardRole) {
299             onGetShardRole((GetShardRole) message);
300         } else if (message instanceof RunnableMessage) {
301             ((RunnableMessage)message).run();
302         } else {
303             unknownMessage(message);
304         }
305     }
306
307     private void onGetShardRole(final GetShardRole message) {
308         LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
309
310         final String name = message.getName();
311
312         final ShardInformation shardInformation = localShards.get(name);
313
314         if (shardInformation == null) {
315             LOG.info("{}: no shard information for {} found", persistenceId(), name);
316             getSender().tell(new Status.Failure(
317                     new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
318             return;
319         }
320
321         getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
322     }
323
324     private void onInitConfigListener() {
325         LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
326
327         final org.opendaylight.mdsal.common.api.LogicalDatastoreType type =
328                 org.opendaylight.mdsal.common.api.LogicalDatastoreType
329                         .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
330
331         if (configUpdateHandler != null) {
332             configUpdateHandler.close();
333         }
334
335         configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
336         configUpdateHandler.initListener(dataStore, type);
337     }
338
339     private void onShutDown() {
340         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
341         for (ShardInformation info : localShards.values()) {
342             if (info.getActor() != null) {
343                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
344
345                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
346                         .getElectionTimeOutInterval().$times(2);
347                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
348             }
349         }
350
351         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
352
353         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
354                 .getDispatcher(Dispatchers.DispatcherType.Client);
355         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
356
357         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
358             @Override
359             public void onComplete(Throwable failure, Iterable<Boolean> results) {
360                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
361
362                 self().tell(PoisonPill.getInstance(), self());
363
364                 if (failure != null) {
365                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
366                 } else {
367                     int nfailed = 0;
368                     for (Boolean result : results) {
369                         if (!result) {
370                             nfailed++;
371                         }
372                     }
373
374                     if (nfailed > 0) {
375                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
376                     }
377                 }
378             }
379         }, dispatcher);
380     }
381
382     private void onWrappedShardResponse(WrappedShardResponse message) {
383         if (message.getResponse() instanceof RemoveServerReply) {
384             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
385                     message.getLeaderPath());
386         }
387     }
388
389     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
390             String leaderPath) {
391         shardReplicaOperationsInProgress.remove(shardId.getShardName());
392
393         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
394
395         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
396             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
397                     shardId.getShardName());
398             originalSender.tell(new Status.Success(null), getSelf());
399         } else {
400             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
401                     persistenceId(), shardId, replyMsg.getStatus());
402
403             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
404             originalSender.tell(new Status.Failure(failure), getSelf());
405         }
406     }
407
408     private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
409                                           final String primaryPath, final ActorRef sender) {
410         if (isShardReplicaOperationInProgress(shardName, sender)) {
411             return;
412         }
413
414         shardReplicaOperationsInProgress.add(shardName);
415
416         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
417
418         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
419
420         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
421         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
422                 primaryPath, shardId);
423
424         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
425         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
426                 new RemoveServer(shardId.toString()), removeServerTimeout);
427
428         futureObj.onComplete(new OnComplete<Object>() {
429             @Override
430             public void onComplete(Throwable failure, Object response) {
431                 if (failure != null) {
432                     shardReplicaOperationsInProgress.remove(shardName);
433                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
434                             primaryPath, shardName);
435
436                     LOG.debug("{}: {}", persistenceId(), msg, failure);
437
438                     // FAILURE
439                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
440                 } else {
441                     // SUCCESS
442                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
443                 }
444             }
445         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
446     }
447
448     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
449             final ActorRef sender) {
450         if (isShardReplicaOperationInProgress(shardName, sender)) {
451             return;
452         }
453
454         shardReplicaOperationsInProgress.add(shardName);
455
456         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
457
458         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
459
460         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
461         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
462                 primaryPath, shardId);
463
464         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
465         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
466                 new RemoveServer(shardId.toString()), removeServerTimeout);
467
468         futureObj.onComplete(new OnComplete<Object>() {
469             @Override
470             public void onComplete(Throwable failure, Object response) {
471                 if (failure != null) {
472                     shardReplicaOperationsInProgress.remove(shardName);
473                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
474                             primaryPath, shardName);
475
476                     LOG.debug("{}: {}", persistenceId(), msg, failure);
477
478                     // FAILURE
479                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
480                 } else {
481                     // SUCCESS
482                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
483                 }
484             }
485         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
486     }
487
488     private void onShardReplicaRemoved(ServerRemoved message) {
489         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
490     }
491
492     @SuppressWarnings("checkstyle:IllegalCatch")
493     private void removeShard(final ShardIdentifier shardId) {
494         final String shardName = shardId.getShardName();
495         final ShardInformation shardInformation = localShards.remove(shardName);
496         if (shardInformation == null) {
497             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
498             return;
499         }
500
501         final ActorRef shardActor = shardInformation.getActor();
502         if (shardActor != null) {
503             long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
504                     .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
505
506             LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
507                     timeoutInMS);
508
509             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
510                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
511
512             final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
513                 @Override
514                 public void onComplete(final Throwable failure, final Boolean result) {
515                     if (failure == null) {
516                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
517                     } else {
518                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
519                     }
520
521                     self().tell((RunnableMessage) () -> {
522                         shardActorsStopping.remove(shardName);
523                         notifyOnCompleteTasks(failure, result);
524                     }, ActorRef.noSender());
525                 }
526             };
527
528             shardActorsStopping.put(shardName, onComplete);
529             stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
530                     .getDispatcher(Dispatchers.DispatcherType.Client));
531         }
532
533         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
534         persistShardList();
535     }
536
537     private void onGetSnapshot() {
538         LOG.debug("{}: onGetSnapshot", persistenceId());
539
540         List<String> notInitialized = null;
541         for (ShardInformation shardInfo : localShards.values()) {
542             if (!shardInfo.isShardInitialized()) {
543                 if (notInitialized == null) {
544                     notInitialized = new ArrayList<>();
545                 }
546
547                 notInitialized.add(shardInfo.getShardName());
548             }
549         }
550
551         if (notInitialized != null) {
552             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
553                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
554             return;
555         }
556
557         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
558                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
559                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
560
561         for (ShardInformation shardInfo: localShards.values()) {
562             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
563         }
564     }
565
566     @SuppressWarnings("checkstyle:IllegalCatch")
567     private void onCreateShard(CreateShard createShard) {
568         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
569
570         Object reply;
571         try {
572             String shardName = createShard.getModuleShardConfig().getShardName();
573             if (localShards.containsKey(shardName)) {
574                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
575                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
576             } else {
577                 doCreateShard(createShard);
578                 reply = new Status.Success(null);
579             }
580         } catch (Exception e) {
581             LOG.error("{}: onCreateShard failed", persistenceId(), e);
582             reply = new Status.Failure(e);
583         }
584
585         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
586             getSender().tell(reply, getSelf());
587         }
588     }
589
590     private void onPrefixShardCreated(final PrefixShardCreated message) {
591         LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
592
593         final PrefixShardConfiguration config = message.getConfiguration();
594         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
595                 ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
596         final String shardName = shardId.getShardName();
597
598         if (isPreviousShardActorStopInProgress(shardName, message)) {
599             return;
600         }
601
602         if (localShards.containsKey(shardName)) {
603             LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
604             final PrefixShardConfiguration existing =
605                     configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
606
607             if (existing != null && existing.equals(config)) {
608                 // we don't have to do nothing here
609                 return;
610             }
611         }
612
613         doCreatePrefixShard(config, shardId, shardName);
614     }
615
616     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
617         final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
618         if (stopOnComplete == null) {
619             return false;
620         }
621
622         LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
623                 shardName, messageToDefer);
624         final ActorRef sender = getSender();
625         stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
626             @Override
627             public void onComplete(Throwable failure, Boolean result) {
628                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
629                 self().tell(messageToDefer, sender);
630             }
631         });
632
633         return true;
634     }
635
636     private void doCreatePrefixShard(PrefixShardConfiguration config, ShardIdentifier shardId, String shardName) {
637         configuration.addPrefixShardConfiguration(config);
638
639         final Builder builder = newShardDatastoreContextBuilder(shardName);
640         builder.logicalStoreType(LogicalDatastoreType.valueOf(config.getPrefix().getDatastoreType().name()))
641                 .storeRoot(config.getPrefix().getRootIdentifier());
642         DatastoreContext shardDatastoreContext = builder.build();
643
644         final Map<String, String> peerAddresses = getPeerAddresses(shardName);
645         final boolean isActiveMember = true;
646
647         LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
648                 persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
649
650         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
651                 shardDatastoreContext, Shard.builder(), peerAddressResolver);
652         info.setActiveMember(isActiveMember);
653         localShards.put(info.getShardName(), info);
654
655         if (schemaContext != null) {
656             info.setActor(newShardActor(schemaContext, info));
657         }
658     }
659
660     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
661         LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
662
663         final DOMDataTreeIdentifier prefix = message.getPrefix();
664         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
665                 ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
666
667         configuration.removePrefixShardConfiguration(prefix);
668         removeShard(shardId);
669     }
670
671     private void doCreateShard(final CreateShard createShard) {
672         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
673         final String shardName = moduleShardConfig.getShardName();
674
675         configuration.addModuleShardConfiguration(moduleShardConfig);
676
677         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
678         if (shardDatastoreContext == null) {
679             shardDatastoreContext = newShardDatastoreContext(shardName);
680         } else {
681             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
682                     peerAddressResolver).build();
683         }
684
685         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
686
687         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
688                 && currentSnapshot.getShardList().contains(shardName);
689
690         Map<String, String> peerAddresses;
691         boolean isActiveMember;
692         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
693                 .contains(cluster.getCurrentMemberName())) {
694             peerAddresses = getPeerAddresses(shardName);
695             isActiveMember = true;
696         } else {
697             // The local member is not in the static shard member configuration and the shard did not
698             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
699             // the shard with no peers and with elections disabled so it stays as follower. A
700             // subsequent AddServer request will be needed to make it an active member.
701             isActiveMember = false;
702             peerAddresses = Collections.emptyMap();
703             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
704                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
705         }
706
707         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
708                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
709                 isActiveMember);
710
711         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
712                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
713         info.setActiveMember(isActiveMember);
714         localShards.put(info.getShardName(), info);
715
716         if (schemaContext != null) {
717             info.setActor(newShardActor(schemaContext, info));
718         }
719     }
720
721     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
722         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
723                 .shardPeerAddressResolver(peerAddressResolver);
724     }
725
726     private DatastoreContext newShardDatastoreContext(String shardName) {
727         return newShardDatastoreContextBuilder(shardName).build();
728     }
729
730     private void checkReady() {
731         if (isReadyWithLeaderId()) {
732             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
733                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
734
735             waitTillReadyCountdownLatch.countDown();
736         }
737     }
738
739     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
740         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
741
742         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
743         if (shardInformation != null) {
744             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
745             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
746             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
747                 primaryShardInfoCache.remove(shardInformation.getShardName());
748             }
749
750             checkReady();
751         } else {
752             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
753         }
754     }
755
756     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
757         ShardInformation shardInfo = message.getShardInfo();
758
759         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
760                 shardInfo.getShardName());
761
762         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
763
764         if (!shardInfo.isShardInitialized()) {
765             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
766             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
767         } else {
768             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
769             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
770         }
771     }
772
773     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
774         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
775                 status.getName(), status.isInitialSyncDone());
776
777         ShardInformation shardInformation = findShardInformation(status.getName());
778
779         if (shardInformation != null) {
780             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
781
782             shardManagerMBean.setSyncStatus(isInSync());
783         }
784
785     }
786
787     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
788         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
789                 roleChanged.getOldRole(), roleChanged.getNewRole());
790
791         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
792         if (shardInformation != null) {
793             shardInformation.setRole(roleChanged.getNewRole());
794             checkReady();
795             shardManagerMBean.setSyncStatus(isInSync());
796         }
797     }
798
799
800     private ShardInformation findShardInformation(String memberId) {
801         for (ShardInformation info : localShards.values()) {
802             if (info.getShardId().toString().equals(memberId)) {
803                 return info;
804             }
805         }
806
807         return null;
808     }
809
810     private boolean isReadyWithLeaderId() {
811         boolean isReady = true;
812         for (ShardInformation info : localShards.values()) {
813             if (!info.isShardReadyWithLeaderId()) {
814                 isReady = false;
815                 break;
816             }
817         }
818         return isReady;
819     }
820
821     private boolean isInSync() {
822         for (ShardInformation info : localShards.values()) {
823             if (!info.isInSync()) {
824                 return false;
825             }
826         }
827         return true;
828     }
829
830     private void onActorInitialized(Object message) {
831         final ActorRef sender = getSender();
832
833         if (sender == null) {
834             return; //why is a non-actor sending this message? Just ignore.
835         }
836
837         String actorName = sender.path().name();
838         //find shard name from actor name; actor name is stringified shardId
839
840         final ShardIdentifier shardId;
841         try {
842             shardId = ShardIdentifier.fromShardIdString(actorName);
843         } catch (IllegalArgumentException e) {
844             LOG.debug("{}: ignoring actor {}", actorName, e);
845             return;
846         }
847
848         markShardAsInitialized(shardId.getShardName());
849     }
850
851     private void markShardAsInitialized(String shardName) {
852         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
853
854         ShardInformation shardInformation = localShards.get(shardName);
855         if (shardInformation != null) {
856             shardInformation.setActorInitialized();
857
858             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
859         }
860     }
861
862     @Override
863     protected void handleRecover(Object message) throws Exception {
864         if (message instanceof RecoveryCompleted) {
865             onRecoveryCompleted();
866         } else if (message instanceof SnapshotOffer) {
867             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
868         }
869     }
870
871     @SuppressWarnings("checkstyle:IllegalCatch")
872     private void onRecoveryCompleted() {
873         LOG.info("Recovery complete : {}", persistenceId());
874
875         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
876         // journal on upgrade from Helium.
877         deleteMessages(lastSequenceNr());
878
879         if (currentSnapshot == null && restoreFromSnapshot != null
880                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
881             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
882
883             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
884
885             applyShardManagerSnapshot(snapshot);
886         }
887
888         createLocalShards();
889     }
890
891     private void sendResponse(ShardInformation shardInformation, boolean doWait,
892             boolean wantShardReady, final Supplier<Object> messageSupplier) {
893         if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
894             if (doWait) {
895                 final ActorRef sender = getSender();
896                 final ActorRef self = self();
897
898                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
899
900                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
901                     new OnShardInitialized(replyRunnable);
902
903                 shardInformation.addOnShardInitialized(onShardInitialized);
904
905                 FiniteDuration timeout = shardInformation.getDatastoreContext()
906                         .getShardInitializationTimeout().duration();
907                 if (shardInformation.isShardInitialized()) {
908                     // If the shard is already initialized then we'll wait enough time for the shard to
909                     // elect a leader, ie 2 times the election timeout.
910                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
911                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
912                 }
913
914                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
915                         shardInformation.getShardName());
916
917                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
918                         timeout, getSelf(),
919                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
920                         getContext().dispatcher(), getSelf());
921
922                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
923
924             } else if (!shardInformation.isShardInitialized()) {
925                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
926                         shardInformation.getShardName());
927                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
928             } else {
929                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
930                         shardInformation.getShardName());
931                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
932             }
933
934             return;
935         }
936
937         getSender().tell(messageSupplier.get(), getSelf());
938     }
939
940     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
941         return new NoShardLeaderException(null, shardId.toString());
942     }
943
944     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
945         return new NotInitializedException(String.format(
946                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
947     }
948
949     @VisibleForTesting
950     static MemberName memberToName(final Member member) {
951         return MemberName.forName(member.roles().iterator().next());
952     }
953
954     private void memberRemoved(ClusterEvent.MemberRemoved message) {
955         MemberName memberName = memberToName(message.member());
956
957         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
958                 message.member().address());
959
960         peerAddressResolver.removePeerAddress(memberName);
961
962         for (ShardInformation info : localShards.values()) {
963             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
964         }
965     }
966
967     private void memberExited(ClusterEvent.MemberExited message) {
968         MemberName memberName = memberToName(message.member());
969
970         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
971                 message.member().address());
972
973         peerAddressResolver.removePeerAddress(memberName);
974
975         for (ShardInformation info : localShards.values()) {
976             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
977         }
978     }
979
980     private void memberUp(ClusterEvent.MemberUp message) {
981         MemberName memberName = memberToName(message.member());
982
983         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
984                 message.member().address());
985
986         memberUp(memberName, message.member().address());
987     }
988
989     private void memberUp(MemberName memberName, Address address) {
990         addPeerAddress(memberName, address);
991         checkReady();
992     }
993
994     private void memberWeaklyUp(MemberWeaklyUp message) {
995         MemberName memberName = memberToName(message.member());
996
997         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
998                 message.member().address());
999
1000         memberUp(memberName, message.member().address());
1001     }
1002
1003     private void addPeerAddress(MemberName memberName, Address address) {
1004         peerAddressResolver.addPeerAddress(memberName, address);
1005
1006         for (ShardInformation info : localShards.values()) {
1007             String shardName = info.getShardName();
1008             String peerId = getShardIdentifier(memberName, shardName).toString();
1009             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
1010
1011             info.peerUp(memberName, peerId, getSelf());
1012         }
1013     }
1014
1015     private void memberReachable(ClusterEvent.ReachableMember message) {
1016         MemberName memberName = memberToName(message.member());
1017         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
1018
1019         addPeerAddress(memberName, message.member().address());
1020
1021         markMemberAvailable(memberName);
1022     }
1023
1024     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
1025         MemberName memberName = memberToName(message.member());
1026         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
1027
1028         markMemberUnavailable(memberName);
1029     }
1030
1031     private void markMemberUnavailable(final MemberName memberName) {
1032         final String memberStr = memberName.getName();
1033         for (ShardInformation info : localShards.values()) {
1034             String leaderId = info.getLeaderId();
1035             // XXX: why are we using String#contains() here?
1036             if (leaderId != null && leaderId.contains(memberStr)) {
1037                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
1038                 info.setLeaderAvailable(false);
1039
1040                 primaryShardInfoCache.remove(info.getShardName());
1041             }
1042
1043             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
1044         }
1045     }
1046
1047     private void markMemberAvailable(final MemberName memberName) {
1048         final String memberStr = memberName.getName();
1049         for (ShardInformation info : localShards.values()) {
1050             String leaderId = info.getLeaderId();
1051             // XXX: why are we using String#contains() here?
1052             if (leaderId != null && leaderId.contains(memberStr)) {
1053                 LOG.debug("Marking Leader {} as available.", leaderId);
1054                 info.setLeaderAvailable(true);
1055             }
1056
1057             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
1058         }
1059     }
1060
1061     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
1062         datastoreContextFactory = factory;
1063         for (ShardInformation info : localShards.values()) {
1064             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
1065         }
1066     }
1067
1068     private void onGetLocalShardIds() {
1069         final List<String> response = new ArrayList<>(localShards.size());
1070
1071         for (ShardInformation info : localShards.values()) {
1072             response.add(info.getShardId().toString());
1073         }
1074
1075         getSender().tell(new Status.Success(response), getSelf());
1076     }
1077
1078     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
1079         final ShardIdentifier identifier = message.getShardId();
1080
1081         if (identifier != null) {
1082             final ShardInformation info = localShards.get(identifier.getShardName());
1083             if (info == null) {
1084                 getSender().tell(new Status.Failure(
1085                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
1086                 return;
1087             }
1088
1089             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1090         } else {
1091             for (ShardInformation info : localShards.values()) {
1092                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1093             }
1094         }
1095
1096         getSender().tell(new Status.Success(null), getSelf());
1097     }
1098
1099     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
1100         final ActorRef actor = info.getActor();
1101         if (actor != null) {
1102             actor.tell(switchBehavior, getSelf());
1103         } else {
1104             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
1105                 info.getShardName(), switchBehavior.getNewState());
1106         }
1107     }
1108
1109     /**
1110      * Notifies all the local shards of a change in the schema context.
1111      *
1112      * @param message the message to send
1113      */
1114     private void updateSchemaContext(final Object message) {
1115         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
1116
1117         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
1118
1119         for (ShardInformation info : localShards.values()) {
1120             if (info.getActor() == null) {
1121                 LOG.debug("Creating Shard {}", info.getShardId());
1122                 info.setActor(newShardActor(schemaContext, info));
1123             } else {
1124                 info.getActor().tell(message, getSelf());
1125             }
1126         }
1127     }
1128
1129     @VisibleForTesting
1130     protected ClusterWrapper getCluster() {
1131         return cluster;
1132     }
1133
1134     @VisibleForTesting
1135     protected ActorRef newShardActor(final SchemaContext shardSchemaContext, final ShardInformation info) {
1136         return getContext().actorOf(info.newProps(shardSchemaContext).withDispatcher(shardDispatcherPath),
1137                 info.getShardId().toString());
1138     }
1139
1140     private void findPrimary(FindPrimary message) {
1141         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
1142
1143         final String shardName = message.getShardName();
1144         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
1145
1146         // First see if the there is a local replica for the shard
1147         final ShardInformation info = localShards.get(shardName);
1148         if (info != null && info.isActiveMember()) {
1149             sendResponse(info, message.isWaitUntilReady(), true, () -> {
1150                 String primaryPath = info.getSerializedLeaderActor();
1151                 Object found = canReturnLocalShardState && info.isLeader()
1152                         ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
1153                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
1154
1155                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
1156                 return found;
1157             });
1158
1159             return;
1160         }
1161
1162         final Collection<String> visitedAddresses;
1163         if (message instanceof RemoteFindPrimary) {
1164             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
1165         } else {
1166             visitedAddresses = new ArrayList<>(1);
1167         }
1168
1169         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
1170
1171         for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
1172             if (visitedAddresses.contains(address)) {
1173                 continue;
1174             }
1175
1176             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
1177                     persistenceId(), shardName, address, visitedAddresses);
1178
1179             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
1180                     message.isWaitUntilReady(), visitedAddresses), getContext());
1181             return;
1182         }
1183
1184         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
1185
1186         getSender().tell(new PrimaryNotFoundException(
1187                 String.format("No primary shard found for %s.", shardName)), getSelf());
1188     }
1189
1190     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1191         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1192                 .getShardInitializationTimeout().duration().$times(2));
1193
1194         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1195         futureObj.onComplete(new OnComplete<Object>() {
1196             @Override
1197             public void onComplete(Throwable failure, Object response) {
1198                 if (failure != null) {
1199                     handler.onFailure(failure);
1200                 } else {
1201                     if (response instanceof RemotePrimaryShardFound) {
1202                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1203                     } else if (response instanceof LocalPrimaryShardFound) {
1204                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1205                     } else {
1206                         handler.onUnknownResponse(response);
1207                     }
1208                 }
1209             }
1210         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1211     }
1212
1213     /**
1214      * Construct the name of the shard actor given the name of the member on
1215      * which the shard resides and the name of the shard.
1216      *
1217      * @param memberName the member name
1218      * @param shardName the shard name
1219      * @return a b
1220      */
1221     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName) {
1222         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1223     }
1224
1225     /**
1226      * Create shards that are local to the member on which the ShardManager runs.
1227      */
1228     private void createLocalShards() {
1229         MemberName memberName = this.cluster.getCurrentMemberName();
1230         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
1231
1232         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1233         if (restoreFromSnapshot != null) {
1234             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1235                 shardSnapshots.put(snapshot.getName(), snapshot);
1236             }
1237         }
1238
1239         restoreFromSnapshot = null; // null out to GC
1240
1241         for (String shardName : memberShardNames) {
1242             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1243
1244             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1245
1246             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1247             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
1248                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
1249                         shardSnapshots.get(shardName)), peerAddressResolver));
1250         }
1251     }
1252
1253     /**
1254      * Given the name of the shard find the addresses of all it's peers.
1255      *
1256      * @param shardName the shard name
1257      */
1258     private Map<String, String> getPeerAddresses(String shardName) {
1259         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1260         return getPeerAddresses(shardName, members);
1261     }
1262
1263     private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
1264         Map<String, String> peerAddresses = new HashMap<>();
1265         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1266
1267         for (MemberName memberName : members) {
1268             if (!currentMemberName.equals(memberName)) {
1269                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1270                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1271                 peerAddresses.put(shardId.toString(), address);
1272             }
1273         }
1274         return peerAddresses;
1275     }
1276
1277     @Override
1278     public SupervisorStrategy supervisorStrategy() {
1279
1280         return new OneForOneStrategy(10, Duration.create("1 minute"),
1281                 (Function<Throwable, Directive>) t -> {
1282                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1283                     return SupervisorStrategy.resume();
1284                 });
1285     }
1286
    /**
     * Returns the persistence id held in this actor's {@code persistenceId} field.
     *
     * @return the persistence id
     */
    @Override
    public String persistenceId() {
        return persistenceId;
    }
1291
    /**
     * Exposes the {@code shardManagerMBean} instance held by this actor. Package-private and intended for tests.
     *
     * @return the ShardManagerInfoMBean instance
     */
    @VisibleForTesting
    ShardManagerInfoMBean getMBean() {
        return shardManagerMBean;
    }
1296
1297     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1298         if (shardReplicaOperationsInProgress.contains(shardName)) {
1299             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1300             LOG.debug("{}: {}", persistenceId(), msg);
1301             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1302             return true;
1303         }
1304
1305         return false;
1306     }
1307
    /**
     * Handles an AddPrefixShardReplica request: derives the shard name from the shard prefix, looks up the
     * primary for that shard and, when the primary is remote, schedules the addition of a local replica.
     *
     * <p>
     * The sender receives a Status.Failure wrapping an IllegalStateException if no SchemaContext is available.
     * If the primary is found locally, the sender is told that the local replica already exists.
     *
     * @param message the request carrying the prefix of the shard to replicate
     */
    private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
        LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);

        // The shard name is derived from the prefix carried by the message.
        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
                ClusterUtils.getCleanShardName(message.getShardPrefix()));
        final String shardName = shardId.getShardName();

        // A local shard instance cannot be created without a SchemaContext, so fail early.
        if (schemaContext == null) {
            String msg = String.format(
                    "No SchemaContext is available in order to create a local shard instance for %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
            return;
        }

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                // The add itself is wrapped in a RunnableMessage; getSender() is only evaluated when the
                // runnable runs. NOTE(review): presumably that is the target actor passed to tell() below.
                final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
                        message.getShardPrefix(), response, getSender());
                // NOTE(review): isPreviousShardActorStopInProgress presumably takes over the runnable when it
                // returns true - confirm against its implementation.
                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
                    getSelf().tell(runnable, getTargetActor());
                }
            }

            @Override
            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
            }
        });
    }
1341
    /**
     * Handles an AddShardReplica request: validates the request, looks up the primary for the named shard and,
     * when the primary is remote, schedules the addition of a local replica.
     *
     * <p>
     * The sender receives a Status.Failure wrapping an IllegalArgumentException if the shard is not configured,
     * or wrapping an IllegalStateException if no SchemaContext is available. If the primary is found locally,
     * the sender is told that the local replica already exists.
     *
     * @param shardReplicaMsg the request naming the shard to replicate
     */
    private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
        final String shardName = shardReplicaMsg.getShardName();

        LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);

        // verify the shard with the specified name is present in the cluster configuration
        if (!this.configuration.isShardConfigured(shardName)) {
            String msg = String.format("No module configuration exists for shard %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
            return;
        }

        // A local shard instance cannot be created without a SchemaContext, so fail early.
        if (schemaContext == null) {
            String msg = String.format(
                  "No SchemaContext is available in order to create a local shard instance for %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
            return;
        }

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                // The add itself is wrapped in a RunnableMessage; getSender() is only evaluated when the
                // runnable runs. NOTE(review): presumably that is the target actor passed to tell() below.
                final RunnableMessage runnable = (RunnableMessage) () ->
                    addShard(getShardName(), response, getSender());
                // NOTE(review): isPreviousShardActorStopInProgress presumably takes over the runnable when it
                // returns true - confirm against its implementation.
                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
                    getSelf().tell(runnable, getTargetActor());
                }
            }

            @Override
            public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
            }
        });
    }
1381
1382     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1383         String msg = String.format("Local shard %s already exists", shardName);
1384         LOG.debug("{}: {}", persistenceId(), msg);
1385         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1386     }
1387
1388     private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
1389                                 final RemotePrimaryShardFound response, final ActorRef sender) {
1390         if (isShardReplicaOperationInProgress(shardName, sender)) {
1391             return;
1392         }
1393
1394         shardReplicaOperationsInProgress.add(shardName);
1395
1396         final ShardInformation shardInfo;
1397         final boolean removeShardOnFailure;
1398         ShardInformation existingShardInfo = localShards.get(shardName);
1399         if (existingShardInfo == null) {
1400             removeShardOnFailure = true;
1401             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1402
1403             final Builder builder = newShardDatastoreContextBuilder(shardName);
1404             builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1405
1406             DatastoreContext datastoreContext = builder.build();
1407
1408             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1409                     Shard.builder(), peerAddressResolver);
1410             shardInfo.setActiveMember(false);
1411             localShards.put(shardName, shardInfo);
1412             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1413         } else {
1414             removeShardOnFailure = false;
1415             shardInfo = existingShardInfo;
1416         }
1417
1418         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1419     }
1420
1421     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1422         if (isShardReplicaOperationInProgress(shardName, sender)) {
1423             return;
1424         }
1425
1426         shardReplicaOperationsInProgress.add(shardName);
1427
1428         final ShardInformation shardInfo;
1429         final boolean removeShardOnFailure;
1430         ShardInformation existingShardInfo = localShards.get(shardName);
1431         if (existingShardInfo == null) {
1432             removeShardOnFailure = true;
1433             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1434
1435             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1436                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1437
1438             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1439                     Shard.builder(), peerAddressResolver);
1440             shardInfo.setActiveMember(false);
1441             localShards.put(shardName, shardInfo);
1442             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1443         } else {
1444             removeShardOnFailure = false;
1445             shardInfo = existingShardInfo;
1446         }
1447
1448         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1449     }
1450
1451     private void execAddShard(final String shardName,
1452                               final ShardInformation shardInfo,
1453                               final RemotePrimaryShardFound response,
1454                               final boolean removeShardOnFailure,
1455                               final ActorRef sender) {
1456
1457         final String localShardAddress =
1458                 peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1459
1460         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1461         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1462                 response.getPrimaryPath(), shardInfo.getShardId());
1463
1464         final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1465                 .getShardLeaderElectionTimeout().duration());
1466         final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1467                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1468
1469         futureObj.onComplete(new OnComplete<Object>() {
1470             @Override
1471             public void onComplete(Throwable failure, Object addServerResponse) {
1472                 if (failure != null) {
1473                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1474                             response.getPrimaryPath(), shardName, failure);
1475
1476                     final String msg = String.format("AddServer request to leader %s for shard %s failed",
1477                             response.getPrimaryPath(), shardName);
1478                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1479                 } else {
1480                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1481                             response.getPrimaryPath(), removeShardOnFailure), sender);
1482                 }
1483             }
1484         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1485     }
1486
1487     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1488             boolean removeShardOnFailure) {
1489         shardReplicaOperationsInProgress.remove(shardName);
1490
1491         if (removeShardOnFailure) {
1492             ShardInformation shardInfo = localShards.remove(shardName);
1493             if (shardInfo.getActor() != null) {
1494                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1495             }
1496         }
1497
1498         sender.tell(new Status.Failure(message == null ? failure :
1499             new RuntimeException(message, failure)), getSelf());
1500     }
1501
1502     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1503             String leaderPath, boolean removeShardOnFailure) {
1504         String shardName = shardInfo.getShardName();
1505         shardReplicaOperationsInProgress.remove(shardName);
1506
1507         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1508
1509         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1510             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1511
1512             // Make the local shard voting capable
1513             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1514             shardInfo.setActiveMember(true);
1515             persistShardList();
1516
1517             sender.tell(new Status.Success(null), getSelf());
1518         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1519             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1520         } else {
1521             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1522                     persistenceId(), shardName, replyMsg.getStatus());
1523
1524             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1525                     shardInfo.getShardId());
1526
1527             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1528         }
1529     }
1530
1531     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1532                                                String leaderPath, ShardIdentifier shardId) {
1533         Exception failure;
1534         switch (serverChangeStatus) {
1535             case TIMEOUT:
1536                 failure = new TimeoutException(String.format(
1537                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
1538                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
1539                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
1540                 break;
1541             case NO_LEADER:
1542                 failure = createNoShardLeaderException(shardId);
1543                 break;
1544             case NOT_SUPPORTED:
1545                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1546                         serverChange.getSimpleName(), shardId.getShardName()));
1547                 break;
1548             default :
1549                 failure = new RuntimeException(String.format(
1550                         "%s request to leader %s for shard %s failed with status %s",
1551                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1552         }
1553         return failure;
1554     }
1555
    /**
     * Handles a RemoveShardReplica request by looking up the primary for the named shard and then scheduling
     * the removal, regardless of whether the primary was found remotely or locally.
     *
     * @param shardReplicaMsg the request naming the shard whose replica is to be removed
     */
    private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
        LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);

        findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            @Override
            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            // Wraps the removal in a RunnableMessage sent to self with the target actor as sender; getSender()
            // is only evaluated when the runnable runs.
            private void doRemoveShardReplicaAsync(final String primaryPath) {
                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
                        primaryPath, getSender()), getTargetActor());
            }
        });
    }
1577
    /**
     * Handles a RemovePrefixShardReplica request: derives the shard name from the shard prefix, looks up the
     * primary for that shard and then schedules the removal, regardless of whether the primary was found
     * remotely or locally.
     *
     * @param message the request carrying the prefix of the shard whose replica is to be removed
     */
    private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
        LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);

        // The shard name is derived from the prefix carried by the message.
        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
                ClusterUtils.getCleanShardName(message.getShardPrefix()));
        final String shardName = shardId.getShardName();

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
                shardName, persistenceId(), getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            @Override
            public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            // Wraps the removal in a RunnableMessage sent to self with the target actor as sender; getSender()
            // is only evaluated when the runnable runs.
            private void doRemoveShardReplicaAsync(final String primaryPath) {
                getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
                        primaryPath, getSender()), getTargetActor());
            }
        });
    }
1603
1604     private void persistShardList() {
1605         List<String> shardList = new ArrayList<>(localShards.keySet());
1606         for (ShardInformation shardInfo : localShards.values()) {
1607             if (!shardInfo.isActiveMember()) {
1608                 shardList.remove(shardInfo.getShardName());
1609             }
1610         }
1611         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1612         saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
1613     }
1614
1615     private ShardManagerSnapshot updateShardManagerSnapshot(
1616             final List<String> shardList,
1617             final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
1618         currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
1619         return currentSnapshot;
1620     }
1621
1622     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1623         currentSnapshot = snapshot;
1624
1625         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1626
1627         final MemberName currentMember = cluster.getCurrentMemberName();
1628         Set<String> configuredShardList =
1629             new HashSet<>(configuration.getMemberShardNames(currentMember));
1630         for (String shard : currentSnapshot.getShardList()) {
1631             if (!configuredShardList.contains(shard)) {
1632                 // add the current member as a replica for the shard
1633                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1634                 configuration.addMemberReplicaForShard(shard, currentMember);
1635             } else {
1636                 configuredShardList.remove(shard);
1637             }
1638         }
1639         for (String shard : configuredShardList) {
1640             // remove the member as a replica for the shard
1641             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1642             configuration.removeMemberReplicaForShard(shard, currentMember);
1643         }
1644     }
1645
1646     private void onSaveSnapshotSuccess(SaveSnapshotSuccess successMessage) {
1647         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1648             persistenceId());
1649         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1650             0, 0));
1651     }
1652
1653     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1654         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1655
1656         String shardName = changeMembersVotingStatus.getShardName();
1657         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1658         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1659             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1660                     e.getValue());
1661         }
1662
1663         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1664
1665         findLocalShard(shardName, getSender(),
1666             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1667             localShardFound.getPath(), getSender()));
1668     }
1669
1670     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
1671         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1672
1673         ActorRef sender = getSender();
1674         final String shardName = flipMembersVotingStatus.getShardName();
1675         findLocalShard(shardName, sender, localShardFound -> {
1676             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1677                     Timeout.apply(30, TimeUnit.SECONDS));
1678
1679             future.onComplete(new OnComplete<Object>() {
1680                 @Override
1681                 public void onComplete(Throwable failure, Object response) {
1682                     if (failure != null) {
1683                         sender.tell(new Status.Failure(new RuntimeException(
1684                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1685                         return;
1686                     }
1687
1688                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1689                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1690                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1691                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1692                     }
1693
1694                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1695                             .toString(), !raftState.isVoting());
1696
1697                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1698                             shardName, localShardFound.getPath(), sender);
1699                 }
1700             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1701         });
1702
1703     }
1704
1705     private void findLocalShard(FindLocalShard message) {
1706         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
1707
1708         final ShardInformation shardInformation = localShards.get(message.getShardName());
1709
1710         if (shardInformation == null) {
1711             LOG.debug("{}: Local shard {} not found - shards present: {}",
1712                     persistenceId(), message.getShardName(), localShards.keySet());
1713
1714             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1715             return;
1716         }
1717
1718         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1719             () -> new LocalShardFound(shardInformation.getActor()));
1720     }
1721
1722     private void findLocalShard(final String shardName, final ActorRef sender,
1723             final Consumer<LocalShardFound> onLocalShardFound) {
1724         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1725                 .getShardInitializationTimeout().duration().$times(2));
1726
1727         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1728         futureObj.onComplete(new OnComplete<Object>() {
1729             @Override
1730             public void onComplete(Throwable failure, Object response) {
1731                 if (failure != null) {
1732                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1733                             failure);
1734                     sender.tell(new Status.Failure(new RuntimeException(
1735                             String.format("Failed to find local shard %s", shardName), failure)), self());
1736                 } else {
1737                     if (response instanceof LocalShardFound) {
1738                         getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
1739                                 sender);
1740                     } else if (response instanceof LocalShardNotFound) {
1741                         String msg = String.format("Local shard %s does not exist", shardName);
1742                         LOG.debug("{}: {}", persistenceId, msg);
1743                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
1744                     } else {
1745                         String msg = String.format("Failed to find local shard %s: received response: %s",
1746                                 shardName, response);
1747                         LOG.debug("{}: {}", persistenceId, msg);
1748                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1749                                 new RuntimeException(msg)), self());
1750                     }
1751                 }
1752             }
1753         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1754     }
1755
1756     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
1757             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1758         if (isShardReplicaOperationInProgress(shardName, sender)) {
1759             return;
1760         }
1761
1762         shardReplicaOperationsInProgress.add(shardName);
1763
1764         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1765         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1766
1767         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1768                 changeServersVotingStatus, shardActorRef.path());
1769
1770         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1771         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1772
1773         futureObj.onComplete(new OnComplete<Object>() {
1774             @Override
1775             public void onComplete(Throwable failure, Object response) {
1776                 shardReplicaOperationsInProgress.remove(shardName);
1777                 if (failure != null) {
1778                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1779                             shardActorRef.path());
1780                     LOG.debug("{}: {}", persistenceId(), msg, failure);
1781                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1782                 } else {
1783                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1784
1785                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1786                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1787                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1788                         sender.tell(new Status.Success(null), getSelf());
1789                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1790                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1791                                 "The requested voting state change for shard %s is invalid. At least one member "
1792                                 + "must be voting", shardId.getShardName()))), getSelf());
1793                     } else {
1794                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1795                                 persistenceId(), shardName, replyMsg.getStatus());
1796
1797                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1798                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1799                         sender.tell(new Status.Failure(error), getSelf());
1800                     }
1801                 }
1802             }
1803         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1804     }
1805
1806     private static final class ForwardedAddServerReply {
1807         ShardInformation shardInfo;
1808         AddServerReply addServerReply;
1809         String leaderPath;
1810         boolean removeShardOnFailure;
1811
1812         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1813                 boolean removeShardOnFailure) {
1814             this.shardInfo = shardInfo;
1815             this.addServerReply = addServerReply;
1816             this.leaderPath = leaderPath;
1817             this.removeShardOnFailure = removeShardOnFailure;
1818         }
1819     }
1820
1821     private static final class ForwardedAddServerFailure {
1822         String shardName;
1823         String failureMessage;
1824         Throwable failure;
1825         boolean removeShardOnFailure;
1826
1827         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1828                 boolean removeShardOnFailure) {
1829             this.shardName = shardName;
1830             this.failureMessage = failureMessage;
1831             this.failure = failure;
1832             this.removeShardOnFailure = removeShardOnFailure;
1833         }
1834     }
1835
    /**
     * Holds a reply Runnable together with a Cancellable timeout schedule that can be attached later.
     * NOTE(review): presumably the runnable is executed once the shard has been initialized - confirm
     * against the code that consumes instances of this class.
     */
    static class OnShardInitialized {
        private final Runnable replyRunnable;
        // Not set by the constructor; remains null until setTimeoutSchedule is called.
        private Cancellable timeoutSchedule;

        /**
         * Constructs an instance.
         *
         * @param replyRunnable the runnable that produces the reply
         */
        OnShardInitialized(Runnable replyRunnable) {
            this.replyRunnable = replyRunnable;
        }

        /**
         * Returns the reply runnable supplied at construction.
         */
        Runnable getReplyRunnable() {
            return replyRunnable;
        }

        /**
         * Returns the timeout schedule, or null if none has been set.
         */
        Cancellable getTimeoutSchedule() {
            return timeoutSchedule;
        }

        /**
         * Sets the timeout schedule associated with this callback.
         *
         * @param timeoutSchedule the cancellable timeout schedule
         */
        void setTimeoutSchedule(Cancellable timeoutSchedule) {
            this.timeoutSchedule = timeoutSchedule;
        }
    }
1856
    /**
     * A specialization of OnShardInitialized that adds no state or behavior of its own; it only provides a
     * distinct type. NOTE(review): presumably used for callbacks that wait for the shard to be ready rather
     * than merely initialized - confirm against the call sites.
     */
    static class OnShardReady extends OnShardInitialized {
        /**
         * Constructs an instance.
         *
         * @param replyRunnable the runnable that produces the reply
         */
        OnShardReady(Runnable replyRunnable) {
            super(replyRunnable);
        }
    }
1862
    /**
     * Marker type for Runnables that this actor sends to itself as messages, so that work started from
     * asynchronous callbacks can be handed back to the actor. NOTE(review): the code that runs a received
     * RunnableMessage is outside this part of the file - confirm there.
     */
    private interface RunnableMessage extends Runnable {
    }
1865
    /**
     * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to
     * a remote or local find primary message is processed.
     */
    private interface FindPrimaryResponseHandler {
        /**
         * Invoked when a Failure message is received as a response.
         *
         * @param failure the failure exception
         */
        void onFailure(Throwable failure);

        /**
         * Invoked when a RemotePrimaryShardFound response is received.
         *
         * @param response the response
         */
        void onRemotePrimaryShardFound(RemotePrimaryShardFound response);

        /**
         * Invoked when a LocalPrimaryShardFound response is received.
         *
         * @param response the response
         */
        void onLocalPrimaryFound(LocalPrimaryShardFound response);

        /**
         * Invoked when an unknown response is received. This is another type of failure.
         *
         * @param response the response
         */
        void onUnknownResponse(Object response);
    }
1899
1900     /**
1901      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1902      * replica and sends a wrapped Failure response to some targetActor.
1903      */
1904     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1905         private final ActorRef targetActor;
1906         private final String shardName;
1907         private final String persistenceId;
1908         private final ActorRef shardManagerActor;
1909
1910         /**
1911          * Constructs an instance.
1912          *
1913          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1914          * @param shardName The name of the shard for which the primary replica had to be found
1915          * @param persistenceId The persistenceId for the ShardManager
1916          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1917          */
1918         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId,
1919                 ActorRef shardManagerActor) {
1920             this.targetActor = Preconditions.checkNotNull(targetActor);
1921             this.shardName = Preconditions.checkNotNull(shardName);
1922             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1923             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1924         }
1925
1926         public ActorRef getTargetActor() {
1927             return targetActor;
1928         }
1929
1930         public String getShardName() {
1931             return shardName;
1932         }
1933
1934         @Override
1935         public void onFailure(Throwable failure) {
1936             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1937             targetActor.tell(new Status.Failure(new RuntimeException(
1938                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1939         }
1940
1941         @Override
1942         public void onUnknownResponse(Object response) {
1943             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1944                     shardName, response);
1945             LOG.debug("{}: {}", persistenceId, msg);
1946             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1947                     new RuntimeException(msg)), shardManagerActor);
1948         }
1949     }
1950
1951     /**
1952      * The WrappedShardResponse class wraps a response from a Shard.
1953      */
1954     private static final class WrappedShardResponse {
1955         private final ShardIdentifier shardId;
1956         private final Object response;
1957         private final String leaderPath;
1958
1959         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1960             this.shardId = shardId;
1961             this.response = response;
1962             this.leaderPath = leaderPath;
1963         }
1964
1965         ShardIdentifier getShardId() {
1966             return shardId;
1967         }
1968
1969         Object getResponse() {
1970             return response;
1971         }
1972
1973         String getLeaderPath() {
1974             return leaderPath;
1975         }
1976     }
1977
1978     private static final class ShardNotInitializedTimeout {
1979         private final ActorRef sender;
1980         private final ShardInformation shardInfo;
1981         private final OnShardInitialized onShardInitialized;
1982
1983         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1984             this.sender = sender;
1985             this.shardInfo = shardInfo;
1986             this.onShardInitialized = onShardInitialized;
1987         }
1988
1989         ActorRef getSender() {
1990             return sender;
1991         }
1992
1993         ShardInformation getShardInfo() {
1994             return shardInfo;
1995         }
1996
1997         OnShardInitialized getOnShardInitialized() {
1998             return onShardInitialized;
1999         }
2000     }
2001 }
2002
2003
2004