Convert CDS implementation to use mdsal APIs
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.actor.SupervisorStrategy.Directive;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.ClusterEvent.MemberWeaklyUp;
23 import akka.cluster.Member;
24 import akka.dispatch.Futures;
25 import akka.dispatch.OnComplete;
26 import akka.japi.Function;
27 import akka.pattern.Patterns;
28 import akka.persistence.DeleteSnapshotsFailure;
29 import akka.persistence.DeleteSnapshotsSuccess;
30 import akka.persistence.RecoveryCompleted;
31 import akka.persistence.SaveSnapshotFailure;
32 import akka.persistence.SaveSnapshotSuccess;
33 import akka.persistence.SnapshotOffer;
34 import akka.persistence.SnapshotSelectionCriteria;
35 import akka.util.Timeout;
36 import com.google.common.annotations.VisibleForTesting;
37 import com.google.common.base.Preconditions;
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.Collections;
41 import java.util.HashMap;
42 import java.util.HashSet;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Map.Entry;
46 import java.util.Set;
47 import java.util.concurrent.CountDownLatch;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.TimeoutException;
50 import java.util.function.Consumer;
51 import java.util.function.Supplier;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
54 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
55 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
56 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
57 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
58 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
59 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
60 import org.opendaylight.controller.cluster.datastore.Shard;
61 import org.opendaylight.controller.cluster.datastore.config.Configuration;
62 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
63 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
64 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
65 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
66 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
67 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
68 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
69 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
70 import org.opendaylight.controller.cluster.datastore.messages.AddPrefixShardReplica;
71 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
72 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
73 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
74 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
75 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
76 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
77 import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
78 import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
79 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
80 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
81 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
82 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
83 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
84 import org.opendaylight.controller.cluster.datastore.messages.RemovePrefixShardReplica;
85 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
86 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
87 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
88 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
89 import org.opendaylight.controller.cluster.datastore.persisted.ShardManagerSnapshot;
90 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
91 import org.opendaylight.controller.cluster.datastore.utils.CompositeOnComplete;
92 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
93 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
94 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
95 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
96 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
97 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
98 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
99 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
100 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
101 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
102 import org.opendaylight.controller.cluster.raft.messages.AddServer;
103 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
104 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
105 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
106 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
107 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
108 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
109 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
110 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
111 import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
112 import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
113 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
114 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
115 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
116 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
117 import org.opendaylight.yangtools.concepts.ListenerRegistration;
118 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
119 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
120 import org.slf4j.Logger;
121 import org.slf4j.LoggerFactory;
122 import scala.concurrent.ExecutionContext;
123 import scala.concurrent.Future;
124 import scala.concurrent.duration.Duration;
125 import scala.concurrent.duration.FiniteDuration;
126
127 /**
128  * Manages the shards for a data store. The ShardManager has the following jobs:
129  * <ul>
130  * <li> Create all the local shard replicas that belong on this cluster member
131  * <li> Find the address of the local shard
132  * <li> Find the primary replica for any given shard
133  * <li> Monitor the cluster members and store their addresses
134  * </ul>
135  */
136 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
137     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
138
139     // Stores a mapping between a shard name and it's corresponding information
140     // Shard names look like inventory, topology etc and are as specified in
141     // configuration
142     private final Map<String, ShardInformation> localShards = new HashMap<>();
143
144     // The type of a ShardManager reflects the type of the datastore itself
145     // A data store could be of type config/operational
146     private final String type;
147
148     private final ClusterWrapper cluster;
149
150     private final Configuration configuration;
151
152     private final String shardDispatcherPath;
153
154     private final ShardManagerInfo shardManagerMBean;
155
156     private DatastoreContextFactory datastoreContextFactory;
157
158     private final CountDownLatch waitTillReadyCountdownLatch;
159
160     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
161
162     private final ShardPeerAddressResolver peerAddressResolver;
163
164     private SchemaContext schemaContext;
165
166     private DatastoreSnapshot restoreFromSnapshot;
167
168     private ShardManagerSnapshot currentSnapshot;
169
170     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
171
172     private final Map<String, CompositeOnComplete<Boolean>> shardActorsStopping = new HashMap<>();
173
174     private final String persistenceId;
175     private final AbstractDataStore dataStore;
176
177     private ListenerRegistration<DOMDataTreeChangeListener> configListenerReg = null;
178     private PrefixedShardConfigUpdateHandler configUpdateHandler;
179
180     ShardManager(final AbstractShardManagerCreator<?> builder) {
181         this.cluster = builder.getCluster();
182         this.configuration = builder.getConfiguration();
183         this.datastoreContextFactory = builder.getDatastoreContextFactory();
184         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
185         this.shardDispatcherPath =
186                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
187         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountDownLatch();
188         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
189         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
190
191         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
192         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
193
194         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
195
196         // Subscribe this actor to cluster member events
197         cluster.subscribeToMemberEvents(getSelf());
198
199         shardManagerMBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(),
200                 "shard-manager-" + this.type,
201                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
202         shardManagerMBean.registerMBean();
203
204         dataStore = builder.getDistributedDataStore();
205     }
206
207     @Override
208     public void preStart() {
209         LOG.info("Starting ShardManager {}", persistenceId);
210     }
211
212     @Override
213     public void postStop() {
214         LOG.info("Stopping ShardManager {}", persistenceId());
215
216         shardManagerMBean.unregisterMBean();
217
218         if (configListenerReg != null) {
219             configListenerReg.close();
220             configListenerReg = null;
221         }
222     }
223
224     @Override
225     public void handleCommand(final Object message) throws Exception {
226         if (message  instanceof FindPrimary) {
227             findPrimary((FindPrimary)message);
228         } else if (message instanceof FindLocalShard) {
229             findLocalShard((FindLocalShard) message);
230         } else if (message instanceof UpdateSchemaContext) {
231             updateSchemaContext(message);
232         } else if (message instanceof ActorInitialized) {
233             onActorInitialized(message);
234         } else if (message instanceof ClusterEvent.MemberUp) {
235             memberUp((ClusterEvent.MemberUp) message);
236         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
237             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
238         } else if (message instanceof ClusterEvent.MemberExited) {
239             memberExited((ClusterEvent.MemberExited) message);
240         } else if (message instanceof ClusterEvent.MemberRemoved) {
241             memberRemoved((ClusterEvent.MemberRemoved) message);
242         } else if (message instanceof ClusterEvent.UnreachableMember) {
243             memberUnreachable((ClusterEvent.UnreachableMember) message);
244         } else if (message instanceof ClusterEvent.ReachableMember) {
245             memberReachable((ClusterEvent.ReachableMember) message);
246         } else if (message instanceof DatastoreContextFactory) {
247             onDatastoreContextFactory((DatastoreContextFactory) message);
248         } else if (message instanceof RoleChangeNotification) {
249             onRoleChangeNotification((RoleChangeNotification) message);
250         } else if (message instanceof FollowerInitialSyncUpStatus) {
251             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
252         } else if (message instanceof ShardNotInitializedTimeout) {
253             onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
254         } else if (message instanceof ShardLeaderStateChanged) {
255             onLeaderStateChanged((ShardLeaderStateChanged) message);
256         } else if (message instanceof SwitchShardBehavior) {
257             onSwitchShardBehavior((SwitchShardBehavior) message);
258         } else if (message instanceof CreateShard) {
259             onCreateShard((CreateShard)message);
260         } else if (message instanceof AddShardReplica) {
261             onAddShardReplica((AddShardReplica) message);
262         } else if (message instanceof AddPrefixShardReplica) {
263             onAddPrefixShardReplica((AddPrefixShardReplica) message);
264         } else if (message instanceof PrefixShardCreated) {
265             onPrefixShardCreated((PrefixShardCreated) message);
266         } else if (message instanceof PrefixShardRemoved) {
267             onPrefixShardRemoved((PrefixShardRemoved) message);
268         } else if (message instanceof InitConfigListener) {
269             onInitConfigListener();
270         } else if (message instanceof ForwardedAddServerReply) {
271             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
272             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
273                     msg.removeShardOnFailure);
274         } else if (message instanceof ForwardedAddServerFailure) {
275             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
276             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
277         } else if (message instanceof RemoveShardReplica) {
278             onRemoveShardReplica((RemoveShardReplica) message);
279         } else if (message instanceof RemovePrefixShardReplica) {
280             onRemovePrefixShardReplica((RemovePrefixShardReplica) message);
281         } else if (message instanceof WrappedShardResponse) {
282             onWrappedShardResponse((WrappedShardResponse) message);
283         } else if (message instanceof GetSnapshot) {
284             onGetSnapshot();
285         } else if (message instanceof ServerRemoved) {
286             onShardReplicaRemoved((ServerRemoved) message);
287         } else if (message instanceof ChangeShardMembersVotingStatus) {
288             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
289         } else if (message instanceof FlipShardMembersVotingStatus) {
290             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
291         } else if (message instanceof SaveSnapshotSuccess) {
292             onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
293         } else if (message instanceof SaveSnapshotFailure) {
294             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
295                     ((SaveSnapshotFailure) message).cause());
296         } else if (message instanceof Shutdown) {
297             onShutDown();
298         } else if (message instanceof GetLocalShardIds) {
299             onGetLocalShardIds();
300         } else if (message instanceof GetShardRole) {
301             onGetShardRole((GetShardRole) message);
302         } else if (message instanceof RunnableMessage) {
303             ((RunnableMessage)message).run();
304         } else if (message instanceof DeleteSnapshotsFailure) {
305             LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
306                     ((DeleteSnapshotsFailure) message).cause());
307         } else if (message instanceof DeleteSnapshotsSuccess) {
308             LOG.debug("{}: Successfully deleted prior snapshots", persistenceId(), message);
309         } else if (message instanceof RegisterRoleChangeListenerReply) {
310             LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
311         } else if (message instanceof ClusterEvent.MemberEvent) {
312             LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message);
313         } else {
314             unknownMessage(message);
315         }
316     }
317
318     private void onGetShardRole(final GetShardRole message) {
319         LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
320
321         final String name = message.getName();
322
323         final ShardInformation shardInformation = localShards.get(name);
324
325         if (shardInformation == null) {
326             LOG.info("{}: no shard information for {} found", persistenceId(), name);
327             getSender().tell(new Status.Failure(
328                     new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
329             return;
330         }
331
332         getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
333     }
334
335     private void onInitConfigListener() {
336         LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
337
338         final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType =
339                 org.opendaylight.mdsal.common.api.LogicalDatastoreType
340                         .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
341
342         if (configUpdateHandler != null) {
343             configUpdateHandler.close();
344         }
345
346         configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
347         configUpdateHandler.initListener(dataStore, datastoreType);
348     }
349
350     private void onShutDown() {
351         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
352         for (ShardInformation info : localShards.values()) {
353             if (info.getActor() != null) {
354                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
355
356                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig()
357                         .getElectionTimeOutInterval().$times(2);
358                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
359             }
360         }
361
362         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
363
364         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers())
365                 .getDispatcher(Dispatchers.DispatcherType.Client);
366         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
367
368         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
369             @Override
370             public void onComplete(final Throwable failure, final Iterable<Boolean> results) {
371                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
372
373                 self().tell(PoisonPill.getInstance(), self());
374
375                 if (failure != null) {
376                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
377                 } else {
378                     int nfailed = 0;
379                     for (Boolean result : results) {
380                         if (!result) {
381                             nfailed++;
382                         }
383                     }
384
385                     if (nfailed > 0) {
386                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
387                     }
388                 }
389             }
390         }, dispatcher);
391     }
392
393     private void onWrappedShardResponse(final WrappedShardResponse message) {
394         if (message.getResponse() instanceof RemoveServerReply) {
395             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
396                     message.getLeaderPath());
397         }
398     }
399
400     private void onRemoveServerReply(final ActorRef originalSender, final ShardIdentifier shardId,
401             final RemoveServerReply replyMsg, final String leaderPath) {
402         shardReplicaOperationsInProgress.remove(shardId.getShardName());
403
404         LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
405
406         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
407             LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
408                     shardId.getShardName());
409             originalSender.tell(new Status.Success(null), getSelf());
410         } else {
411             LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
412                     persistenceId(), shardId, replyMsg.getStatus());
413
414             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
415             originalSender.tell(new Status.Failure(failure), getSelf());
416         }
417     }
418
419     private void removePrefixShardReplica(final RemovePrefixShardReplica contextMessage, final String shardName,
420                                           final String primaryPath, final ActorRef sender) {
421         if (isShardReplicaOperationInProgress(shardName, sender)) {
422             return;
423         }
424
425         shardReplicaOperationsInProgress.add(shardName);
426
427         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
428
429         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
430
431         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
432         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
433                 primaryPath, shardId);
434
435         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
436         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
437                 new RemoveServer(shardId.toString()), removeServerTimeout);
438
439         futureObj.onComplete(new OnComplete<Object>() {
440             @Override
441             public void onComplete(final Throwable failure, final Object response) {
442                 if (failure != null) {
443                     shardReplicaOperationsInProgress.remove(shardName);
444                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
445                             primaryPath, shardName);
446
447                     LOG.debug("{}: {}", persistenceId(), msg, failure);
448
449                     // FAILURE
450                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
451                 } else {
452                     // SUCCESS
453                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
454                 }
455             }
456         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
457     }
458
459     private void removeShardReplica(final RemoveShardReplica contextMessage, final String shardName,
460             final String primaryPath, final ActorRef sender) {
461         if (isShardReplicaOperationInProgress(shardName, sender)) {
462             return;
463         }
464
465         shardReplicaOperationsInProgress.add(shardName);
466
467         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
468
469         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
470
471         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
472         LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
473                 primaryPath, shardId);
474
475         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
476         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
477                 new RemoveServer(shardId.toString()), removeServerTimeout);
478
479         futureObj.onComplete(new OnComplete<Object>() {
480             @Override
481             public void onComplete(final Throwable failure, final Object response) {
482                 if (failure != null) {
483                     shardReplicaOperationsInProgress.remove(shardName);
484                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
485                             primaryPath, shardName);
486
487                     LOG.debug("{}: {}", persistenceId(), msg, failure);
488
489                     // FAILURE
490                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
491                 } else {
492                     // SUCCESS
493                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
494                 }
495             }
496         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
497     }
498
499     private void onShardReplicaRemoved(final ServerRemoved message) {
500         removeShard(new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build());
501     }
502
503     @SuppressWarnings("checkstyle:IllegalCatch")
504     private void removeShard(final ShardIdentifier shardId) {
505         final String shardName = shardId.getShardName();
506         final ShardInformation shardInformation = localShards.remove(shardName);
507         if (shardInformation == null) {
508             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
509             return;
510         }
511
512         final ActorRef shardActor = shardInformation.getActor();
513         if (shardActor != null) {
514             long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
515                     .getElectionTimeOutInterval().$times(3).toMillis(), 10000);
516
517             LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
518                     timeoutInMS);
519
520             final Future<Boolean> stopFuture = Patterns.gracefulStop(shardActor,
521                     FiniteDuration.apply(timeoutInMS, TimeUnit.MILLISECONDS), Shutdown.INSTANCE);
522
523             final CompositeOnComplete<Boolean> onComplete = new CompositeOnComplete<Boolean>() {
524                 @Override
525                 public void onComplete(final Throwable failure, final Boolean result) {
526                     if (failure == null) {
527                         LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
528                     } else {
529                         LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
530                     }
531
532                     self().tell((RunnableMessage) () -> {
533                         // At any rate, invalidate primaryShardInfo cache
534                         primaryShardInfoCache.remove(shardName);
535
536                         shardActorsStopping.remove(shardName);
537                         notifyOnCompleteTasks(failure, result);
538                     }, ActorRef.noSender());
539                 }
540             };
541
542             shardActorsStopping.put(shardName, onComplete);
543             stopFuture.onComplete(onComplete, new Dispatchers(context().system().dispatchers())
544                     .getDispatcher(Dispatchers.DispatcherType.Client));
545         }
546
547         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
548         persistShardList();
549     }
550
551     private void onGetSnapshot() {
552         LOG.debug("{}: onGetSnapshot", persistenceId());
553
554         List<String> notInitialized = null;
555         for (ShardInformation shardInfo : localShards.values()) {
556             if (!shardInfo.isShardInitialized()) {
557                 if (notInitialized == null) {
558                     notInitialized = new ArrayList<>();
559                 }
560
561                 notInitialized.add(shardInfo.getShardName());
562             }
563         }
564
565         if (notInitialized != null) {
566             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
567                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
568             return;
569         }
570
571         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
572                 new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
573                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
574
575         for (ShardInformation shardInfo: localShards.values()) {
576             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
577         }
578     }
579
580     @SuppressWarnings("checkstyle:IllegalCatch")
581     private void onCreateShard(final CreateShard createShard) {
582         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
583
584         Object reply;
585         try {
586             String shardName = createShard.getModuleShardConfig().getShardName();
587             if (localShards.containsKey(shardName)) {
588                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
589                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
590             } else {
591                 doCreateShard(createShard);
592                 reply = new Status.Success(null);
593             }
594         } catch (Exception e) {
595             LOG.error("{}: onCreateShard failed", persistenceId(), e);
596             reply = new Status.Failure(e);
597         }
598
599         if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
600             getSender().tell(reply, getSelf());
601         }
602     }
603
604     private void onPrefixShardCreated(final PrefixShardCreated message) {
605         LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
606
607         final PrefixShardConfiguration config = message.getConfiguration();
608         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
609                 ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
610         final String shardName = shardId.getShardName();
611
612         if (isPreviousShardActorStopInProgress(shardName, message)) {
613             return;
614         }
615
616         if (localShards.containsKey(shardName)) {
617             LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
618             final PrefixShardConfiguration existing =
619                     configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
620
621             if (existing != null && existing.equals(config)) {
622                 // we don't have to do nothing here
623                 return;
624             }
625         }
626
627         doCreatePrefixShard(config, shardId, shardName);
628     }
629
630     private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
631         final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
632         if (stopOnComplete == null) {
633             return false;
634         }
635
636         LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
637                 shardName, messageToDefer);
638         final ActorRef sender = getSender();
639         stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
640             @Override
641             public void onComplete(final Throwable failure, final Boolean result) {
642                 LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
643                 self().tell(messageToDefer, sender);
644             }
645         });
646
647         return true;
648     }
649
650     private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId,
651             final String shardName) {
652         configuration.addPrefixShardConfiguration(config);
653
654         final Builder builder = newShardDatastoreContextBuilder(shardName);
655         builder.logicalStoreType(config.getPrefix().getDatastoreType())
656                 .storeRoot(config.getPrefix().getRootIdentifier());
657         DatastoreContext shardDatastoreContext = builder.build();
658
659         final Map<String, String> peerAddresses = getPeerAddresses(shardName);
660         final boolean isActiveMember = true;
661
662         LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
663                 persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
664
665         final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
666                 shardDatastoreContext, Shard.builder(), peerAddressResolver);
667         info.setActiveMember(isActiveMember);
668         localShards.put(info.getShardName(), info);
669
670         if (schemaContext != null) {
671             info.setSchemaContext(schemaContext);
672             info.setActor(newShardActor(info));
673         }
674     }
675
676     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
677         LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
678
679         final DOMDataTreeIdentifier prefix = message.getPrefix();
680         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
681                 ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
682
683         configuration.removePrefixShardConfiguration(prefix);
684         removeShard(shardId);
685     }
686
687     private void doCreateShard(final CreateShard createShard) {
688         final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
689         final String shardName = moduleShardConfig.getShardName();
690
691         configuration.addModuleShardConfiguration(moduleShardConfig);
692
693         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
694         if (shardDatastoreContext == null) {
695             shardDatastoreContext = newShardDatastoreContext(shardName);
696         } else {
697             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
698                     peerAddressResolver).build();
699         }
700
701         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
702
703         boolean shardWasInRecoveredSnapshot = currentSnapshot != null
704                 && currentSnapshot.getShardList().contains(shardName);
705
706         Map<String, String> peerAddresses;
707         boolean isActiveMember;
708         if (shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName)
709                 .contains(cluster.getCurrentMemberName())) {
710             peerAddresses = getPeerAddresses(shardName);
711             isActiveMember = true;
712         } else {
713             // The local member is not in the static shard member configuration and the shard did not
714             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
715             // the shard with no peers and with elections disabled so it stays as follower. A
716             // subsequent AddServer request will be needed to make it an active member.
717             isActiveMember = false;
718             peerAddresses = Collections.emptyMap();
719             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext)
720                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
721         }
722
723         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
724                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
725                 isActiveMember);
726
727         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
728                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
729         info.setActiveMember(isActiveMember);
730         localShards.put(info.getShardName(), info);
731
732         if (schemaContext != null) {
733             info.setSchemaContext(schemaContext);
734             info.setActor(newShardActor(info));
735         }
736     }
737
738     private DatastoreContext.Builder newShardDatastoreContextBuilder(final String shardName) {
739         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName))
740                 .shardPeerAddressResolver(peerAddressResolver);
741     }
742
743     private DatastoreContext newShardDatastoreContext(final String shardName) {
744         return newShardDatastoreContextBuilder(shardName).build();
745     }
746
747     private void checkReady() {
748         if (isReadyWithLeaderId()) {
749             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
750                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
751
752             waitTillReadyCountdownLatch.countDown();
753         }
754     }
755
756     private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
757         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
758
759         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
760         if (shardInformation != null) {
761             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
762             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
763             if (shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
764                 primaryShardInfoCache.remove(shardInformation.getShardName());
765             }
766
767             checkReady();
768         } else {
769             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
770         }
771     }
772
773     private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
774         ShardInformation shardInfo = message.getShardInfo();
775
776         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
777                 shardInfo.getShardName());
778
779         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
780
781         if (!shardInfo.isShardInitialized()) {
782             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
783             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
784         } else {
785             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
786             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
787         }
788     }
789
790     private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) {
791         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
792                 status.getName(), status.isInitialSyncDone());
793
794         ShardInformation shardInformation = findShardInformation(status.getName());
795
796         if (shardInformation != null) {
797             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
798
799             shardManagerMBean.setSyncStatus(isInSync());
800         }
801
802     }
803
804     private void onRoleChangeNotification(final RoleChangeNotification roleChanged) {
805         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
806                 roleChanged.getOldRole(), roleChanged.getNewRole());
807
808         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
809         if (shardInformation != null) {
810             shardInformation.setRole(roleChanged.getNewRole());
811             checkReady();
812             shardManagerMBean.setSyncStatus(isInSync());
813         }
814     }
815
816
817     private ShardInformation findShardInformation(final String memberId) {
818         for (ShardInformation info : localShards.values()) {
819             if (info.getShardId().toString().equals(memberId)) {
820                 return info;
821             }
822         }
823
824         return null;
825     }
826
827     private boolean isReadyWithLeaderId() {
828         boolean isReady = true;
829         for (ShardInformation info : localShards.values()) {
830             if (!info.isShardReadyWithLeaderId()) {
831                 isReady = false;
832                 break;
833             }
834         }
835         return isReady;
836     }
837
838     private boolean isInSync() {
839         for (ShardInformation info : localShards.values()) {
840             if (!info.isInSync()) {
841                 return false;
842             }
843         }
844         return true;
845     }
846
847     private void onActorInitialized(final Object message) {
848         final ActorRef sender = getSender();
849
850         if (sender == null) {
851             // why is a non-actor sending this message? Just ignore.
852             return;
853         }
854
855         String actorName = sender.path().name();
856         //find shard name from actor name; actor name is stringified shardId
857
858         final ShardIdentifier shardId;
859         try {
860             shardId = ShardIdentifier.fromShardIdString(actorName);
861         } catch (IllegalArgumentException e) {
862             LOG.debug("{}: ignoring actor {}", actorName, e);
863             return;
864         }
865
866         markShardAsInitialized(shardId.getShardName());
867     }
868
869     private void markShardAsInitialized(final String shardName) {
870         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
871
872         ShardInformation shardInformation = localShards.get(shardName);
873         if (shardInformation != null) {
874             shardInformation.setActorInitialized();
875
876             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
877         }
878     }
879
880     @Override
881     protected void handleRecover(final Object message) throws Exception {
882         if (message instanceof RecoveryCompleted) {
883             onRecoveryCompleted();
884         } else if (message instanceof SnapshotOffer) {
885             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
886         }
887     }
888
889     @SuppressWarnings("checkstyle:IllegalCatch")
890     private void onRecoveryCompleted() {
891         LOG.info("Recovery complete : {}", persistenceId());
892
893         if (currentSnapshot == null && restoreFromSnapshot != null
894                 && restoreFromSnapshot.getShardManagerSnapshot() != null) {
895             ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
896
897             LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
898
899             applyShardManagerSnapshot(snapshot);
900         }
901
902         createLocalShards();
903     }
904
    /**
     * Replies to the current sender with the message produced by {@code messageSupplier}, provided the shard is
     * in the required state.
     *
     * <p>The shard must be initialized, and additionally ready with a leader when {@code wantShardReady} is set.
     * If it is not:
     * <ul>
     * <li>with {@code doWait}, the reply is registered as a deferred callback on the shard, and a
     * {@code ShardNotInitializedTimeout} is scheduled to this actor;</li>
     * <li>otherwise a NotInitializedException or NoShardLeaderException reply is sent immediately.</li>
     * </ul>
     *
     * @param shardInformation the local shard the request targets
     * @param doWait whether to wait for the shard instead of failing fast
     * @param wantShardReady whether a ready leader is required in addition to initialization
     * @param messageSupplier produces the success reply; it is only invoked when the reply is actually sent
     */
    private void sendResponse(final ShardInformation shardInformation, final boolean doWait,
            final boolean wantShardReady, final Supplier<Object> messageSupplier) {
        if (!shardInformation.isShardInitialized() || wantShardReady && !shardInformation.isShardReadyWithLeaderId()) {
            if (doWait) {
                // Capture sender/self now: the reply runnable may run while a different message is being handled.
                final ActorRef sender = getSender();
                final ActorRef self = self();

                Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);

                OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
                    new OnShardInitialized(replyRunnable);

                shardInformation.addOnShardInitialized(onShardInitialized);

                FiniteDuration timeout = shardInformation.getDatastoreContext()
                        .getShardInitializationTimeout().duration();
                if (shardInformation.isShardInitialized()) {
                    // If the shard is already initialized then we'll wait enough time for the shard to
                    // elect a leader, ie 2 times the election timeout.
                    timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
                }

                LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
                        shardInformation.getShardName());

                // On expiry, onShardNotInitializedTimeout unregisters the waiter and replies with a failure.
                Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                        timeout, getSelf(),
                        new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                        getContext().dispatcher(), getSelf());

                onShardInitialized.setTimeoutSchedule(timeoutSchedule);

            } else if (!shardInformation.isShardInitialized()) {
                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
                        shardInformation.getShardName());
                getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
            } else {
                // Initialized but no ready leader, and the caller asked for one.
                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
                        shardInformation.getShardName());
                getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
            }

            return;
        }

        getSender().tell(messageSupplier.get(), getSelf());
    }
