Bug 2187: Implement add-shard-replica RPC
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Function;
23 import akka.persistence.RecoveryCompleted;
24 import akka.persistence.SaveSnapshotFailure;
25 import akka.persistence.SaveSnapshotSuccess;
26 import akka.persistence.SnapshotOffer;
27 import akka.serialization.Serialization;
28 import akka.util.Timeout;
29 import com.google.common.annotations.VisibleForTesting;
30 import com.google.common.base.Objects;
31 import com.google.common.base.Optional;
32 import com.google.common.base.Preconditions;
33 import com.google.common.base.Strings;
34 import com.google.common.base.Supplier;
35 import com.google.common.collect.Sets;
36 import java.io.Serializable;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.Collections;
40 import java.util.HashMap;
41 import java.util.HashSet;
42 import java.util.Iterator;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Set;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
50 import org.opendaylight.controller.cluster.datastore.config.Configuration;
51 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
52 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
56 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
57 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
58 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
59 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
60 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
62 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
63 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
64 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
65 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
66 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
67 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
68 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
69 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
70 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
71 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
72 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
73 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
78 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
79 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
80 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
81 import org.opendaylight.controller.cluster.raft.RaftState;
82 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
83 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
84 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
85 import org.opendaylight.controller.cluster.raft.messages.AddServer;
86 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
87 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
88 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
89 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
90 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
93 import scala.concurrent.Future;
94 import scala.concurrent.duration.Duration;
95 import scala.concurrent.duration.FiniteDuration;
96
97 /**
98  * The ShardManager has the following jobs,
99  * <ul>
100  * <li> Create all the local shard replicas that belong on this cluster member
101  * <li> Find the address of the local shard
102  * <li> Find the primary replica for any given shard
103  * <li> Monitor the cluster members and store their addresses
104  * </ul>
105  */
106 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
107
108     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
109
110     // Stores a mapping between a shard name and its corresponding information.
111     // Shard names look like inventory, topology etc. and are as specified in
112     // configuration
113     private final Map<String, ShardInformation> localShards = new HashMap<>();
114
115     // The type of a ShardManager reflects the type of the datastore itself
116     // A data store could be of type config/operational
117     private final String type;
118
119     private final ClusterWrapper cluster;
120
121     private final Configuration configuration;
122
123     private final String shardDispatcherPath;
124
125     private final ShardManagerInfo mBean;
126
127     private DatastoreContextFactory datastoreContextFactory;
128
129     private final CountDownLatch waitTillReadyCountdownLatch;
130
131     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
132
133     private final ShardPeerAddressResolver peerAddressResolver;
134
135     private SchemaContext schemaContext;
136
137     private DatastoreSnapshot restoreFromSnapshot;
138
139     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
140
141     private final String persistenceId;
142
143     /**
144      */
145     protected ShardManager(Builder builder) {
146
147         this.cluster = builder.cluster;
148         this.configuration = builder.configuration;
149         this.datastoreContextFactory = builder.datastoreContextFactory;
150         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
151         this.shardDispatcherPath =
152                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
153         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
154         this.primaryShardInfoCache = builder.primaryShardInfoCache;
155         this.restoreFromSnapshot = builder.restoreFromSnapshot;
156
157         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
158         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
159
160         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
161
162         // Subscribe this actor to cluster member events
163         cluster.subscribeToMemberEvents(getSelf());
164
165         List<String> localShardActorNames = new ArrayList<>();
166         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
167                 "shard-manager-" + this.type,
168                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
169                 localShardActorNames);
170         mBean.setShardManager(this);
171     }
172
173     @Override
174     public void postStop() {
175         LOG.info("Stopping ShardManager");
176
177         mBean.unregisterMBean();
178     }
179
180     @Override
181     public void handleCommand(Object message) throws Exception {
182         if (message  instanceof FindPrimary) {
183             findPrimary((FindPrimary)message);
184         } else if(message instanceof FindLocalShard){
185             findLocalShard((FindLocalShard) message);
186         } else if (message instanceof UpdateSchemaContext) {
187             updateSchemaContext(message);
188         } else if(message instanceof ActorInitialized) {
189             onActorInitialized(message);
190         } else if (message instanceof ClusterEvent.MemberUp){
191             memberUp((ClusterEvent.MemberUp) message);
192         } else if (message instanceof ClusterEvent.MemberExited){
193             memberExited((ClusterEvent.MemberExited) message);
194         } else if(message instanceof ClusterEvent.MemberRemoved) {
195             memberRemoved((ClusterEvent.MemberRemoved) message);
196         } else if(message instanceof ClusterEvent.UnreachableMember) {
197             memberUnreachable((ClusterEvent.UnreachableMember)message);
198         } else if(message instanceof ClusterEvent.ReachableMember) {
199             memberReachable((ClusterEvent.ReachableMember) message);
200         } else if(message instanceof DatastoreContextFactory) {
201             onDatastoreContextFactory((DatastoreContextFactory)message);
202         } else if(message instanceof RoleChangeNotification) {
203             onRoleChangeNotification((RoleChangeNotification) message);
204         } else if(message instanceof FollowerInitialSyncUpStatus){
205             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
206         } else if(message instanceof ShardNotInitializedTimeout) {
207             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
208         } else if(message instanceof ShardLeaderStateChanged) {
209             onLeaderStateChanged((ShardLeaderStateChanged) message);
210         } else if(message instanceof SwitchShardBehavior){
211             onSwitchShardBehavior((SwitchShardBehavior) message);
212         } else if(message instanceof CreateShard) {
213             onCreateShard((CreateShard)message);
214         } else if(message instanceof AddShardReplica){
215             onAddShardReplica((AddShardReplica)message);
216         } else if(message instanceof ForwardedAddServerReply) {
217             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
218             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
219                     msg.removeShardOnFailure);
220         } else if(message instanceof ForwardedAddServerFailure) {
221             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
222             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
223         } else if(message instanceof ForwardedAddServerPrimaryShardFound) {
224             ForwardedAddServerPrimaryShardFound msg = (ForwardedAddServerPrimaryShardFound)message;
225             addShard(msg.shardName, msg.primaryFound, getSender());
226         } else if(message instanceof RemoveShardReplica){
227             onRemoveShardReplica((RemoveShardReplica)message);
228         } else if(message instanceof GetSnapshot) {
229             onGetSnapshot();
230         } else if (message instanceof SaveSnapshotSuccess) {
231             LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
232         } else if (message instanceof SaveSnapshotFailure) {
233             LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
234                 persistenceId(), ((SaveSnapshotFailure)message).cause());
235         } else {
236             unknownMessage(message);
237         }
238     }
239
240     private void onGetSnapshot() {
241         LOG.debug("{}: onGetSnapshot", persistenceId());
242
243         List<String> notInitialized = null;
244         for(ShardInformation shardInfo: localShards.values()) {
245             if(!shardInfo.isShardInitialized()) {
246                 if(notInitialized == null) {
247                     notInitialized = new ArrayList<>();
248                 }
249
250                 notInitialized.add(shardInfo.getShardName());
251             }
252         }
253
254         if(notInitialized != null) {
255             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
256                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
257             return;
258         }
259
260         byte[] shardManagerSnapshot = null;
261         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
262                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
263                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
264
265         for(ShardInformation shardInfo: localShards.values()) {
266             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
267         }
268     }
269
270     private void onCreateShard(CreateShard createShard) {
271         Object reply;
272         try {
273             String shardName = createShard.getModuleShardConfig().getShardName();
274             if(localShards.containsKey(shardName)) {
275                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
276             } else {
277                 doCreateShard(createShard);
278                 reply = new akka.actor.Status.Success(null);
279             }
280         } catch (Exception e) {
281             LOG.error("onCreateShard failed", e);
282             reply = new akka.actor.Status.Failure(e);
283         }
284
285         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
286             getSender().tell(reply, getSelf());
287         }
288     }
289
290     private void doCreateShard(CreateShard createShard) {
291         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
292         String shardName = moduleShardConfig.getShardName();
293
294         configuration.addModuleShardConfiguration(moduleShardConfig);
295
296         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
297         if(shardDatastoreContext == null) {
298             shardDatastoreContext = newShardDatastoreContext(shardName);
299         } else {
300             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
301                     peerAddressResolver).build();
302         }
303
304         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
305
306         Map<String, String> peerAddresses;
307         boolean isActiveMember;
308         if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
309             peerAddresses = getPeerAddresses(shardName);
310             isActiveMember = true;
311         } else {
312             // The local member is not in the given shard member configuration. In this case we'll create
313             // the shard with no peers and with elections disabled so it stays as follower. A
314             // subsequent AddServer request will be needed to make it an active member.
315             isActiveMember = false;
316             peerAddresses = Collections.emptyMap();
317             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
318                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
319         }
320
321         LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
322                 moduleShardConfig.getShardMemberNames(), peerAddresses);
323
324         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
325                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
326         info.setActiveMember(isActiveMember);
327         localShards.put(info.getShardName(), info);
328
329         mBean.addLocalShard(shardId.toString());
330
331         if(schemaContext != null) {
332             info.setActor(newShardActor(schemaContext, info));
333         }
334     }
335
336     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
337         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
338                 shardPeerAddressResolver(peerAddressResolver);
339     }
340
341     private DatastoreContext newShardDatastoreContext(String shardName) {
342         return newShardDatastoreContextBuilder(shardName).build();
343     }
344
345     private void checkReady(){
346         if (isReadyWithLeaderId()) {
347             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
348                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
349
350             waitTillReadyCountdownLatch.countDown();
351         }
352     }
353
354     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
355         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
356
357         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
358         if(shardInformation != null) {
359             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
360             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
361             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
362                 primaryShardInfoCache.remove(shardInformation.getShardName());
363             }
364
365             checkReady();
366         } else {
367             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
368         }
369     }
370
371     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
372         ShardInformation shardInfo = message.getShardInfo();
373
374         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
375                 shardInfo.getShardName());
376
377         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
378
379         if(!shardInfo.isShardInitialized()) {
380             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
381             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
382         } else {
383             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
384             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
385         }
386     }
387
388     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
389         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
390                 status.getName(), status.isInitialSyncDone());
391
392         ShardInformation shardInformation = findShardInformation(status.getName());
393
394         if(shardInformation != null) {
395             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
396
397             mBean.setSyncStatus(isInSync());
398         }
399
400     }
401
402     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
403         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
404                 roleChanged.getOldRole(), roleChanged.getNewRole());
405
406         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
407         if(shardInformation != null) {
408             shardInformation.setRole(roleChanged.getNewRole());
409             checkReady();
410             mBean.setSyncStatus(isInSync());
411         }
412     }
413
414
415     private ShardInformation findShardInformation(String memberId) {
416         for(ShardInformation info : localShards.values()){
417             if(info.getShardId().toString().equals(memberId)){
418                 return info;
419             }
420         }
421
422         return null;
423     }
424
425     private boolean isReadyWithLeaderId() {
426         boolean isReady = true;
427         for (ShardInformation info : localShards.values()) {
428             if(!info.isShardReadyWithLeaderId()){
429                 isReady = false;
430                 break;
431             }
432         }
433         return isReady;
434     }
435
436     private boolean isInSync(){
437         for (ShardInformation info : localShards.values()) {
438             if(!info.isInSync()){
439                 return false;
440             }
441         }
442         return true;
443     }
444
445     private void onActorInitialized(Object message) {
446         final ActorRef sender = getSender();
447
448         if (sender == null) {
449             return; //why is a non-actor sending this message? Just ignore.
450         }
451
452         String actorName = sender.path().name();
453         //find shard name from actor name; actor name is stringified shardId
454         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
455
456         if (shardId.getShardName() == null) {
457             return;
458         }
459
460         markShardAsInitialized(shardId.getShardName());
461     }
462
463     private void markShardAsInitialized(String shardName) {
464         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
465
466         ShardInformation shardInformation = localShards.get(shardName);
467         if (shardInformation != null) {
468             shardInformation.setActorInitialized();
469
470             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
471         }
472     }
473
474     @Override
475     protected void handleRecover(Object message) throws Exception {
476         if (message instanceof RecoveryCompleted) {
477             LOG.info("Recovery complete : {}", persistenceId());
478
479             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
480             // journal on upgrade from Helium.
481             deleteMessages(lastSequenceNr());
482             createLocalShards();
483         } else if (message instanceof SnapshotOffer) {
484             handleShardRecovery((SnapshotOffer) message);
485         }
486     }
487
488     private void findLocalShard(FindLocalShard message) {
489         final ShardInformation shardInformation = localShards.get(message.getShardName());
490
491         if(shardInformation == null){
492             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
493             return;
494         }
495
496         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
497             @Override
498             public Object get() {
499                 return new LocalShardFound(shardInformation.getActor());
500             }
501         });
502     }
503
504     private void sendResponse(ShardInformation shardInformation, boolean doWait,
505             boolean wantShardReady, final Supplier<Object> messageSupplier) {
506         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
507             if(doWait) {
508                 final ActorRef sender = getSender();
509                 final ActorRef self = self();
510
511                 Runnable replyRunnable = new Runnable() {
512                     @Override
513                     public void run() {
514                         sender.tell(messageSupplier.get(), self);
515                     }
516                 };
517
518                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
519                     new OnShardInitialized(replyRunnable);
520
521                 shardInformation.addOnShardInitialized(onShardInitialized);
522
523                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
524                 if(shardInformation.isShardInitialized()) {
525                     // If the shard is already initialized then we'll wait enough time for the shard to
526                     // elect a leader, ie 2 times the election timeout.
527                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
528                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
529                 }
530
531                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
532                         shardInformation.getShardName());
533
534                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
535                         timeout, getSelf(),
536                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
537                         getContext().dispatcher(), getSelf());
538
539                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
540
541             } else if (!shardInformation.isShardInitialized()) {
542                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
543                         shardInformation.getShardName());
544                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
545             } else {
546                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
547                         shardInformation.getShardName());
548                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
549             }
550
551             return;
552         }
553
554         getSender().tell(messageSupplier.get(), getSelf());
555     }
556
557     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
558         return new NoShardLeaderException(null, shardId.toString());
559     }
560
561     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
562         return new NotInitializedException(String.format(
563                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
564     }
565
566     private void memberRemoved(ClusterEvent.MemberRemoved message) {
567         String memberName = message.member().roles().head();
568
569         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
570                 message.member().address());
571
572         peerAddressResolver.removePeerAddress(memberName);
573
574         for(ShardInformation info : localShards.values()){
575             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
576         }
577     }
578
579     private void memberExited(ClusterEvent.MemberExited message) {
580         String memberName = message.member().roles().head();
581
582         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
583                 message.member().address());
584
585         peerAddressResolver.removePeerAddress(memberName);
586
587         for(ShardInformation info : localShards.values()){
588             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
589         }
590     }
591
592     private void memberUp(ClusterEvent.MemberUp message) {
593         String memberName = message.member().roles().head();
594
595         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
596                 message.member().address());
597
598         addPeerAddress(memberName, message.member().address());
599
600         checkReady();
601     }
602
603     private void addPeerAddress(String memberName, Address address) {
604         peerAddressResolver.addPeerAddress(memberName, address);
605
606         for(ShardInformation info : localShards.values()){
607             String shardName = info.getShardName();
608             String peerId = getShardIdentifier(memberName, shardName).toString();
609             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
610
611             info.peerUp(memberName, peerId, getSelf());
612         }
613     }
614
615     private void memberReachable(ClusterEvent.ReachableMember message) {
616         String memberName = message.member().roles().head();
617         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
618
619         addPeerAddress(memberName, message.member().address());
620
621         markMemberAvailable(memberName);
622     }
623
624     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
625         String memberName = message.member().roles().head();
626         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
627
628         markMemberUnavailable(memberName);
629     }
630
631     private void markMemberUnavailable(final String memberName) {
632         for(ShardInformation info : localShards.values()){
633             String leaderId = info.getLeaderId();
634             if(leaderId != null && leaderId.contains(memberName)) {
635                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
636                 info.setLeaderAvailable(false);
637
638                 primaryShardInfoCache.remove(info.getShardName());
639             }
640
641             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
642         }
643     }
644
645     private void markMemberAvailable(final String memberName) {
646         for(ShardInformation info : localShards.values()){
647             String leaderId = info.getLeaderId();
648             if(leaderId != null && leaderId.contains(memberName)) {
649                 LOG.debug("Marking Leader {} as available.", leaderId);
650                 info.setLeaderAvailable(true);
651             }
652
653             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
654         }
655     }
656
657     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
658         datastoreContextFactory = factory;
659         for (ShardInformation info : localShards.values()) {
660             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
661         }
662     }
663
664     private void onSwitchShardBehavior(SwitchShardBehavior message) {
665         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
666
667         ShardInformation shardInformation = localShards.get(identifier.getShardName());
668
669         if(shardInformation != null && shardInformation.getActor() != null) {
670             shardInformation.getActor().tell(
671                     new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
672         } else {
673             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
674                     message.getShardName(), message.getNewState());
675         }
676     }
677
678     /**
679      * Notifies all the local shards of a change in the schema context
680      *
681      * @param message
682      */
683     private void updateSchemaContext(final Object message) {
684         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
685
686         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
687
688         for (ShardInformation info : localShards.values()) {
689             if (info.getActor() == null) {
690                 LOG.debug("Creating Shard {}", info.getShardId());
691                 info.setActor(newShardActor(schemaContext, info));
692             } else {
693                 info.getActor().tell(message, getSelf());
694             }
695         }
696     }
697
698     @VisibleForTesting
699     protected ClusterWrapper getCluster() {
700         return cluster;
701     }
702
703     @VisibleForTesting
704     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
705         return getContext().actorOf(info.newProps(schemaContext)
706                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
707     }
708
709     private void findPrimary(FindPrimary message) {
710         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
711
712         final String shardName = message.getShardName();
713         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
714
715         // First see if the there is a local replica for the shard
716         final ShardInformation info = localShards.get(shardName);
717         if (info != null && info.isActiveMember()) {
718             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
719                 @Override
720                 public Object get() {
721                     String primaryPath = info.getSerializedLeaderActor();
722                     Object found = canReturnLocalShardState && info.isLeader() ?
723                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
724                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
725
726                             if(LOG.isDebugEnabled()) {
727                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
728                             }
729
730                             return found;
731                 }
732             });
733
734             return;
735         }
736
737         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
738             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
739                     shardName, address);
740
741             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
742                     message.isWaitUntilReady()), getContext());
743             return;
744         }
745
746         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
747
748         getSender().tell(new PrimaryNotFoundException(
749                 String.format("No primary shard found for %s.", shardName)), getSelf());
750     }
751
752     /**
753      * Construct the name of the shard actor given the name of the member on
754      * which the shard resides and the name of the shard
755      *
756      * @param memberName
757      * @param shardName
758      * @return
759      */
760     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
761         return peerAddressResolver.getShardIdentifier(memberName, shardName);
762     }
763
764     /**
765      * Create shards that are local to the member on which the ShardManager
766      * runs
767      *
768      */
769     private void createLocalShards() {
770         String memberName = this.cluster.getCurrentMemberName();
771         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
772
773         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
774         if(restoreFromSnapshot != null)
775         {
776             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
777                 shardSnapshots.put(snapshot.getName(), snapshot);
778             }
779         }
780
781         restoreFromSnapshot = null; // null out to GC
782
783         for(String shardName : memberShardNames){
784             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
785             Map<String, String> peerAddresses = getPeerAddresses(shardName);
786             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
787                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
788                         shardSnapshots.get(shardName)), peerAddressResolver));
789             mBean.addLocalShard(shardId.toString());
790         }
791     }
792
793     /**
794      * Given the name of the shard find the addresses of all it's peers
795      *
796      * @param shardName
797      */
798     private Map<String, String> getPeerAddresses(String shardName) {
799         Collection<String> members = configuration.getMembersFromShardName(shardName);
800         Map<String, String> peerAddresses = new HashMap<>();
801
802         String currentMemberName = this.cluster.getCurrentMemberName();
803
804         for(String memberName : members) {
805             if(!currentMemberName.equals(memberName)) {
806                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
807                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
808                 peerAddresses.put(shardId.toString(), address);
809             }
810         }
811         return peerAddresses;
812     }
813
814     @Override
815     public SupervisorStrategy supervisorStrategy() {
816
817         return new OneForOneStrategy(10, Duration.create("1 minute"),
818                 new Function<Throwable, SupervisorStrategy.Directive>() {
819             @Override
820             public SupervisorStrategy.Directive apply(Throwable t) {
821                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
822                 return SupervisorStrategy.resume();
823             }
824         }
825                 );
826
827     }
828
829     @Override
830     public String persistenceId() {
831         return persistenceId;
832     }
833
834     @VisibleForTesting
835     ShardManagerInfoMBean getMBean(){
836         return mBean;
837     }
838
839     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
840         if (shardReplicaOperationsInProgress.contains(shardName)) {
841             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
842             LOG.debug ("{}: {}", persistenceId(), msg);
843             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
844             return true;
845         }
846
847         return false;
848     }
849
850     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
851         final String shardName = shardReplicaMsg.getShardName();
852
853         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
854
855         // verify the shard with the specified name is present in the cluster configuration
856         if (!(this.configuration.isShardConfigured(shardName))) {
857             String msg = String.format("No module configuration exists for shard %s", shardName);
858             LOG.debug ("{}: {}", persistenceId(), msg);
859             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
860             return;
861         }
862
863         // Create the localShard
864         if (schemaContext == null) {
865             String msg = String.format(
866                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
867             LOG.debug ("{}: {}", persistenceId(), msg);
868             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
869             return;
870         }
871
872         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
873                 getShardInitializationTimeout().duration().$times(2));
874
875         final ActorRef sender = getSender();
876         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
877         futureObj.onComplete(new OnComplete<Object>() {
878             @Override
879             public void onComplete(Throwable failure, Object response) {
880                 if (failure != null) {
881                     LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
882                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(
883                         String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
884                 } else {
885                     if(response instanceof RemotePrimaryShardFound) {
886                         self().tell(new ForwardedAddServerPrimaryShardFound(shardName,
887                                 (RemotePrimaryShardFound)response), sender);
888                     } else if(response instanceof LocalPrimaryShardFound) {
889                         sendLocalReplicaAlreadyExistsReply(shardName, sender);
890                     } else {
891                         String msg = String.format("Failed to find leader for shard %s: received response: %s",
892                                 shardName, response);
893                         LOG.debug ("{}: {}", persistenceId(), msg);
894                         sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
895                             new RuntimeException(msg)), getSelf());
896                     }
897                 }
898             }
899         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
900     }
901
902     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
903         String msg = String.format("Local shard %s already exists", shardName);
904         LOG.debug ("{}: {}", persistenceId(), msg);
905         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
906     }
907
908     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
909         if(isShardReplicaOperationInProgress(shardName, sender)) {
910             return;
911         }
912
913         shardReplicaOperationsInProgress.add(shardName);
914
915         final ShardInformation shardInfo;
916         final boolean removeShardOnFailure;
917         ShardInformation existingShardInfo = localShards.get(shardName);
918         if(existingShardInfo == null) {
919             removeShardOnFailure = true;
920             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
921
922             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
923                     DisableElectionsRaftPolicy.class.getName()).build();
924
925             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
926                     Shard.builder(), peerAddressResolver);
927             shardInfo.setActiveMember(false);
928             localShards.put(shardName, shardInfo);
929             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
930         } else {
931             removeShardOnFailure = false;
932             shardInfo = existingShardInfo;
933         }
934
935         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
936
937         //inform ShardLeader to add this shard as a replica by sending an AddServer message
938         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
939                 response.getPrimaryPath(), shardInfo.getShardId());
940
941         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
942                 duration());
943         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
944             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
945
946         futureObj.onComplete(new OnComplete<Object>() {
947             @Override
948             public void onComplete(Throwable failure, Object addServerResponse) {
949                 if (failure != null) {
950                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
951                             response.getPrimaryPath(), shardName, failure);
952
953                     String msg = String.format("AddServer request to leader %s for shard %s failed",
954                             response.getPrimaryPath(), shardName);
955                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
956                 } else {
957                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
958                             response.getPrimaryPath(), removeShardOnFailure), sender);
959                 }
960             }
961         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
962     }
963
964     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
965             boolean removeShardOnFailure) {
966         shardReplicaOperationsInProgress.remove(shardName);
967
968         if(removeShardOnFailure) {
969             ShardInformation shardInfo = localShards.remove(shardName);
970             if (shardInfo.getActor() != null) {
971                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
972             }
973         }
974
975         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
976             new RuntimeException(message, failure)), getSelf());
977     }
978
979     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
980             String leaderPath, boolean removeShardOnFailure) {
981         String shardName = shardInfo.getShardName();
982         shardReplicaOperationsInProgress.remove(shardName);
983
984         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
985
986         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
987             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
988
989             // Make the local shard voting capable
990             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
991             shardInfo.setActiveMember(true);
992             persistShardList();
993
994             mBean.addLocalShard(shardInfo.getShardId().toString());
995             sender.tell(new akka.actor.Status.Success(null), getSelf());
996         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
997             sendLocalReplicaAlreadyExistsReply(shardName, sender);
998         } else {
999             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1000                     persistenceId(), shardName, replyMsg.getStatus());
1001
1002             Exception failure;
1003             switch (replyMsg.getStatus()) {
1004                 case TIMEOUT:
1005                     failure = new TimeoutException(String.format(
1006                             "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1007                             "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1008                             leaderPath, shardName));
1009                     break;
1010                 case NO_LEADER:
1011                     failure = createNoShardLeaderException(shardInfo.getShardId());
1012                     break;
1013                 default :
1014                     failure = new RuntimeException(String.format(
1015                             "AddServer request to leader %s for shard %s failed with status %s",
1016                             leaderPath, shardName, replyMsg.getStatus()));
1017             }
1018
1019             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1020         }
1021     }
1022
1023     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
1024         String shardName = shardReplicaMsg.getShardName();
1025
1026         // verify the local shard replica is available in the controller node
1027         if (!localShards.containsKey(shardName)) {
1028             String msg = String.format("Local shard %s does not", shardName);
1029             LOG.debug ("{}: {}", persistenceId(), msg);
1030             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1031             return;
1032         }
1033         // call RemoveShard for the shardName
1034         getSender().tell(new akka.actor.Status.Success(true), getSelf());
1035         return;
1036     }
1037
1038     private void persistShardList() {
1039         List<String> shardList = new ArrayList<>(localShards.keySet());
1040         for (ShardInformation shardInfo : localShards.values()) {
1041             if (!shardInfo.isActiveMember()) {
1042                 shardList.remove(shardInfo.getShardName());
1043             }
1044         }
1045         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1046         saveSnapshot(new ShardManagerSnapshot(shardList));
1047     }
1048
1049     private void handleShardRecovery(SnapshotOffer offer) {
1050         LOG.debug ("{}: in handleShardRecovery", persistenceId());
1051         ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
1052         String currentMember = cluster.getCurrentMemberName();
1053         Set<String> configuredShardList =
1054             new HashSet<>(configuration.getMemberShardNames(currentMember));
1055         for (String shard : snapshot.getShardList()) {
1056             if (!configuredShardList.contains(shard)) {
1057                 // add the current member as a replica for the shard
1058                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1059                 configuration.addMemberReplicaForShard(shard, currentMember);
1060             } else {
1061                 configuredShardList.remove(shard);
1062             }
1063         }
1064         for (String shard : configuredShardList) {
1065             // remove the member as a replica for the shard
1066             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1067             configuration.removeMemberReplicaForShard(shard, currentMember);
1068         }
1069     }
1070
1071     private static class ForwardedAddServerPrimaryShardFound {
1072         String shardName;
1073         RemotePrimaryShardFound primaryFound;
1074
1075         ForwardedAddServerPrimaryShardFound(String shardName, RemotePrimaryShardFound primaryFound) {
1076             this.shardName = shardName;
1077             this.primaryFound = primaryFound;
1078         }
1079     }
1080
1081     private static class ForwardedAddServerReply {
1082         ShardInformation shardInfo;
1083         AddServerReply addServerReply;
1084         String leaderPath;
1085         boolean removeShardOnFailure;
1086
1087         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1088                 boolean removeShardOnFailure) {
1089             this.shardInfo = shardInfo;
1090             this.addServerReply = addServerReply;
1091             this.leaderPath = leaderPath;
1092             this.removeShardOnFailure = removeShardOnFailure;
1093         }
1094     }
1095
1096     private static class ForwardedAddServerFailure {
1097         String shardName;
1098         String failureMessage;
1099         Throwable failure;
1100         boolean removeShardOnFailure;
1101
1102         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1103                 boolean removeShardOnFailure) {
1104             this.shardName = shardName;
1105             this.failureMessage = failureMessage;
1106             this.failure = failure;
1107             this.removeShardOnFailure = removeShardOnFailure;
1108         }
1109     }
1110
1111     @VisibleForTesting
1112     protected static class ShardInformation {
1113         private final ShardIdentifier shardId;
1114         private final String shardName;
1115         private ActorRef actor;
1116         private ActorPath actorPath;
1117         private final Map<String, String> initialPeerAddresses;
1118         private Optional<DataTree> localShardDataTree;
1119         private boolean leaderAvailable = false;
1120
1121         // flag that determines if the actor is ready for business
1122         private boolean actorInitialized = false;
1123
1124         private boolean followerSyncStatus = false;
1125
1126         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1127         private String role ;
1128         private String leaderId;
1129         private short leaderVersion;
1130
1131         private DatastoreContext datastoreContext;
1132         private Shard.AbstractBuilder<?, ?> builder;
1133         private final ShardPeerAddressResolver addressResolver;
1134         private boolean isActiveMember = true;
1135
1136         private ShardInformation(String shardName, ShardIdentifier shardId,
1137                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1138                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1139             this.shardName = shardName;
1140             this.shardId = shardId;
1141             this.initialPeerAddresses = initialPeerAddresses;
1142             this.datastoreContext = datastoreContext;
1143             this.builder = builder;
1144             this.addressResolver = addressResolver;
1145         }
1146
1147         Props newProps(SchemaContext schemaContext) {
1148             Preconditions.checkNotNull(builder);
1149             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1150                     schemaContext(schemaContext).props();
1151             builder = null;
1152             return props;
1153         }
1154
1155         String getShardName() {
1156             return shardName;
1157         }
1158
1159         ActorRef getActor(){
1160             return actor;
1161         }
1162
1163         ActorPath getActorPath() {
1164             return actorPath;
1165         }
1166
1167         void setActor(ActorRef actor) {
1168             this.actor = actor;
1169             this.actorPath = actor.path();
1170         }
1171
1172         ShardIdentifier getShardId() {
1173             return shardId;
1174         }
1175
1176         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1177             this.localShardDataTree = localShardDataTree;
1178         }
1179
1180         Optional<DataTree> getLocalShardDataTree() {
1181             return localShardDataTree;
1182         }
1183
1184         DatastoreContext getDatastoreContext() {
1185             return datastoreContext;
1186         }
1187
1188         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1189             this.datastoreContext = datastoreContext;
1190             if (actor != null) {
1191                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1192                 actor.tell(this.datastoreContext, sender);
1193             }
1194         }
1195
1196         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1197             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1198
1199             if(actor != null) {
1200                 if(LOG.isDebugEnabled()) {
1201                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1202                             peerId, peerAddress, actor.path());
1203                 }
1204
1205                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1206             }
1207
1208             notifyOnShardInitializedCallbacks();
1209         }
1210
1211         void peerDown(String memberName, String peerId, ActorRef sender) {
1212             if(actor != null) {
1213                 actor.tell(new PeerDown(memberName, peerId), sender);
1214             }
1215         }
1216
1217         void peerUp(String memberName, String peerId, ActorRef sender) {
1218             if(actor != null) {
1219                 actor.tell(new PeerUp(memberName, peerId), sender);
1220             }
1221         }
1222
1223         boolean isShardReady() {
1224             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1225         }
1226
1227         boolean isShardReadyWithLeaderId() {
1228             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1229                     (isLeader() || addressResolver.resolve(leaderId) != null);
1230         }
1231
1232         boolean isShardInitialized() {
1233             return getActor() != null && actorInitialized;
1234         }
1235
1236         boolean isLeader() {
1237             return Objects.equal(leaderId, shardId.toString());
1238         }
1239
1240         String getSerializedLeaderActor() {
1241             if(isLeader()) {
1242                 return Serialization.serializedActorPath(getActor());
1243             } else {
1244                 return addressResolver.resolve(leaderId);
1245             }
1246         }
1247
1248         void setActorInitialized() {
1249             LOG.debug("Shard {} is initialized", shardId);
1250
1251             this.actorInitialized = true;
1252
1253             notifyOnShardInitializedCallbacks();
1254         }
1255
1256         private void notifyOnShardInitializedCallbacks() {
1257             if(onShardInitializedSet.isEmpty()) {
1258                 return;
1259             }
1260
1261             boolean ready = isShardReadyWithLeaderId();
1262
1263             if(LOG.isDebugEnabled()) {
1264                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1265                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1266             }
1267
1268             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1269             while(iter.hasNext()) {
1270                 OnShardInitialized onShardInitialized = iter.next();
1271                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1272                     iter.remove();
1273                     onShardInitialized.getTimeoutSchedule().cancel();
1274                     onShardInitialized.getReplyRunnable().run();
1275                 }
1276             }
1277         }
1278
1279         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1280             onShardInitializedSet.add(onShardInitialized);
1281         }
1282
1283         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1284             onShardInitializedSet.remove(onShardInitialized);
1285         }
1286
1287         void setRole(String newRole) {
1288             this.role = newRole;
1289
1290             notifyOnShardInitializedCallbacks();
1291         }
1292
1293         void setFollowerSyncStatus(boolean syncStatus){
1294             this.followerSyncStatus = syncStatus;
1295         }
1296
1297         boolean isInSync(){
1298             if(RaftState.Follower.name().equals(this.role)){
1299                 return followerSyncStatus;
1300             } else if(RaftState.Leader.name().equals(this.role)){
1301                 return true;
1302             }
1303
1304             return false;
1305         }
1306
1307         boolean setLeaderId(String leaderId) {
1308             boolean changed = !Objects.equal(this.leaderId, leaderId);
1309             this.leaderId = leaderId;
1310             if(leaderId != null) {
1311                 this.leaderAvailable = true;
1312             }
1313             notifyOnShardInitializedCallbacks();
1314
1315             return changed;
1316         }
1317
1318         String getLeaderId() {
1319             return leaderId;
1320         }
1321
1322         void setLeaderAvailable(boolean leaderAvailable) {
1323             this.leaderAvailable = leaderAvailable;
1324         }
1325
1326         short getLeaderVersion() {
1327             return leaderVersion;
1328         }
1329
1330         void setLeaderVersion(short leaderVersion) {
1331             this.leaderVersion = leaderVersion;
1332         }
1333
1334         boolean isActiveMember() {
1335             return isActiveMember;
1336         }
1337
1338         void setActiveMember(boolean isActiveMember) {
1339             this.isActiveMember = isActiveMember;
1340         }
1341     }
1342
1343     private static class OnShardInitialized {
1344         private final Runnable replyRunnable;
1345         private Cancellable timeoutSchedule;
1346
1347         OnShardInitialized(Runnable replyRunnable) {
1348             this.replyRunnable = replyRunnable;
1349         }
1350
1351         Runnable getReplyRunnable() {
1352             return replyRunnable;
1353         }
1354
1355         Cancellable getTimeoutSchedule() {
1356             return timeoutSchedule;
1357         }
1358
1359         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1360             this.timeoutSchedule = timeoutSchedule;
1361         }
1362     }
1363
1364     private static class OnShardReady extends OnShardInitialized {
1365         OnShardReady(Runnable replyRunnable) {
1366             super(replyRunnable);
1367         }
1368     }
1369
1370     private static class ShardNotInitializedTimeout {
1371         private final ActorRef sender;
1372         private final ShardInformation shardInfo;
1373         private final OnShardInitialized onShardInitialized;
1374
1375         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1376             this.sender = sender;
1377             this.shardInfo = shardInfo;
1378             this.onShardInitialized = onShardInitialized;
1379         }
1380
1381         ActorRef getSender() {
1382             return sender;
1383         }
1384
1385         ShardInformation getShardInfo() {
1386             return shardInfo;
1387         }
1388
1389         OnShardInitialized getOnShardInitialized() {
1390             return onShardInitialized;
1391         }
1392     }
1393
1394     /**
1395      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1396      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1397      */
1398     @Deprecated
1399     static class SchemaContextModules implements Serializable {
1400         private static final long serialVersionUID = -8884620101025936590L;
1401
1402         private final Set<String> modules;
1403
1404         SchemaContextModules(Set<String> modules){
1405             this.modules = modules;
1406         }
1407
1408         public Set<String> getModules() {
1409             return modules;
1410         }
1411     }
1412
1413     public static Builder builder() {
1414         return new Builder();
1415     }
1416
1417     public static class Builder {
1418         private ClusterWrapper cluster;
1419         private Configuration configuration;
1420         private DatastoreContextFactory datastoreContextFactory;
1421         private CountDownLatch waitTillReadyCountdownLatch;
1422         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1423         private DatastoreSnapshot restoreFromSnapshot;
1424         private volatile boolean sealed;
1425
1426         protected void checkSealed() {
1427             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1428         }
1429
1430         public Builder cluster(ClusterWrapper cluster) {
1431             checkSealed();
1432             this.cluster = cluster;
1433             return this;
1434         }
1435
1436         public Builder configuration(Configuration configuration) {
1437             checkSealed();
1438             this.configuration = configuration;
1439             return this;
1440         }
1441
1442         public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1443             checkSealed();
1444             this.datastoreContextFactory = datastoreContextFactory;
1445             return this;
1446         }
1447
1448         public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1449             checkSealed();
1450             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1451             return this;
1452         }
1453
1454         public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1455             checkSealed();
1456             this.primaryShardInfoCache = primaryShardInfoCache;
1457             return this;
1458         }
1459
1460         public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1461             checkSealed();
1462             this.restoreFromSnapshot = restoreFromSnapshot;
1463             return this;
1464         }
1465
1466         protected void verify() {
1467             sealed = true;
1468             Preconditions.checkNotNull(cluster, "cluster should not be null");
1469             Preconditions.checkNotNull(configuration, "configuration should not be null");
1470             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1471             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1472             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1473         }
1474
1475         public Props props() {
1476             verify();
1477             return Props.create(ShardManager.class, this);
1478         }
1479     }
1480 }
1481
1482
1483