BUG 2817 : Handle ServerRemoved message in Shard/ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Function;
23 import akka.persistence.RecoveryCompleted;
24 import akka.persistence.SaveSnapshotFailure;
25 import akka.persistence.SaveSnapshotSuccess;
26 import akka.persistence.SnapshotOffer;
27 import akka.serialization.Serialization;
28 import akka.util.Timeout;
29 import com.google.common.annotations.VisibleForTesting;
30 import com.google.common.base.Objects;
31 import com.google.common.base.Optional;
32 import com.google.common.base.Preconditions;
33 import com.google.common.base.Strings;
34 import com.google.common.base.Supplier;
35 import com.google.common.collect.Sets;
36 import java.io.Serializable;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.Collections;
40 import java.util.HashMap;
41 import java.util.HashSet;
42 import java.util.Iterator;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Set;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import javax.annotation.Nullable;
50 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
51 import org.opendaylight.controller.cluster.datastore.config.Configuration;
52 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
53 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
57 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
58 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
59 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
60 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
61 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
62 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
63 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
64 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
65 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
66 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
67 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
68 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
69 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
70 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
71 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
72 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
73 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
74 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
75 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
76 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
77 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
78 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
79 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
80 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
81 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
82 import org.opendaylight.controller.cluster.raft.RaftState;
83 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
84 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
85 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
86 import org.opendaylight.controller.cluster.raft.messages.AddServer;
87 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
88 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
89 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
90 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
91 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
92 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
93 import org.slf4j.Logger;
94 import org.slf4j.LoggerFactory;
95 import scala.concurrent.Future;
96 import scala.concurrent.duration.Duration;
97 import scala.concurrent.duration.FiniteDuration;
98
99 /**
100  * The ShardManager has the following jobs,
101  * <ul>
102  * <li> Create all the local shard replicas that belong on this cluster member
103  * <li> Find the address of the local shard
104  * <li> Find the primary replica for any given shard
105  * <li> Monitor the cluster members and store their addresses
106  * </ul>
107  */
108 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
109
110     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
111
112     // Stores a mapping between a shard name and it's corresponding information
113     // Shard names look like inventory, topology etc and are as specified in
114     // configuration
115     private final Map<String, ShardInformation> localShards = new HashMap<>();
116
117     // The type of a ShardManager reflects the type of the datastore itself
118     // A data store could be of type config/operational
119     private final String type;
120
121     private final ClusterWrapper cluster;
122
123     private final Configuration configuration;
124
125     private final String shardDispatcherPath;
126
127     private final ShardManagerInfo mBean;
128
129     private DatastoreContextFactory datastoreContextFactory;
130
131     private final CountDownLatch waitTillReadyCountdownLatch;
132
133     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
134
135     private final ShardPeerAddressResolver peerAddressResolver;
136
137     private SchemaContext schemaContext;
138
139     private DatastoreSnapshot restoreFromSnapshot;
140
141     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
142
143     private final String persistenceId;
144
145     /**
146      */
147     protected ShardManager(Builder builder) {
148
149         this.cluster = builder.cluster;
150         this.configuration = builder.configuration;
151         this.datastoreContextFactory = builder.datastoreContextFactory;
152         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
153         this.shardDispatcherPath =
154                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
155         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
156         this.primaryShardInfoCache = builder.primaryShardInfoCache;
157         this.restoreFromSnapshot = builder.restoreFromSnapshot;
158
159         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
160         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
161
162         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
163
164         // Subscribe this actor to cluster member events
165         cluster.subscribeToMemberEvents(getSelf());
166
167         List<String> localShardActorNames = new ArrayList<>();
168         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
169                 "shard-manager-" + this.type,
170                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
171                 localShardActorNames);
172         mBean.setShardManager(this);
173     }
174
175     @Override
176     public void postStop() {
177         LOG.info("Stopping ShardManager");
178
179         mBean.unregisterMBean();
180     }
181
182     @Override
183     public void handleCommand(Object message) throws Exception {
184         if (message  instanceof FindPrimary) {
185             findPrimary((FindPrimary)message);
186         } else if(message instanceof FindLocalShard){
187             findLocalShard((FindLocalShard) message);
188         } else if (message instanceof UpdateSchemaContext) {
189             updateSchemaContext(message);
190         } else if(message instanceof ActorInitialized) {
191             onActorInitialized(message);
192         } else if (message instanceof ClusterEvent.MemberUp){
193             memberUp((ClusterEvent.MemberUp) message);
194         } else if (message instanceof ClusterEvent.MemberExited){
195             memberExited((ClusterEvent.MemberExited) message);
196         } else if(message instanceof ClusterEvent.MemberRemoved) {
197             memberRemoved((ClusterEvent.MemberRemoved) message);
198         } else if(message instanceof ClusterEvent.UnreachableMember) {
199             memberUnreachable((ClusterEvent.UnreachableMember)message);
200         } else if(message instanceof ClusterEvent.ReachableMember) {
201             memberReachable((ClusterEvent.ReachableMember) message);
202         } else if(message instanceof DatastoreContextFactory) {
203             onDatastoreContextFactory((DatastoreContextFactory)message);
204         } else if(message instanceof RoleChangeNotification) {
205             onRoleChangeNotification((RoleChangeNotification) message);
206         } else if(message instanceof FollowerInitialSyncUpStatus){
207             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
208         } else if(message instanceof ShardNotInitializedTimeout) {
209             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
210         } else if(message instanceof ShardLeaderStateChanged) {
211             onLeaderStateChanged((ShardLeaderStateChanged) message);
212         } else if(message instanceof SwitchShardBehavior){
213             onSwitchShardBehavior((SwitchShardBehavior) message);
214         } else if(message instanceof CreateShard) {
215             onCreateShard((CreateShard)message);
216         } else if(message instanceof AddShardReplica){
217             onAddShardReplica((AddShardReplica)message);
218         } else if(message instanceof ForwardedAddServerReply) {
219             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
220             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
221                     msg.removeShardOnFailure);
222         } else if(message instanceof ForwardedAddServerFailure) {
223             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
224             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
225         } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
226             ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
227             addShard(msg.shardName, msg.primaryFound, getSender());
228         } else if(message instanceof RemoveShardReplica){
229             onRemoveShardReplica((RemoveShardReplica)message);
230         } else if(message instanceof GetSnapshot) {
231             onGetSnapshot();
232         } else if(message instanceof ServerRemoved){
233             onShardReplicaRemoved((ServerRemoved) message);
234         } else if (message instanceof SaveSnapshotSuccess) {
235             LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
236         } else if (message instanceof SaveSnapshotFailure) {
237             LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
238                 persistenceId(), ((SaveSnapshotFailure)message).cause());
239         } else {
240             unknownMessage(message);
241         }
242     }
243
244     private void onShardReplicaRemoved(ServerRemoved message) {
245         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
246         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
247         if(shardInformation == null) {
248             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
249             return;
250         } else if(shardInformation.getActor() != null) {
251             LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
252             shardInformation.getActor().tell(PoisonPill.getInstance(), self());
253         }
254         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
255         persistShardList();
256     }
257
258     private void onGetSnapshot() {
259         LOG.debug("{}: onGetSnapshot", persistenceId());
260
261         List<String> notInitialized = null;
262         for(ShardInformation shardInfo: localShards.values()) {
263             if(!shardInfo.isShardInitialized()) {
264                 if(notInitialized == null) {
265                     notInitialized = new ArrayList<>();
266                 }
267
268                 notInitialized.add(shardInfo.getShardName());
269             }
270         }
271
272         if(notInitialized != null) {
273             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
274                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
275             return;
276         }
277
278         byte[] shardManagerSnapshot = null;
279         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
280                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
281                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
282
283         for(ShardInformation shardInfo: localShards.values()) {
284             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
285         }
286     }
287
288     private void onCreateShard(CreateShard createShard) {
289         Object reply;
290         try {
291             String shardName = createShard.getModuleShardConfig().getShardName();
292             if(localShards.containsKey(shardName)) {
293                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
294             } else {
295                 doCreateShard(createShard);
296                 reply = new akka.actor.Status.Success(null);
297             }
298         } catch (Exception e) {
299             LOG.error("onCreateShard failed", e);
300             reply = new akka.actor.Status.Failure(e);
301         }
302
303         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
304             getSender().tell(reply, getSelf());
305         }
306     }
307
308     private void doCreateShard(CreateShard createShard) {
309         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
310         String shardName = moduleShardConfig.getShardName();
311
312         configuration.addModuleShardConfiguration(moduleShardConfig);
313
314         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
315         if(shardDatastoreContext == null) {
316             shardDatastoreContext = newShardDatastoreContext(shardName);
317         } else {
318             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
319                     peerAddressResolver).build();
320         }
321
322         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
323
324         Map<String, String> peerAddresses;
325         boolean isActiveMember;
326         if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
327             peerAddresses = getPeerAddresses(shardName);
328             isActiveMember = true;
329         } else {
330             // The local member is not in the given shard member configuration. In this case we'll create
331             // the shard with no peers and with elections disabled so it stays as follower. A
332             // subsequent AddServer request will be needed to make it an active member.
333             isActiveMember = false;
334             peerAddresses = Collections.emptyMap();
335             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
336                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
337         }
338
339         LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
340                 moduleShardConfig.getShardMemberNames(), peerAddresses);
341
342         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
343                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
344         info.setActiveMember(isActiveMember);
345         localShards.put(info.getShardName(), info);
346
347         mBean.addLocalShard(shardId.toString());
348
349         if(schemaContext != null) {
350             info.setActor(newShardActor(schemaContext, info));
351         }
352     }
353
354     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
355         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
356                 shardPeerAddressResolver(peerAddressResolver);
357     }
358
359     private DatastoreContext newShardDatastoreContext(String shardName) {
360         return newShardDatastoreContextBuilder(shardName).build();
361     }
362
363     private void checkReady(){
364         if (isReadyWithLeaderId()) {
365             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
366                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
367
368             waitTillReadyCountdownLatch.countDown();
369         }
370     }
371
372     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
373         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
374
375         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
376         if(shardInformation != null) {
377             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
378             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
379             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
380                 primaryShardInfoCache.remove(shardInformation.getShardName());
381             }
382
383             checkReady();
384         } else {
385             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
386         }
387     }
388
389     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
390         ShardInformation shardInfo = message.getShardInfo();
391
392         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
393                 shardInfo.getShardName());
394
395         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
396
397         if(!shardInfo.isShardInitialized()) {
398             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
399             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
400         } else {
401             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
402             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
403         }
404     }
405
406     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
407         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
408                 status.getName(), status.isInitialSyncDone());
409
410         ShardInformation shardInformation = findShardInformation(status.getName());
411
412         if(shardInformation != null) {
413             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
414
415             mBean.setSyncStatus(isInSync());
416         }
417
418     }
419
420     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
421         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
422                 roleChanged.getOldRole(), roleChanged.getNewRole());
423
424         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
425         if(shardInformation != null) {
426             shardInformation.setRole(roleChanged.getNewRole());
427             checkReady();
428             mBean.setSyncStatus(isInSync());
429         }
430     }
431
432
433     private ShardInformation findShardInformation(String memberId) {
434         for(ShardInformation info : localShards.values()){
435             if(info.getShardId().toString().equals(memberId)){
436                 return info;
437             }
438         }
439
440         return null;
441     }
442
443     private boolean isReadyWithLeaderId() {
444         boolean isReady = true;
445         for (ShardInformation info : localShards.values()) {
446             if(!info.isShardReadyWithLeaderId()){
447                 isReady = false;
448                 break;
449             }
450         }
451         return isReady;
452     }
453
454     private boolean isInSync(){
455         for (ShardInformation info : localShards.values()) {
456             if(!info.isInSync()){
457                 return false;
458             }
459         }
460         return true;
461     }
462
463     private void onActorInitialized(Object message) {
464         final ActorRef sender = getSender();
465
466         if (sender == null) {
467             return; //why is a non-actor sending this message? Just ignore.
468         }
469
470         String actorName = sender.path().name();
471         //find shard name from actor name; actor name is stringified shardId
472         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
473
474         if (shardId.getShardName() == null) {
475             return;
476         }
477
478         markShardAsInitialized(shardId.getShardName());
479     }
480
481     private void markShardAsInitialized(String shardName) {
482         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
483
484         ShardInformation shardInformation = localShards.get(shardName);
485         if (shardInformation != null) {
486             shardInformation.setActorInitialized();
487
488             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
489         }
490     }
491
492     @Override
493     protected void handleRecover(Object message) throws Exception {
494         if (message instanceof RecoveryCompleted) {
495             LOG.info("Recovery complete : {}", persistenceId());
496
497             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
498             // journal on upgrade from Helium.
499             deleteMessages(lastSequenceNr());
500             createLocalShards();
501         } else if (message instanceof SnapshotOffer) {
502             handleShardRecovery((SnapshotOffer) message);
503         }
504     }
505
506     private void findLocalShard(FindLocalShard message) {
507         final ShardInformation shardInformation = localShards.get(message.getShardName());
508
509         if(shardInformation == null){
510             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
511             return;
512         }
513
514         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
515             @Override
516             public Object get() {
517                 return new LocalShardFound(shardInformation.getActor());
518             }
519         });
520     }
521
522     private void sendResponse(ShardInformation shardInformation, boolean doWait,
523             boolean wantShardReady, final Supplier<Object> messageSupplier) {
524         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
525             if(doWait) {
526                 final ActorRef sender = getSender();
527                 final ActorRef self = self();
528
529                 Runnable replyRunnable = new Runnable() {
530                     @Override
531                     public void run() {
532                         sender.tell(messageSupplier.get(), self);
533                     }
534                 };
535
536                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
537                     new OnShardInitialized(replyRunnable);
538
539                 shardInformation.addOnShardInitialized(onShardInitialized);
540
541                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
542                 if(shardInformation.isShardInitialized()) {
543                     // If the shard is already initialized then we'll wait enough time for the shard to
544                     // elect a leader, ie 2 times the election timeout.
545                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
546                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
547                 }
548
549                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
550                         shardInformation.getShardName());
551
552                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
553                         timeout, getSelf(),
554                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
555                         getContext().dispatcher(), getSelf());
556
557                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
558
559             } else if (!shardInformation.isShardInitialized()) {
560                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
561                         shardInformation.getShardName());
562                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
563             } else {
564                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
565                         shardInformation.getShardName());
566                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
567             }
568
569             return;
570         }
571
572         getSender().tell(messageSupplier.get(), getSelf());
573     }
574
575     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
576         return new NoShardLeaderException(null, shardId.toString());
577     }
578
579     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
580         return new NotInitializedException(String.format(
581                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
582     }
583
584     private void memberRemoved(ClusterEvent.MemberRemoved message) {
585         String memberName = message.member().roles().head();
586
587         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
588                 message.member().address());
589
590         peerAddressResolver.removePeerAddress(memberName);
591
592         for(ShardInformation info : localShards.values()){
593             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
594         }
595     }
596
597     private void memberExited(ClusterEvent.MemberExited message) {
598         String memberName = message.member().roles().head();
599
600         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
601                 message.member().address());
602
603         peerAddressResolver.removePeerAddress(memberName);
604
605         for(ShardInformation info : localShards.values()){
606             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
607         }
608     }
609
610     private void memberUp(ClusterEvent.MemberUp message) {
611         String memberName = message.member().roles().head();
612
613         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
614                 message.member().address());
615
616         addPeerAddress(memberName, message.member().address());
617
618         checkReady();
619     }
620
621     private void addPeerAddress(String memberName, Address address) {
622         peerAddressResolver.addPeerAddress(memberName, address);
623
624         for(ShardInformation info : localShards.values()){
625             String shardName = info.getShardName();
626             String peerId = getShardIdentifier(memberName, shardName).toString();
627             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
628
629             info.peerUp(memberName, peerId, getSelf());
630         }
631     }
632
633     private void memberReachable(ClusterEvent.ReachableMember message) {
634         String memberName = message.member().roles().head();
635         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
636
637         addPeerAddress(memberName, message.member().address());
638
639         markMemberAvailable(memberName);
640     }
641
642     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
643         String memberName = message.member().roles().head();
644         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
645
646         markMemberUnavailable(memberName);
647     }
648
649     private void markMemberUnavailable(final String memberName) {
650         for(ShardInformation info : localShards.values()){
651             String leaderId = info.getLeaderId();
652             if(leaderId != null && leaderId.contains(memberName)) {
653                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
654                 info.setLeaderAvailable(false);
655
656                 primaryShardInfoCache.remove(info.getShardName());
657             }
658
659             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
660         }
661     }
662
663     private void markMemberAvailable(final String memberName) {
664         for(ShardInformation info : localShards.values()){
665             String leaderId = info.getLeaderId();
666             if(leaderId != null && leaderId.contains(memberName)) {
667                 LOG.debug("Marking Leader {} as available.", leaderId);
668                 info.setLeaderAvailable(true);
669             }
670
671             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
672         }
673     }
674
675     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
676         datastoreContextFactory = factory;
677         for (ShardInformation info : localShards.values()) {
678             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
679         }
680     }
681
682     private void onSwitchShardBehavior(SwitchShardBehavior message) {
683         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
684
685         ShardInformation shardInformation = localShards.get(identifier.getShardName());
686
687         if(shardInformation != null && shardInformation.getActor() != null) {
688             shardInformation.getActor().tell(
689                     new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
690         } else {
691             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
692                     message.getShardName(), message.getNewState());
693         }
694     }
695
696     /**
697      * Notifies all the local shards of a change in the schema context
698      *
699      * @param message
700      */
701     private void updateSchemaContext(final Object message) {
702         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
703
704         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
705
706         for (ShardInformation info : localShards.values()) {
707             if (info.getActor() == null) {
708                 LOG.debug("Creating Shard {}", info.getShardId());
709                 info.setActor(newShardActor(schemaContext, info));
710             } else {
711                 info.getActor().tell(message, getSelf());
712             }
713         }
714     }
715
716     @VisibleForTesting
717     protected ClusterWrapper getCluster() {
718         return cluster;
719     }
720
721     @VisibleForTesting
722     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
723         return getContext().actorOf(info.newProps(schemaContext)
724                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
725     }
726
727     private void findPrimary(FindPrimary message) {
728         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
729
730         final String shardName = message.getShardName();
731         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
732
733         // First see if the there is a local replica for the shard
734         final ShardInformation info = localShards.get(shardName);
735         if (info != null && info.isActiveMember()) {
736             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
737                 @Override
738                 public Object get() {
739                     String primaryPath = info.getSerializedLeaderActor();
740                     Object found = canReturnLocalShardState && info.isLeader() ?
741                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
742                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
743
744                             if(LOG.isDebugEnabled()) {
745                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
746                             }
747
748                             return found;
749                 }
750             });
751
752             return;
753         }
754
755         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
756             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
757                     shardName, address);
758
759             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
760                     message.isWaitUntilReady()), getContext());
761             return;
762         }
763
764         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
765
766         getSender().tell(new PrimaryNotFoundException(
767                 String.format("No primary shard found for %s.", shardName)), getSelf());
768     }
769
770     /**
771      * Construct the name of the shard actor given the name of the member on
772      * which the shard resides and the name of the shard
773      *
774      * @param memberName
775      * @param shardName
776      * @return
777      */
778     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
779         return peerAddressResolver.getShardIdentifier(memberName, shardName);
780     }
781
782     /**
783      * Create shards that are local to the member on which the ShardManager
784      * runs
785      *
786      */
787     private void createLocalShards() {
788         String memberName = this.cluster.getCurrentMemberName();
789         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
790
791         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
792         if(restoreFromSnapshot != null)
793         {
794             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
795                 shardSnapshots.put(snapshot.getName(), snapshot);
796             }
797         }
798
799         restoreFromSnapshot = null; // null out to GC
800
801         for(String shardName : memberShardNames){
802             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
803             Map<String, String> peerAddresses = getPeerAddresses(shardName);
804             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
805                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
806                         shardSnapshots.get(shardName)), peerAddressResolver));
807             mBean.addLocalShard(shardId.toString());
808         }
809     }
810
811     /**
812      * Given the name of the shard find the addresses of all it's peers
813      *
814      * @param shardName
815      */
816     private Map<String, String> getPeerAddresses(String shardName) {
817         Collection<String> members = configuration.getMembersFromShardName(shardName);
818         Map<String, String> peerAddresses = new HashMap<>();
819
820         String currentMemberName = this.cluster.getCurrentMemberName();
821
822         for(String memberName : members) {
823             if(!currentMemberName.equals(memberName)) {
824                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
825                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
826                 peerAddresses.put(shardId.toString(), address);
827             }
828         }
829         return peerAddresses;
830     }
831
832     @Override
833     public SupervisorStrategy supervisorStrategy() {
834
835         return new OneForOneStrategy(10, Duration.create("1 minute"),
836                 new Function<Throwable, SupervisorStrategy.Directive>() {
837             @Override
838             public SupervisorStrategy.Directive apply(Throwable t) {
839                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
840                 return SupervisorStrategy.resume();
841             }
842         }
843                 );
844
845     }
846
847     @Override
848     public String persistenceId() {
849         return persistenceId;
850     }
851
852     @VisibleForTesting
853     ShardManagerInfoMBean getMBean(){
854         return mBean;
855     }
856
857     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
858         if (shardReplicaOperationsInProgress.contains(shardName)) {
859             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
860             LOG.debug ("{}: {}", persistenceId(), msg);
861             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
862             return true;
863         }
864
865         return false;
866     }
867
868     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
869         final String shardName = shardReplicaMsg.getShardName();
870
871         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
872
873         // verify the shard with the specified name is present in the cluster configuration
874         if (!(this.configuration.isShardConfigured(shardName))) {
875             String msg = String.format("No module configuration exists for shard %s", shardName);
876             LOG.debug ("{}: {}", persistenceId(), msg);
877             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
878             return;
879         }
880
881         // Create the localShard
882         if (schemaContext == null) {
883             String msg = String.format(
884                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
885             LOG.debug ("{}: {}", persistenceId(), msg);
886             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
887             return;
888         }
889
890         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
891                 getShardInitializationTimeout().duration().$times(2));
892
893         final ActorRef sender = getSender();
894         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
895         futureObj.onComplete(new OnComplete<Object>() {
896             @Override
897             public void onComplete(Throwable failure, Object response) {
898                 if (failure != null) {
899                     LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
900                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(
901                         String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
902                 } else {
903                     if(response instanceof RemotePrimaryShardFound) {
904                         self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
905                                 (RemotePrimaryShardFound)response), sender);
906                     } else if(response instanceof LocalPrimaryShardFound) {
907                         sendLocalReplicaAlreadyExistsReply(shardName, sender);
908                     } else {
909                         String msg = String.format("Failed to find leader for shard %s: received response: %s",
910                                 shardName, response);
911                         LOG.debug ("{}: {}", persistenceId(), msg);
912                         sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
913                             new RuntimeException(msg)), getSelf());
914                     }
915                 }
916             }
917         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
918     }
919
920     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
921         String msg = String.format("Local shard %s already exists", shardName);
922         LOG.debug ("{}: {}", persistenceId(), msg);
923         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
924     }
925
926     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
927         if(isShardReplicaOperationInProgress(shardName, sender)) {
928             return;
929         }
930
931         shardReplicaOperationsInProgress.add(shardName);
932
933         final ShardInformation shardInfo;
934         final boolean removeShardOnFailure;
935         ShardInformation existingShardInfo = localShards.get(shardName);
936         if(existingShardInfo == null) {
937             removeShardOnFailure = true;
938             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
939
940             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
941                     DisableElectionsRaftPolicy.class.getName()).build();
942
943             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
944                     Shard.builder(), peerAddressResolver);
945             shardInfo.setActiveMember(false);
946             localShards.put(shardName, shardInfo);
947             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
948         } else {
949             removeShardOnFailure = false;
950             shardInfo = existingShardInfo;
951         }
952
953         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
954
955         //inform ShardLeader to add this shard as a replica by sending an AddServer message
956         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
957                 response.getPrimaryPath(), shardInfo.getShardId());
958
959         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
960                 duration());
961         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
962             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
963
964         futureObj.onComplete(new OnComplete<Object>() {
965             @Override
966             public void onComplete(Throwable failure, Object addServerResponse) {
967                 if (failure != null) {
968                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
969                             response.getPrimaryPath(), shardName, failure);
970
971                     String msg = String.format("AddServer request to leader %s for shard %s failed",
972                             response.getPrimaryPath(), shardName);
973                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
974                 } else {
975                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
976                             response.getPrimaryPath(), removeShardOnFailure), sender);
977                 }
978             }
979         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
980     }
981
982     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
983             boolean removeShardOnFailure) {
984         shardReplicaOperationsInProgress.remove(shardName);
985
986         if(removeShardOnFailure) {
987             ShardInformation shardInfo = localShards.remove(shardName);
988             if (shardInfo.getActor() != null) {
989                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
990             }
991         }
992
993         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
994             new RuntimeException(message, failure)), getSelf());
995     }
996
997     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
998             String leaderPath, boolean removeShardOnFailure) {
999         String shardName = shardInfo.getShardName();
1000         shardReplicaOperationsInProgress.remove(shardName);
1001
1002         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1003
1004         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1005             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1006
1007             // Make the local shard voting capable
1008             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1009             shardInfo.setActiveMember(true);
1010             persistShardList();
1011
1012             mBean.addLocalShard(shardInfo.getShardId().toString());
1013             sender.tell(new akka.actor.Status.Success(null), getSelf());
1014         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1015             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1016         } else {
1017             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1018                     persistenceId(), shardName, replyMsg.getStatus());
1019
1020             Exception failure;
1021             switch (replyMsg.getStatus()) {
1022                 case TIMEOUT:
1023                     failure = new TimeoutException(String.format(
1024                             "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1025                             "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1026                             leaderPath, shardName));
1027                     break;
1028                 case NO_LEADER:
1029                     failure = createNoShardLeaderException(shardInfo.getShardId());
1030                     break;
1031                 default :
1032                     failure = new RuntimeException(String.format(
1033                             "AddServer request to leader %s for shard %s failed with status %s",
1034                             leaderPath, shardName, replyMsg.getStatus()));
1035             }
1036
1037             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1038         }
1039     }
1040
1041     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
1042         String shardName = shardReplicaMsg.getShardName();
1043
1044         // verify the local shard replica is available in the controller node
1045         if (!localShards.containsKey(shardName)) {
1046             String msg = String.format("Local shard %s does not", shardName);
1047             LOG.debug ("{}: {}", persistenceId(), msg);
1048             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1049             return;
1050         }
1051         // call RemoveShard for the shardName
1052         getSender().tell(new akka.actor.Status.Success(true), getSelf());
1053         return;
1054     }
1055
1056     private void persistShardList() {
1057         List<String> shardList = new ArrayList<>(localShards.keySet());
1058         for (ShardInformation shardInfo : localShards.values()) {
1059             if (!shardInfo.isActiveMember()) {
1060                 shardList.remove(shardInfo.getShardName());
1061             }
1062         }
1063         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1064         saveSnapshot(new ShardManagerSnapshot(shardList));
1065     }
1066
1067     private void handleShardRecovery(SnapshotOffer offer) {
1068         LOG.debug ("{}: in handleShardRecovery", persistenceId());
1069         ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
1070         String currentMember = cluster.getCurrentMemberName();
1071         Set<String> configuredShardList =
1072             new HashSet<>(configuration.getMemberShardNames(currentMember));
1073         for (String shard : snapshot.getShardList()) {
1074             if (!configuredShardList.contains(shard)) {
1075                 // add the current member as a replica for the shard
1076                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1077                 configuration.addMemberReplicaForShard(shard, currentMember);
1078             } else {
1079                 configuredShardList.remove(shard);
1080             }
1081         }
1082         for (String shard : configuredShardList) {
1083             // remove the member as a replica for the shard
1084             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1085             configuration.removeMemberReplicaForShard(shard, currentMember);
1086         }
1087     }
1088
1089     private static class ForwardedAddServerPrimaryShardFound {
1090         String shardName;
1091         RemotePrimaryShardFound primaryFound;
1092
1093         ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
1094             this.shardName = shardName;
1095             this.primaryFound = primaryFound;
1096         }
1097     }
1098
1099     private static class ForwardedAddServerReply {
1100         ShardInformation shardInfo;
1101         AddServerReply addServerReply;
1102         String leaderPath;
1103         boolean removeShardOnFailure;
1104
1105         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1106                 boolean removeShardOnFailure) {
1107             this.shardInfo = shardInfo;
1108             this.addServerReply = addServerReply;
1109             this.leaderPath = leaderPath;
1110             this.removeShardOnFailure = removeShardOnFailure;
1111         }
1112     }
1113
1114     private static class ForwardedAddServerFailure {
1115         String shardName;
1116         String failureMessage;
1117         Throwable failure;
1118         boolean removeShardOnFailure;
1119
1120         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1121                 boolean removeShardOnFailure) {
1122             this.shardName = shardName;
1123             this.failureMessage = failureMessage;
1124             this.failure = failure;
1125             this.removeShardOnFailure = removeShardOnFailure;
1126         }
1127     }
1128
1129     @VisibleForTesting
1130     protected static class ShardInformation {
1131         private final ShardIdentifier shardId;
1132         private final String shardName;
1133         private ActorRef actor;
1134         private ActorPath actorPath;
1135         private final Map<String, String> initialPeerAddresses;
1136         private Optional<DataTree> localShardDataTree;
1137         private boolean leaderAvailable = false;
1138
1139         // flag that determines if the actor is ready for business
1140         private boolean actorInitialized = false;
1141
1142         private boolean followerSyncStatus = false;
1143
1144         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1145         private String role ;
1146         private String leaderId;
1147         private short leaderVersion;
1148
1149         private DatastoreContext datastoreContext;
1150         private Shard.AbstractBuilder<?, ?> builder;
1151         private final ShardPeerAddressResolver addressResolver;
1152         private boolean isActiveMember = true;
1153
1154         private ShardInformation(String shardName, ShardIdentifier shardId,
1155                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1156                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1157             this.shardName = shardName;
1158             this.shardId = shardId;
1159             this.initialPeerAddresses = initialPeerAddresses;
1160             this.datastoreContext = datastoreContext;
1161             this.builder = builder;
1162             this.addressResolver = addressResolver;
1163         }
1164
1165         Props newProps(SchemaContext schemaContext) {
1166             Preconditions.checkNotNull(builder);
1167             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1168                     schemaContext(schemaContext).props();
1169             builder = null;
1170             return props;
1171         }
1172
1173         String getShardName() {
1174             return shardName;
1175         }
1176
1177         @Nullable
1178         ActorRef getActor(){
1179             return actor;
1180         }
1181
1182         ActorPath getActorPath() {
1183             return actorPath;
1184         }
1185
1186         void setActor(ActorRef actor) {
1187             this.actor = actor;
1188             this.actorPath = actor.path();
1189         }
1190
1191         ShardIdentifier getShardId() {
1192             return shardId;
1193         }
1194
1195         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1196             this.localShardDataTree = localShardDataTree;
1197         }
1198
1199         Optional<DataTree> getLocalShardDataTree() {
1200             return localShardDataTree;
1201         }
1202
1203         DatastoreContext getDatastoreContext() {
1204             return datastoreContext;
1205         }
1206
1207         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1208             this.datastoreContext = datastoreContext;
1209             if (actor != null) {
1210                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1211                 actor.tell(this.datastoreContext, sender);
1212             }
1213         }
1214
1215         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1216             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1217
1218             if(actor != null) {
1219                 if(LOG.isDebugEnabled()) {
1220                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1221                             peerId, peerAddress, actor.path());
1222                 }
1223
1224                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1225             }
1226
1227             notifyOnShardInitializedCallbacks();
1228         }
1229
1230         void peerDown(String memberName, String peerId, ActorRef sender) {
1231             if(actor != null) {
1232                 actor.tell(new PeerDown(memberName, peerId), sender);
1233             }
1234         }
1235
1236         void peerUp(String memberName, String peerId, ActorRef sender) {
1237             if(actor != null) {
1238                 actor.tell(new PeerUp(memberName, peerId), sender);
1239             }
1240         }
1241
1242         boolean isShardReady() {
1243             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1244         }
1245
1246         boolean isShardReadyWithLeaderId() {
1247             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1248                     (isLeader() || addressResolver.resolve(leaderId) != null);
1249         }
1250
1251         boolean isShardInitialized() {
1252             return getActor() != null && actorInitialized;
1253         }
1254
1255         boolean isLeader() {
1256             return Objects.equal(leaderId, shardId.toString());
1257         }
1258
1259         String getSerializedLeaderActor() {
1260             if(isLeader()) {
1261                 return Serialization.serializedActorPath(getActor());
1262             } else {
1263                 return addressResolver.resolve(leaderId);
1264             }
1265         }
1266
1267         void setActorInitialized() {
1268             LOG.debug("Shard {} is initialized", shardId);
1269
1270             this.actorInitialized = true;
1271
1272             notifyOnShardInitializedCallbacks();
1273         }
1274
1275         private void notifyOnShardInitializedCallbacks() {
1276             if(onShardInitializedSet.isEmpty()) {
1277                 return;
1278             }
1279
1280             boolean ready = isShardReadyWithLeaderId();
1281
1282             if(LOG.isDebugEnabled()) {
1283                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1284                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1285             }
1286
1287             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1288             while(iter.hasNext()) {
1289                 OnShardInitialized onShardInitialized = iter.next();
1290                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1291                     iter.remove();
1292                     onShardInitialized.getTimeoutSchedule().cancel();
1293                     onShardInitialized.getReplyRunnable().run();
1294                 }
1295             }
1296         }
1297
1298         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1299             onShardInitializedSet.add(onShardInitialized);
1300         }
1301
1302         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1303             onShardInitializedSet.remove(onShardInitialized);
1304         }
1305
1306         void setRole(String newRole) {
1307             this.role = newRole;
1308
1309             notifyOnShardInitializedCallbacks();
1310         }
1311
1312         void setFollowerSyncStatus(boolean syncStatus){
1313             this.followerSyncStatus = syncStatus;
1314         }
1315
1316         boolean isInSync(){
1317             if(RaftState.Follower.name().equals(this.role)){
1318                 return followerSyncStatus;
1319             } else if(RaftState.Leader.name().equals(this.role)){
1320                 return true;
1321             }
1322
1323             return false;
1324         }
1325
1326         boolean setLeaderId(String leaderId) {
1327             boolean changed = !Objects.equal(this.leaderId, leaderId);
1328             this.leaderId = leaderId;
1329             if(leaderId != null) {
1330                 this.leaderAvailable = true;
1331             }
1332             notifyOnShardInitializedCallbacks();
1333
1334             return changed;
1335         }
1336
1337         String getLeaderId() {
1338             return leaderId;
1339         }
1340
1341         void setLeaderAvailable(boolean leaderAvailable) {
1342             this.leaderAvailable = leaderAvailable;
1343         }
1344
1345         short getLeaderVersion() {
1346             return leaderVersion;
1347         }
1348
1349         void setLeaderVersion(short leaderVersion) {
1350             this.leaderVersion = leaderVersion;
1351         }
1352
1353         boolean isActiveMember() {
1354             return isActiveMember;
1355         }
1356
1357         void setActiveMember(boolean isActiveMember) {
1358             this.isActiveMember = isActiveMember;
1359         }
1360     }
1361
1362     private static class OnShardInitialized {
1363         private final Runnable replyRunnable;
1364         private Cancellable timeoutSchedule;
1365
1366         OnShardInitialized(Runnable replyRunnable) {
1367             this.replyRunnable = replyRunnable;
1368         }
1369
1370         Runnable getReplyRunnable() {
1371             return replyRunnable;
1372         }
1373
1374         Cancellable getTimeoutSchedule() {
1375             return timeoutSchedule;
1376         }
1377
1378         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1379             this.timeoutSchedule = timeoutSchedule;
1380         }
1381     }
1382
1383     private static class OnShardReady extends OnShardInitialized {
1384         OnShardReady(Runnable replyRunnable) {
1385             super(replyRunnable);
1386         }
1387     }
1388
1389     private static class ShardNotInitializedTimeout {
1390         private final ActorRef sender;
1391         private final ShardInformation shardInfo;
1392         private final OnShardInitialized onShardInitialized;
1393
1394         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1395             this.sender = sender;
1396             this.shardInfo = shardInfo;
1397             this.onShardInitialized = onShardInitialized;
1398         }
1399
1400         ActorRef getSender() {
1401             return sender;
1402         }
1403
1404         ShardInformation getShardInfo() {
1405             return shardInfo;
1406         }
1407
1408         OnShardInitialized getOnShardInitialized() {
1409             return onShardInitialized;
1410         }
1411     }
1412
1413     /**
1414      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1415      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1416      */
1417     @Deprecated
1418     static class SchemaContextModules implements Serializable {
1419         private static final long serialVersionUID = -8884620101025936590L;
1420
1421         private final Set<String> modules;
1422
1423         SchemaContextModules(Set<String> modules){
1424             this.modules = modules;
1425         }
1426
1427         public Set<String> getModules() {
1428             return modules;
1429         }
1430     }
1431
1432     public static Builder builder() {
1433         return new Builder();
1434     }
1435
1436     public static class Builder {
1437         private ClusterWrapper cluster;
1438         private Configuration configuration;
1439         private DatastoreContextFactory datastoreContextFactory;
1440         private CountDownLatch waitTillReadyCountdownLatch;
1441         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1442         private DatastoreSnapshot restoreFromSnapshot;
1443         private volatile boolean sealed;
1444
1445         protected void checkSealed() {
1446             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1447         }
1448
1449         public Builder cluster(ClusterWrapper cluster) {
1450             checkSealed();
1451             this.cluster = cluster;
1452             return this;
1453         }
1454
1455         public Builder configuration(Configuration configuration) {
1456             checkSealed();
1457             this.configuration = configuration;
1458             return this;
1459         }
1460
1461         public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1462             checkSealed();
1463             this.datastoreContextFactory = datastoreContextFactory;
1464             return this;
1465         }
1466
1467         public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1468             checkSealed();
1469             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1470             return this;
1471         }
1472
1473         public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1474             checkSealed();
1475             this.primaryShardInfoCache = primaryShardInfoCache;
1476             return this;
1477         }
1478
1479         public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1480             checkSealed();
1481             this.restoreFromSnapshot = restoreFromSnapshot;
1482             return this;
1483         }
1484
1485         protected void verify() {
1486             sealed = true;
1487             Preconditions.checkNotNull(cluster, "cluster should not be null");
1488             Preconditions.checkNotNull(configuration, "configuration should not be null");
1489             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1490             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1491             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1492         }
1493
1494         public Props props() {
1495             verify();
1496             return Props.create(ShardManager.class, this);
1497         }
1498     }
1499 }
1500
1501
1502