953
954     private static NoShardLeaderException createNoShardLeaderException(final ShardIdentifier shardId) {
955         return new NoShardLeaderException(null, shardId.toString());
956     }
957
958     private static NotInitializedException createNotInitializedException(final ShardIdentifier shardId) {
959         return new NotInitializedException(String.format(
960                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
961     }
962
963     @VisibleForTesting
964     static MemberName memberToName(final Member member) {
965         return MemberName.forName(member.roles().iterator().next());
966     }
967
968     private void memberRemoved(final ClusterEvent.MemberRemoved message) {
969         MemberName memberName = memberToName(message.member());
970
971         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
972                 message.member().address());
973
974         peerAddressResolver.removePeerAddress(memberName);
975
976         for (ShardInformation info : localShards.values()) {
977             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
978         }
979     }
980
981     private void memberExited(final ClusterEvent.MemberExited message) {
982         MemberName memberName = memberToName(message.member());
983
984         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
985                 message.member().address());
986
987         peerAddressResolver.removePeerAddress(memberName);
988
989         for (ShardInformation info : localShards.values()) {
990             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
991         }
992     }
993
994     private void memberUp(final ClusterEvent.MemberUp message) {
995         MemberName memberName = memberToName(message.member());
996
997         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
998                 message.member().address());
999
1000         memberUp(memberName, message.member().address());
1001     }
1002
1003     private void memberUp(final MemberName memberName, final Address address) {
1004         addPeerAddress(memberName, address);
1005         checkReady();
1006     }
1007
1008     private void memberWeaklyUp(final MemberWeaklyUp message) {
1009         MemberName memberName = memberToName(message.member());
1010
1011         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
1012                 message.member().address());
1013
1014         memberUp(memberName, message.member().address());
1015     }
1016
1017     private void addPeerAddress(final MemberName memberName, final Address address) {
1018         peerAddressResolver.addPeerAddress(memberName, address);
1019
1020         for (ShardInformation info : localShards.values()) {
1021             String shardName = info.getShardName();
1022             String peerId = getShardIdentifier(memberName, shardName).toString();
1023             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
1024
1025             info.peerUp(memberName, peerId, getSelf());
1026         }
1027     }
1028
1029     private void memberReachable(final ClusterEvent.ReachableMember message) {
1030         MemberName memberName = memberToName(message.member());
1031         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
1032
1033         addPeerAddress(memberName, message.member().address());
1034
1035         markMemberAvailable(memberName);
1036     }
1037
1038     private void memberUnreachable(final ClusterEvent.UnreachableMember message) {
1039         MemberName memberName = memberToName(message.member());
1040         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
1041
1042         markMemberUnavailable(memberName);
1043     }
1044
1045     private void markMemberUnavailable(final MemberName memberName) {
1046         for (ShardInformation info : localShards.values()) {
1047             String leaderId = info.getLeaderId();
1048             if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
1049                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
1050                 info.setLeaderAvailable(false);
1051
1052                 primaryShardInfoCache.remove(info.getShardName());
1053             }
1054
1055             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
1056         }
1057     }
1058
1059     private void markMemberAvailable(final MemberName memberName) {
1060         for (ShardInformation info : localShards.values()) {
1061             String leaderId = info.getLeaderId();
1062             if (leaderId != null && ShardIdentifier.fromShardIdString(leaderId).getMemberName().equals(memberName)) {
1063                 LOG.debug("Marking Leader {} as available.", leaderId);
1064                 info.setLeaderAvailable(true);
1065             }
1066
1067             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
1068         }
1069     }
1070
1071     private void onDatastoreContextFactory(final DatastoreContextFactory factory) {
1072         datastoreContextFactory = factory;
1073         for (ShardInformation info : localShards.values()) {
1074             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
1075         }
1076     }
1077
1078     private void onGetLocalShardIds() {
1079         final List<String> response = new ArrayList<>(localShards.size());
1080
1081         for (ShardInformation info : localShards.values()) {
1082             response.add(info.getShardId().toString());
1083         }
1084
1085         getSender().tell(new Status.Success(response), getSelf());
1086     }
1087
1088     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
1089         final ShardIdentifier identifier = message.getShardId();
1090
1091         if (identifier != null) {
1092             final ShardInformation info = localShards.get(identifier.getShardName());
1093             if (info == null) {
1094                 getSender().tell(new Status.Failure(
1095                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
1096                 return;
1097             }
1098
1099             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1100         } else {
1101             for (ShardInformation info : localShards.values()) {
1102                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
1103             }
1104         }
1105
1106         getSender().tell(new Status.Success(null), getSelf());
1107     }
1108
1109     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
1110         final ActorRef actor = info.getActor();
1111         if (actor != null) {
1112             actor.tell(switchBehavior, getSelf());
1113         } else {
1114             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
1115                 info.getShardName(), switchBehavior.getNewState());
1116         }
1117     }
1118
1119     /**
1120      * Notifies all the local shards of a change in the schema context.
1121      *
1122      * @param message the message to send
1123      */
1124     private void updateSchemaContext(final Object message) {
1125         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
1126
1127         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
1128
1129         for (ShardInformation info : localShards.values()) {
1130             info.setSchemaContext(schemaContext);
1131
1132             if (info.getActor() == null) {
1133                 LOG.debug("Creating Shard {}", info.getShardId());
1134                 info.setActor(newShardActor(info));
1135                 // Update peer address for every existing peer memeber to avoid missing sending
1136                 // PeerAddressResolved and PeerUp to this shard while UpdateSchemaContext comes after MemberUp.
1137                 String shardName = info.getShardName();
1138                 for (MemberName memberName : peerAddressResolver.getPeerMembers()) {
1139                     String peerId = getShardIdentifier(memberName, shardName).toString() ;
1140                     String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
1141                     info.updatePeerAddress(peerId, peerAddress, getSelf());
1142                     info.peerUp(memberName, peerId, getSelf());
1143                     LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
1144                             persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
1145                 }
1146             } else {
1147                 info.getActor().tell(message, getSelf());
1148             }
1149         }
1150     }
1151
1152     @VisibleForTesting
1153     protected ClusterWrapper getCluster() {
1154         return cluster;
1155     }
1156
1157     @VisibleForTesting
1158     protected ActorRef newShardActor(final ShardInformation info) {
1159         return getContext().actorOf(info.newProps().withDispatcher(shardDispatcherPath),
1160                 info.getShardId().toString());
1161     }
1162
    /**
     * Handles a FindPrimary request, or a forwarded RemoteFindPrimary request.
     *
     * <p>If this member hosts an active replica of the shard, the reply is produced via {@code sendResponse},
     * requiring a ready leader. Otherwise the request is forwarded, as a RemoteFindPrimary, to the first peer
     * ShardManager whose address has not been visited yet. If every peer has been visited, a
     * PrimaryNotFoundException is returned.
     *
     * @param message the request
     */
    private void findPrimary(final FindPrimary message) {
        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);

        final String shardName = message.getShardName();
        // Only requests that did not come via RemoteFindPrimary may receive the local shard's data tree.
        final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);

        // First see if the there is a local replica for the shard
        final ShardInformation info = localShards.get(shardName);
        if (info != null && info.isActiveMember()) {
            sendResponse(info, message.isWaitUntilReady(), true, () -> {
                String primaryPath = info.getSerializedLeaderActor();
                Object found = canReturnLocalShardState && info.isLeader()
                        ? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
                            new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());

                LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
                return found;
            });

            return;
        }

        final Collection<String> visitedAddresses;
        if (message instanceof RemoteFindPrimary) {
            visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
        } else {
            visitedAddresses = new ArrayList<>(1);
        }

        // Record this ShardManager as visited so the request is not forwarded back here.
        visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());

        for (String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
            if (visitedAddresses.contains(address)) {
                continue;
            }

            LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
                    persistenceId(), shardName, address, visitedAddresses);

            // forward() keeps the original sender, so the remote ShardManager replies to it directly.
            getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
                    message.isWaitUntilReady(), visitedAddresses), getContext());
            return;
        }

        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);

        getSender().tell(new PrimaryNotFoundException(
                String.format("No primary shard found for %s.", shardName)), getSelf());
    }
1212
1213     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1214         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1215                 .getShardInitializationTimeout().duration().$times(2));
1216
1217         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1218         futureObj.onComplete(new OnComplete<Object>() {
1219             @Override
1220             public void onComplete(final Throwable failure, final Object response) {
1221                 if (failure != null) {
1222                     handler.onFailure(failure);
1223                 } else {
1224                     if (response instanceof RemotePrimaryShardFound) {
1225                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1226                     } else if (response instanceof LocalPrimaryShardFound) {
1227                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1228                     } else {
1229                         handler.onUnknownResponse(response);
1230                     }
1231                 }
1232             }
1233         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1234     }
1235
1236     /**
1237      * Construct the name of the shard actor given the name of the member on
1238      * which the shard resides and the name of the shard.
1239      *
1240      * @param memberName the member name
1241      * @param shardName the shard name
1242      * @return a b
1243      */
1244     private ShardIdentifier getShardIdentifier(final MemberName memberName, final String shardName) {
1245         return peerAddressResolver.getShardIdentifier(memberName, shardName);
1246     }
1247
1248     /**
1249      * Create shards that are local to the member on which the ShardManager runs.
1250      */
1251     private void createLocalShards() {
1252         MemberName memberName = this.cluster.getCurrentMemberName();
1253         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
1254
1255         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
1256         if (restoreFromSnapshot != null) {
1257             for (DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1258                 shardSnapshots.put(snapshot.getName(), snapshot);
1259             }
1260         }
1261
1262         // null out to GC
1263         restoreFromSnapshot = null;
1264
1265         for (String shardName : memberShardNames) {
1266             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1267
1268             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1269
1270             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1271             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
1272                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
1273                         shardSnapshots.get(shardName)), peerAddressResolver));
1274         }
1275     }
1276
1277     /**
1278      * Given the name of the shard find the addresses of all it's peers.
1279      *
1280      * @param shardName the shard name
1281      */
1282     private Map<String, String> getPeerAddresses(final String shardName) {
1283         final Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1284         return getPeerAddresses(shardName, members);
1285     }
1286
1287     private Map<String, String> getPeerAddresses(final String shardName, final Collection<MemberName> members) {
1288         Map<String, String> peerAddresses = new HashMap<>();
1289         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1290
1291         for (MemberName memberName : members) {
1292             if (!currentMemberName.equals(memberName)) {
1293                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1294                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1295                 peerAddresses.put(shardId.toString(), address);
1296             }
1297         }
1298         return peerAddresses;
1299     }
1300
1301     @Override
1302     public SupervisorStrategy supervisorStrategy() {
1303
1304         return new OneForOneStrategy(10, Duration.create("1 minute"),
1305                 (Function<Throwable, Directive>) t -> {
1306                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1307                     return SupervisorStrategy.resume();
1308                 });
1309     }
1310
1311     @Override
1312     public String persistenceId() {
1313         return persistenceId;
1314     }
1315
1316     @VisibleForTesting
1317     ShardManagerInfoMBean getMBean() {
1318         return shardManagerMBean;
1319     }
1320
1321     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1322         if (shardReplicaOperationsInProgress.contains(shardName)) {
1323             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1324             LOG.debug("{}: {}", persistenceId(), msg);
1325             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1326             return true;
1327         }
1328
1329         return false;
1330     }
1331
    /**
     * Handles a request to add a local replica of a prefix-based shard.
     *
     * <p>The shard name is derived from the message's shard prefix. If no SchemaContext is available yet, the
     * requester is immediately sent a {@link Status.Failure}. Otherwise the shard's primary is located:
     * <ul>
     *   <li>remote primary: {@code addPrefixShard} is scheduled on this actor as a {@link RunnableMessage}, unless
     *       {@code isPreviousShardActorStopInProgress} takes the runnable (presumably to run it once a previous
     *       shard actor has stopped; TODO confirm);</li>
     *   <li>local primary: the requester is told the local replica already exists.</li>
     * </ul>
     * Failure and unknown responses are left to {@code AutoFindPrimaryFailureResponseHandler} (not visible here).
     *
     * @param message the add-prefix-shard-replica request
     */
    private void onAddPrefixShardReplica(final AddPrefixShardReplica message) {
        LOG.debug("{}: onAddPrefixShardReplica: {}", persistenceId(), message);

        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
                ClusterUtils.getCleanShardName(message.getShardPrefix()));
        final String shardName = shardId.getShardName();

        // A local shard instance cannot be created without a SchemaContext.
        if (schemaContext == null) {
            String msg = String.format(
                    "No SchemaContext is available in order to create a local shard instance for %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
            return;
        }

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                // NOTE(review): the runnable is told with getTargetActor() as sender, so getSender() evaluated
                // when it runs on the actor should be the original requester (assuming getSender() here resolves
                // to the actor's method rather than a handler member).
                final RunnableMessage runnable = (RunnableMessage) () -> addPrefixShard(getShardName(),
                        message.getShardPrefix(), response, getSender());
                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
                    getSelf().tell(runnable, getTargetActor());
                }
            }

            @Override
            public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
                // The primary is this member, so a local replica already exists.
                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
            }
        });
    }
1365
    /**
     * Handles a request to add a local replica of a module-based shard.
     *
     * <p>The requester is immediately sent a {@link Status.Failure} if the shard is not in the module
     * configuration ({@link IllegalArgumentException}) or no SchemaContext is available yet
     * ({@link IllegalStateException}). Otherwise the shard's primary is located:
     * <ul>
     *   <li>remote primary: {@code addShard} is scheduled on this actor as a {@link RunnableMessage}, unless
     *       {@code isPreviousShardActorStopInProgress} takes the runnable (presumably to run it once a previous
     *       shard actor has stopped; TODO confirm);</li>
     *   <li>local primary: the requester is told the local replica already exists.</li>
     * </ul>
     * Failure and unknown responses are left to {@code AutoFindPrimaryFailureResponseHandler} (not visible here).
     *
     * @param shardReplicaMsg the add-shard-replica request
     */
    private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
        final String shardName = shardReplicaMsg.getShardName();

        LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);

        // verify the shard with the specified name is present in the cluster configuration
        if (!this.configuration.isShardConfigured(shardName)) {
            String msg = String.format("No module configuration exists for shard %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
            return;
        }

        // A local shard instance cannot be created without a SchemaContext.
        if (schemaContext == null) {
            String msg = String.format(
                  "No SchemaContext is available in order to create a local shard instance for %s", shardName);
            LOG.debug("{}: {}", persistenceId(), msg);
            getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
            return;
        }

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
                getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                // NOTE(review): the runnable is told with getTargetActor() as sender, so getSender() evaluated
                // when it runs on the actor should be the original requester (assuming getSender() here resolves
                // to the actor's method rather than a handler member).
                final RunnableMessage runnable = (RunnableMessage) () ->
                    addShard(getShardName(), response, getSender());
                if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
                    getSelf().tell(runnable, getTargetActor());
                }
            }

            @Override
            public void onLocalPrimaryFound(final LocalPrimaryShardFound message) {
                // The primary is this member, so a local replica already exists.
                sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
            }
        });
    }
1405
1406     private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
1407         String msg = String.format("Local shard %s already exists", shardName);
1408         LOG.debug("{}: {}", persistenceId(), msg);
1409         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1410     }
1411
1412     private void addPrefixShard(final String shardName, final YangInstanceIdentifier shardPrefix,
1413                                 final RemotePrimaryShardFound response, final ActorRef sender) {
1414         if (isShardReplicaOperationInProgress(shardName, sender)) {
1415             return;
1416         }
1417
1418         shardReplicaOperationsInProgress.add(shardName);
1419
1420         final ShardInformation shardInfo;
1421         final boolean removeShardOnFailure;
1422         ShardInformation existingShardInfo = localShards.get(shardName);
1423         if (existingShardInfo == null) {
1424             removeShardOnFailure = true;
1425             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1426
1427             final Builder builder = newShardDatastoreContextBuilder(shardName);
1428             builder.storeRoot(shardPrefix).customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
1429
1430             DatastoreContext datastoreContext = builder.build();
1431
1432             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1433                     Shard.builder(), peerAddressResolver);
1434             shardInfo.setActiveMember(false);
1435             shardInfo.setSchemaContext(schemaContext);
1436             localShards.put(shardName, shardInfo);
1437             shardInfo.setActor(newShardActor(shardInfo));
1438         } else {
1439             removeShardOnFailure = false;
1440             shardInfo = existingShardInfo;
1441         }
1442
1443         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1444     }
1445
1446     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1447         if (isShardReplicaOperationInProgress(shardName, sender)) {
1448             return;
1449         }
1450
1451         shardReplicaOperationsInProgress.add(shardName);
1452
1453         final ShardInformation shardInfo;
1454         final boolean removeShardOnFailure;
1455         ShardInformation existingShardInfo = localShards.get(shardName);
1456         if (existingShardInfo == null) {
1457             removeShardOnFailure = true;
1458             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1459
1460             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
1461                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
1462
1463             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1464                     Shard.builder(), peerAddressResolver);
1465             shardInfo.setActiveMember(false);
1466             shardInfo.setSchemaContext(schemaContext);
1467             localShards.put(shardName, shardInfo);
1468             shardInfo.setActor(newShardActor(shardInfo));
1469         } else {
1470             removeShardOnFailure = false;
1471             shardInfo = existingShardInfo;
1472         }
1473
1474         execAddShard(shardName, shardInfo, response, removeShardOnFailure, sender);
1475     }
1476
1477     private void execAddShard(final String shardName,
1478                               final ShardInformation shardInfo,
1479                               final RemotePrimaryShardFound response,
1480                               final boolean removeShardOnFailure,
1481                               final ActorRef sender) {
1482
1483         final String localShardAddress =
1484                 peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1485
1486         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1487         LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1488                 response.getPrimaryPath(), shardInfo.getShardId());
1489
1490         final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
1491                 .getShardLeaderElectionTimeout().duration());
1492         final Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1493                 new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1494
1495         futureObj.onComplete(new OnComplete<Object>() {
1496             @Override
1497             public void onComplete(final Throwable failure, final Object addServerResponse) {
1498                 if (failure != null) {
1499                     LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
1500                             response.getPrimaryPath(), shardName, failure);
1501
1502                     final String msg = String.format("AddServer request to leader %s for shard %s failed",
1503                             response.getPrimaryPath(), shardName);
1504                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1505                 } else {
1506                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1507                             response.getPrimaryPath(), removeShardOnFailure), sender);
1508                 }
1509             }
1510         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1511     }
1512
1513     private void onAddServerFailure(final String shardName, final String message, final Throwable failure,
1514             final ActorRef sender, final boolean removeShardOnFailure) {
1515         shardReplicaOperationsInProgress.remove(shardName);
1516
1517         if (removeShardOnFailure) {
1518             ShardInformation shardInfo = localShards.remove(shardName);
1519             if (shardInfo.getActor() != null) {
1520                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1521             }
1522         }
1523
1524         sender.tell(new Status.Failure(message == null ? failure :
1525             new RuntimeException(message, failure)), getSelf());
1526     }
1527
1528     private void onAddServerReply(final ShardInformation shardInfo, final AddServerReply replyMsg,
1529             final ActorRef sender, final String leaderPath, final boolean removeShardOnFailure) {
1530         String shardName = shardInfo.getShardName();
1531         shardReplicaOperationsInProgress.remove(shardName);
1532
1533         LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1534
1535         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1536             LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1537
1538             // Make the local shard voting capable
1539             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1540             shardInfo.setActiveMember(true);
1541             persistShardList();
1542
1543             sender.tell(new Status.Success(null), getSelf());
1544         } else if (replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1545             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1546         } else {
1547             LOG.warn("{}: Leader failed to add shard replica {} with status {}",
1548                     persistenceId(), shardName, replyMsg.getStatus());
1549
1550             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
1551                     shardInfo.getShardId());
1552
1553             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1554         }
1555     }
1556
1557     private static Exception getServerChangeException(final Class<?> serverChange,
1558             final ServerChangeStatus serverChangeStatus, final String leaderPath, final ShardIdentifier shardId) {
1559         switch (serverChangeStatus) {
1560             case TIMEOUT:
1561                 return new TimeoutException(String.format(
1562                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s."
1563                         + "Possible causes - there was a problem replicating the data or shard leadership changed "
1564                         + "while replicating the shard data", leaderPath, shardId.getShardName()));
1565             case NO_LEADER:
1566                 return createNoShardLeaderException(shardId);
1567             case NOT_SUPPORTED:
1568                 return new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1569                         serverChange.getSimpleName(), shardId.getShardName()));
1570             default :
1571                 return new RuntimeException(String.format("%s request to leader %s for shard %s failed with status %s",
1572                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1573         }
1574     }
1575
    /**
     * Handles a request to remove a replica of a module-based shard.
     *
     * <p>Locates the shard's primary. Whether the primary is remote or local, {@code removeShardReplica} is then
     * scheduled on this actor as a {@link RunnableMessage} with the primary's path. Failure and unknown responses
     * are left to {@code AutoFindPrimaryFailureResponseHandler} (not visible here).
     *
     * @param shardReplicaMsg the remove-shard-replica request
     */
    private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
        LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);

        findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
                shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            @Override
            public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            // Runs the removal on the actor via a RunnableMessage told with getTargetActor() as sender.
            // NOTE(review): getSender() inside the runnable is evaluated when it runs, which should then be the
            // target actor (assuming getSender() resolves to the actor's method rather than a handler member).
            private void doRemoveShardReplicaAsync(final String primaryPath) {
                getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(),
                        primaryPath, getSender()), getTargetActor());
            }
        });
    }
1597
    /**
     * Handles a request to remove a replica of a prefix-based shard.
     *
     * <p>The shard name is derived from the message's shard prefix. The shard's primary is then located, and
     * whether it is remote or local, {@code removePrefixShardReplica} is scheduled on this actor as a
     * {@link RunnableMessage} with the primary's path. Failure and unknown responses are left to
     * {@code AutoFindPrimaryFailureResponseHandler} (not visible here).
     *
     * @param message the remove-prefix-shard-replica request
     */
    private void onRemovePrefixShardReplica(final RemovePrefixShardReplica message) {
        LOG.debug("{}: onRemovePrefixShardReplica: {}", persistenceId(), message);

        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
                ClusterUtils.getCleanShardName(message.getShardPrefix()));
        final String shardName = shardId.getShardName();

        findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(),
                shardName, persistenceId(), getSelf()) {
            @Override
            public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            @Override
            public void onLocalPrimaryFound(final LocalPrimaryShardFound response) {
                doRemoveShardReplicaAsync(response.getPrimaryPath());
            }

            // Runs the removal on the actor via a RunnableMessage told with getTargetActor() as sender.
            // NOTE(review): getSender() inside the runnable is evaluated when it runs, which should then be the
            // target actor (assuming getSender() resolves to the actor's method rather than a handler member).
            private void doRemoveShardReplicaAsync(final String primaryPath) {
                getSelf().tell((RunnableMessage) () -> removePrefixShardReplica(message, getShardName(),
                        primaryPath, getSender()), getTargetActor());
            }
        });
    }
1623
1624     private void persistShardList() {
1625         List<String> shardList = new ArrayList<>(localShards.keySet());
1626         for (ShardInformation shardInfo : localShards.values()) {
1627             if (!shardInfo.isActiveMember()) {
1628                 shardList.remove(shardInfo.getShardName());
1629             }
1630         }
1631         LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
1632         saveSnapshot(updateShardManagerSnapshot(shardList, configuration.getAllPrefixShardConfigurations()));
1633     }
1634
1635     private ShardManagerSnapshot updateShardManagerSnapshot(
1636             final List<String> shardList,
1637             final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> allPrefixShardConfigurations) {
1638         currentSnapshot = new ShardManagerSnapshot(shardList, allPrefixShardConfigurations);
1639         return currentSnapshot;
1640     }
1641
1642     private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) {
1643         currentSnapshot = snapshot;
1644
1645         LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1646
1647         final MemberName currentMember = cluster.getCurrentMemberName();
1648         Set<String> configuredShardList =
1649             new HashSet<>(configuration.getMemberShardNames(currentMember));
1650         for (String shard : currentSnapshot.getShardList()) {
1651             if (!configuredShardList.contains(shard)) {
1652                 // add the current member as a replica for the shard
1653                 LOG.debug("{}: adding shard {}", persistenceId(), shard);
1654                 configuration.addMemberReplicaForShard(shard, currentMember);
1655             } else {
1656                 configuredShardList.remove(shard);
1657             }
1658         }
1659         for (String shard : configuredShardList) {
1660             // remove the member as a replica for the shard
1661             LOG.debug("{}: removing shard {}", persistenceId(), shard);
1662             configuration.removeMemberReplicaForShard(shard, currentMember);
1663         }
1664     }
1665
1666     private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) {
1667         LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1668             persistenceId());
1669         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1670             0, 0));
1671     }
1672
1673     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1674         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1675
1676         String shardName = changeMembersVotingStatus.getShardName();
1677         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1678         for (Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1679             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1680                     e.getValue());
1681         }
1682
1683         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1684
1685         findLocalShard(shardName, getSender(),
1686             localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1687             localShardFound.getPath(), getSender()));
1688     }
1689
1690     private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
1691         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1692
1693         ActorRef sender = getSender();
1694         final String shardName = flipMembersVotingStatus.getShardName();
1695         findLocalShard(shardName, sender, localShardFound -> {
1696             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1697                     Timeout.apply(30, TimeUnit.SECONDS));
1698
1699             future.onComplete(new OnComplete<Object>() {
1700                 @Override
1701                 public void onComplete(final Throwable failure, final Object response) {
1702                     if (failure != null) {
1703                         sender.tell(new Status.Failure(new RuntimeException(
1704                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1705                         return;
1706                     }
1707
1708                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1709                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1710                     for (Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1711                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1712                     }
1713
1714                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName)
1715                             .toString(), !raftState.isVoting());
1716
1717                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1718                             shardName, localShardFound.getPath(), sender);
1719                 }
1720             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1721         });
1722
1723     }
1724
1725     private void findLocalShard(final FindLocalShard message) {
1726         LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
1727
1728         final ShardInformation shardInformation = localShards.get(message.getShardName());
1729
1730         if (shardInformation == null) {
1731             LOG.debug("{}: Local shard {} not found - shards present: {}",
1732                     persistenceId(), message.getShardName(), localShards.keySet());
1733
1734             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
1735             return;
1736         }
1737
1738         sendResponse(shardInformation, message.isWaitUntilInitialized(), false,
1739             () -> new LocalShardFound(shardInformation.getActor()));
1740     }
1741
1742     private void findLocalShard(final String shardName, final ActorRef sender,
1743             final Consumer<LocalShardFound> onLocalShardFound) {
1744         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext()
1745                 .getShardInitializationTimeout().duration().$times(2));
1746
1747         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1748         futureObj.onComplete(new OnComplete<Object>() {
1749             @Override
1750             public void onComplete(final Throwable failure, final Object response) {
1751                 if (failure != null) {
1752                     LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
1753                             failure);
1754                     sender.tell(new Status.Failure(new RuntimeException(
1755                             String.format("Failed to find local shard %s", shardName), failure)), self());
1756                 } else {
1757                     if (response instanceof LocalShardFound) {
1758                         getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
1759                                 sender);
1760                     } else if (response instanceof LocalShardNotFound) {
1761                         String msg = String.format("Local shard %s does not exist", shardName);
1762                         LOG.debug("{}: {}", persistenceId, msg);
1763                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
1764                     } else {
1765                         String msg = String.format("Failed to find local shard %s: received response: %s",
1766                                 shardName, response);
1767                         LOG.debug("{}: {}", persistenceId, msg);
1768                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1769                                 new RuntimeException(msg)), self());
1770                     }
1771                 }
1772             }
1773         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1774     }
1775
1776     private void changeShardMembersVotingStatus(final ChangeServersVotingStatus changeServersVotingStatus,
1777             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1778         if (isShardReplicaOperationInProgress(shardName, sender)) {
1779             return;
1780         }
1781
1782         shardReplicaOperationsInProgress.add(shardName);
1783
1784         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1785         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1786
1787         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1788                 changeServersVotingStatus, shardActorRef.path());
1789
1790         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1791         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1792
1793         futureObj.onComplete(new OnComplete<Object>() {
1794             @Override
1795             public void onComplete(final Throwable failure, final Object response) {
1796                 shardReplicaOperationsInProgress.remove(shardName);
1797                 if (failure != null) {
1798                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1799                             shardActorRef.path());
1800                     LOG.debug("{}: {}", persistenceId(), msg, failure);
1801                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1802                 } else {
1803                     LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1804
1805                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1806                     if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1807                         LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1808                         sender.tell(new Status.Success(null), getSelf());
1809                     } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1810                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1811                                 "The requested voting state change for shard %s is invalid. At least one member "
1812                                 + "must be voting", shardId.getShardName()))), getSelf());
1813                     } else {
1814                         LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1815                                 persistenceId(), shardName, replyMsg.getStatus());
1816
1817                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1818                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1819                         sender.tell(new Status.Failure(error), getSelf());
1820                     }
1821                 }
1822             }
1823         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1824     }
1825
1826     private static final class ForwardedAddServerReply {
1827         ShardInformation shardInfo;
1828         AddServerReply addServerReply;
1829         String leaderPath;
1830         boolean removeShardOnFailure;
1831
1832         ForwardedAddServerReply(final ShardInformation shardInfo, final AddServerReply addServerReply,
1833             final String leaderPath, final boolean removeShardOnFailure) {
1834             this.shardInfo = shardInfo;
1835             this.addServerReply = addServerReply;
1836             this.leaderPath = leaderPath;
1837             this.removeShardOnFailure = removeShardOnFailure;
1838         }
1839     }
1840
1841     private static final class ForwardedAddServerFailure {
1842         String shardName;
1843         String failureMessage;
1844         Throwable failure;
1845         boolean removeShardOnFailure;
1846
1847         ForwardedAddServerFailure(final String shardName, final String failureMessage, final Throwable failure,
1848                 final boolean removeShardOnFailure) {
1849             this.shardName = shardName;
1850             this.failureMessage = failureMessage;
1851             this.failure = failure;
1852             this.removeShardOnFailure = removeShardOnFailure;
1853         }
1854     }
1855
1856     static class OnShardInitialized {
1857         private final Runnable replyRunnable;
1858         private Cancellable timeoutSchedule;
1859
1860         OnShardInitialized(final Runnable replyRunnable) {
1861             this.replyRunnable = replyRunnable;
1862         }
1863
1864         Runnable getReplyRunnable() {
1865             return replyRunnable;
1866         }
1867
1868         Cancellable getTimeoutSchedule() {
1869             return timeoutSchedule;
1870         }
1871
1872         void setTimeoutSchedule(final Cancellable timeoutSchedule) {
1873             this.timeoutSchedule = timeoutSchedule;
1874         }
1875     }
1876
1877     static class OnShardReady extends OnShardInitialized {
1878         OnShardReady(final Runnable replyRunnable) {
1879             super(replyRunnable);
1880         }
1881     }
1882
1883     private interface RunnableMessage extends Runnable {
1884     }
1885
1886     /**
1887      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1888      * a remote or local find primary message is processed.
1889      */
1890     private interface FindPrimaryResponseHandler {
1891         /**
1892          * Invoked when a Failure message is received as a response.
1893          *
1894          * @param failure the failure exception
1895          */
1896         void onFailure(Throwable failure);
1897
1898         /**
1899          * Invoked when a RemotePrimaryShardFound response is received.
1900          *
1901          * @param response the response
1902          */
1903         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1904
1905         /**
1906          * Invoked when a LocalPrimaryShardFound response is received.
1907          *
1908          * @param response the response
1909          */
1910         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1911
1912         /**
1913          * Invoked when an unknown response is received. This is another type of failure.
1914          *
1915          * @param response the response
1916          */
1917         void onUnknownResponse(Object response);
1918     }
1919
1920     /**
1921      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1922      * replica and sends a wrapped Failure response to some targetActor.
1923      */
1924     private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1925         private final ActorRef targetActor;
1926         private final String shardName;
1927         private final String persistenceId;
1928         private final ActorRef shardManagerActor;
1929
1930         /**
1931          * Constructs an instance.
1932          *
1933          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1934          * @param shardName The name of the shard for which the primary replica had to be found
1935          * @param persistenceId The persistenceId for the ShardManager
1936          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1937          */
1938         protected AutoFindPrimaryFailureResponseHandler(final ActorRef targetActor, final String shardName,
1939                 final String persistenceId, final ActorRef shardManagerActor) {
1940             this.targetActor = Preconditions.checkNotNull(targetActor);
1941             this.shardName = Preconditions.checkNotNull(shardName);
1942             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1943             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1944         }
1945
1946         public ActorRef getTargetActor() {
1947             return targetActor;
1948         }
1949
1950         public String getShardName() {
1951             return shardName;
1952         }
1953
1954         @Override
1955         public void onFailure(final Throwable failure) {
1956             LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1957             targetActor.tell(new Status.Failure(new RuntimeException(
1958                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1959         }
1960
1961         @Override
1962         public void onUnknownResponse(final Object response) {
1963             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1964                     shardName, response);
1965             LOG.debug("{}: {}", persistenceId, msg);
1966             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1967                     new RuntimeException(msg)), shardManagerActor);
1968         }
1969     }
1970
1971     /**
1972      * The WrappedShardResponse class wraps a response from a Shard.
1973      */
1974     private static final class WrappedShardResponse {
1975         private final ShardIdentifier shardId;
1976         private final Object response;
1977         private final String leaderPath;
1978
1979         WrappedShardResponse(final ShardIdentifier shardId, final Object response, final String leaderPath) {
1980             this.shardId = shardId;
1981             this.response = response;
1982             this.leaderPath = leaderPath;
1983         }
1984
1985         ShardIdentifier getShardId() {
1986             return shardId;
1987         }
1988
1989         Object getResponse() {
1990             return response;
1991         }
1992
1993         String getLeaderPath() {
1994             return leaderPath;
1995         }
1996     }
1997
1998     private static final class ShardNotInitializedTimeout {
1999         private final ActorRef sender;
2000         private final ShardInformation shardInfo;
2001         private final OnShardInitialized onShardInitialized;
2002
2003         ShardNotInitializedTimeout(final ShardInformation shardInfo, final OnShardInitialized onShardInitialized,
2004             final ActorRef sender) {
2005             this.sender = sender;
2006             this.shardInfo = shardInfo;
2007             this.onShardInitialized = onShardInitialized;
2008         }
2009
2010         ActorRef getSender() {
2011             return sender;
2012         }
2013
2014         ShardInformation getShardInfo() {
2015             return shardInfo;
2016         }
2017
2018         OnShardInitialized getOnShardInitialized() {
2019             return onShardInitialized;
2020         }
2021     }
2022 }
2023
2024
2025