Bug 2187: Bootstrap EOS shard when no local shards configured
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Function;
23 import akka.persistence.RecoveryCompleted;
24 import akka.persistence.SaveSnapshotFailure;
25 import akka.persistence.SaveSnapshotSuccess;
26 import akka.persistence.SnapshotOffer;
27 import akka.serialization.Serialization;
28 import akka.util.Timeout;
29 import com.google.common.annotations.VisibleForTesting;
30 import com.google.common.base.Objects;
31 import com.google.common.base.Optional;
32 import com.google.common.base.Preconditions;
33 import com.google.common.base.Strings;
34 import com.google.common.base.Supplier;
35 import com.google.common.collect.Sets;
36 import java.io.Serializable;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.Collections;
40 import java.util.HashMap;
41 import java.util.HashSet;
42 import java.util.Iterator;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Set;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
50 import org.opendaylight.controller.cluster.datastore.config.Configuration;
51 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
52 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
53 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
54 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
56 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
57 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
58 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
59 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
60 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
61 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
62 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
63 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
64 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
65 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
66 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
67 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
68 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
69 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
70 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
71 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
72 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
73 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
74 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
75 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
76 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
77 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
78 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
79 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
80 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
81 import org.opendaylight.controller.cluster.raft.RaftState;
82 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
83 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
84 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
85 import org.opendaylight.controller.cluster.raft.messages.AddServer;
86 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
87 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
88 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
89 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
90 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
93 import scala.concurrent.Future;
94 import scala.concurrent.duration.Duration;
95 import scala.concurrent.duration.FiniteDuration;
96
97 /**
98  * The ShardManager has the following jobs,
99  * <ul>
100  * <li> Create all the local shard replicas that belong on this cluster member
101  * <li> Find the address of the local shard
102  * <li> Find the primary replica for any given shard
103  * <li> Monitor the cluster members and store their addresses
104  * </ul>
105  */
106 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
107
108     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
109
110     // Stores a mapping between a shard name and its corresponding information
111     // Shard names look like inventory, topology etc and are as specified in
112     // configuration
113     private final Map<String, ShardInformation> localShards = new HashMap<>();
114
115     // The type of a ShardManager reflects the type of the datastore itself
116     // A data store could be of type config/operational
117     private final String type;
118
119     private final ClusterWrapper cluster;
120
121     private final Configuration configuration;
122
123     private final String shardDispatcherPath;
124
125     private final ShardManagerInfo mBean;
126
127     private DatastoreContextFactory datastoreContextFactory;
128
129     private final CountDownLatch waitTillReadyCountdownLatch;
130
131     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
132
133     private final ShardPeerAddressResolver peerAddressResolver;
134
135     private SchemaContext schemaContext;
136
137     private DatastoreSnapshot restoreFromSnapshot;
138
139     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
140
141     private final String id;
142
143     /**
144      */
145     protected ShardManager(Builder builder) {
146
147         this.cluster = builder.cluster;
148         this.configuration = builder.configuration;
149         this.datastoreContextFactory = builder.datastoreContextFactory;
150         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
151         this.shardDispatcherPath =
152                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
153         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
154         this.primaryShardInfoCache = builder.primaryShardInfoCache;
155         this.restoreFromSnapshot = builder.restoreFromSnapshot;
156
157         id = "shard-manager-" + type;
158
159         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
160
161         // Subscribe this actor to cluster member events
162         cluster.subscribeToMemberEvents(getSelf());
163
164         List<String> localShardActorNames = new ArrayList<>();
165         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
166                 "shard-manager-" + this.type,
167                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
168                 localShardActorNames);
169         mBean.setShardManager(this);
170     }
171
172     @Override
173     public void postStop() {
174         LOG.info("Stopping ShardManager");
175
176         mBean.unregisterMBean();
177     }
178
179     @Override
180     public void handleCommand(Object message) throws Exception {
181         if (message  instanceof FindPrimary) {
182             findPrimary((FindPrimary)message);
183         } else if(message instanceof FindLocalShard){
184             findLocalShard((FindLocalShard) message);
185         } else if (message instanceof UpdateSchemaContext) {
186             updateSchemaContext(message);
187         } else if(message instanceof ActorInitialized) {
188             onActorInitialized(message);
189         } else if (message instanceof ClusterEvent.MemberUp){
190             memberUp((ClusterEvent.MemberUp) message);
191         } else if (message instanceof ClusterEvent.MemberExited){
192             memberExited((ClusterEvent.MemberExited) message);
193         } else if(message instanceof ClusterEvent.MemberRemoved) {
194             memberRemoved((ClusterEvent.MemberRemoved) message);
195         } else if(message instanceof ClusterEvent.UnreachableMember) {
196             memberUnreachable((ClusterEvent.UnreachableMember)message);
197         } else if(message instanceof ClusterEvent.ReachableMember) {
198             memberReachable((ClusterEvent.ReachableMember) message);
199         } else if(message instanceof DatastoreContextFactory) {
200             onDatastoreContextFactory((DatastoreContextFactory)message);
201         } else if(message instanceof RoleChangeNotification) {
202             onRoleChangeNotification((RoleChangeNotification) message);
203         } else if(message instanceof FollowerInitialSyncUpStatus){
204             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
205         } else if(message instanceof ShardNotInitializedTimeout) {
206             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
207         } else if(message instanceof ShardLeaderStateChanged) {
208             onLeaderStateChanged((ShardLeaderStateChanged) message);
209         } else if(message instanceof SwitchShardBehavior){
210             onSwitchShardBehavior((SwitchShardBehavior) message);
211         } else if(message instanceof CreateShard) {
212             onCreateShard((CreateShard)message);
213         } else if(message instanceof AddShardReplica){
214             onAddShardReplica((AddShardReplica)message);
215         } else if(message instanceof RemoveShardReplica){
216             onRemoveShardReplica((RemoveShardReplica)message);
217         } else if(message instanceof GetSnapshot) {
218             onGetSnapshot();
219         } else if (message instanceof SaveSnapshotSuccess) {
220             LOG.debug ("{} saved ShardManager snapshot successfully", persistenceId());
221         } else if (message instanceof SaveSnapshotFailure) {
222             LOG.error ("{}: SaveSnapshotFailure received for saving snapshot of shards",
223                 persistenceId(), ((SaveSnapshotFailure)message).cause());
224         } else {
225             unknownMessage(message);
226         }
227
228     }
229
230     private void onGetSnapshot() {
231         LOG.debug("{}: onGetSnapshot", persistenceId());
232
233         List<String> notInitialized = null;
234         for(ShardInformation shardInfo: localShards.values()) {
235             if(!shardInfo.isShardInitialized()) {
236                 if(notInitialized == null) {
237                     notInitialized = new ArrayList<>();
238                 }
239
240                 notInitialized.add(shardInfo.getShardName());
241             }
242         }
243
244         if(notInitialized != null) {
245             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
246                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
247             return;
248         }
249
250         byte[] shardManagerSnapshot = null;
251         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
252                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
253                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
254
255         for(ShardInformation shardInfo: localShards.values()) {
256             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
257         }
258     }
259
260     private void onCreateShard(CreateShard createShard) {
261         Object reply;
262         try {
263             String shardName = createShard.getModuleShardConfig().getShardName();
264             if(localShards.containsKey(shardName)) {
265                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
266             } else {
267                 doCreateShard(createShard);
268                 reply = new akka.actor.Status.Success(null);
269             }
270         } catch (Exception e) {
271             LOG.error("onCreateShard failed", e);
272             reply = new akka.actor.Status.Failure(e);
273         }
274
275         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
276             getSender().tell(reply, getSelf());
277         }
278     }
279
280     private void doCreateShard(CreateShard createShard) {
281         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
282         String shardName = moduleShardConfig.getShardName();
283
284         configuration.addModuleShardConfiguration(moduleShardConfig);
285
286         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
287         if(shardDatastoreContext == null) {
288             shardDatastoreContext = newShardDatastoreContext(shardName);
289         } else {
290             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
291                     peerAddressResolver).build();
292         }
293
294         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
295
296         Map<String, String> peerAddresses;
297         boolean isActiveMember;
298         if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
299             peerAddresses = getPeerAddresses(shardName);
300             isActiveMember = true;
301         } else {
302             // The local member is not in the given shard member configuration. In this case we'll create
303             // the shard with no peers and with elections disabled so it stays as follower. A
304             // subsequent AddServer request will be needed to make it an active member.
305             isActiveMember = false;
306             peerAddresses = Collections.emptyMap();
307             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
308                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
309         }
310
311         LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
312                 moduleShardConfig.getShardMemberNames(), peerAddresses);
313
314         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
315                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
316         info.setActiveMember(isActiveMember);
317         localShards.put(info.getShardName(), info);
318
319         mBean.addLocalShard(shardId.toString());
320
321         if(schemaContext != null) {
322             info.setActor(newShardActor(schemaContext, info));
323         }
324     }
325
326     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
327         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
328                 shardPeerAddressResolver(peerAddressResolver);
329     }
330
331     private DatastoreContext newShardDatastoreContext(String shardName) {
332         return newShardDatastoreContextBuilder(shardName).build();
333     }
334
335     private void checkReady(){
336         if (isReadyWithLeaderId()) {
337             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
338                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
339
340             waitTillReadyCountdownLatch.countDown();
341         }
342     }
343
344     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
345         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
346
347         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
348         if(shardInformation != null) {
349             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
350             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
351             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
352                 primaryShardInfoCache.remove(shardInformation.getShardName());
353             }
354
355             checkReady();
356         } else {
357             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
358         }
359     }
360
361     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
362         ShardInformation shardInfo = message.getShardInfo();
363
364         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
365                 shardInfo.getShardName());
366
367         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
368
369         if(!shardInfo.isShardInitialized()) {
370             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
371             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
372         } else {
373             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
374             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
375         }
376     }
377
378     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
379         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
380                 status.getName(), status.isInitialSyncDone());
381
382         ShardInformation shardInformation = findShardInformation(status.getName());
383
384         if(shardInformation != null) {
385             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
386
387             mBean.setSyncStatus(isInSync());
388         }
389
390     }
391
392     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
393         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
394                 roleChanged.getOldRole(), roleChanged.getNewRole());
395
396         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
397         if(shardInformation != null) {
398             shardInformation.setRole(roleChanged.getNewRole());
399             checkReady();
400             mBean.setSyncStatus(isInSync());
401         }
402     }
403
404
405     private ShardInformation findShardInformation(String memberId) {
406         for(ShardInformation info : localShards.values()){
407             if(info.getShardId().toString().equals(memberId)){
408                 return info;
409             }
410         }
411
412         return null;
413     }
414
415     private boolean isReadyWithLeaderId() {
416         boolean isReady = true;
417         for (ShardInformation info : localShards.values()) {
418             if(!info.isShardReadyWithLeaderId()){
419                 isReady = false;
420                 break;
421             }
422         }
423         return isReady;
424     }
425
426     private boolean isInSync(){
427         for (ShardInformation info : localShards.values()) {
428             if(!info.isInSync()){
429                 return false;
430             }
431         }
432         return true;
433     }
434
435     private void onActorInitialized(Object message) {
436         final ActorRef sender = getSender();
437
438         if (sender == null) {
439             return; //why is a non-actor sending this message? Just ignore.
440         }
441
442         String actorName = sender.path().name();
443         //find shard name from actor name; actor name is stringified shardId
444         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
445
446         if (shardId.getShardName() == null) {
447             return;
448         }
449
450         markShardAsInitialized(shardId.getShardName());
451     }
452
453     private void markShardAsInitialized(String shardName) {
454         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
455
456         ShardInformation shardInformation = localShards.get(shardName);
457         if (shardInformation != null) {
458             shardInformation.setActorInitialized();
459
460             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
461         }
462     }
463
464     @Override
465     protected void handleRecover(Object message) throws Exception {
466         if (message instanceof RecoveryCompleted) {
467             LOG.info("Recovery complete : {}", persistenceId());
468
469             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
470             // journal on upgrade from Helium.
471             deleteMessages(lastSequenceNr());
472             createLocalShards();
473         } else if (message instanceof SnapshotOffer) {
474             handleShardRecovery((SnapshotOffer) message);
475         }
476     }
477
478     private void findLocalShard(FindLocalShard message) {
479         final ShardInformation shardInformation = localShards.get(message.getShardName());
480
481         if(shardInformation == null){
482             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
483             return;
484         }
485
486         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
487             @Override
488             public Object get() {
489                 return new LocalShardFound(shardInformation.getActor());
490             }
491         });
492     }
493
494     private void sendResponse(ShardInformation shardInformation, boolean doWait,
495             boolean wantShardReady, final Supplier<Object> messageSupplier) {
496         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
497             if(doWait) {
498                 final ActorRef sender = getSender();
499                 final ActorRef self = self();
500
501                 Runnable replyRunnable = new Runnable() {
502                     @Override
503                     public void run() {
504                         sender.tell(messageSupplier.get(), self);
505                     }
506                 };
507
508                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
509                     new OnShardInitialized(replyRunnable);
510
511                 shardInformation.addOnShardInitialized(onShardInitialized);
512
513                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
514                 if(shardInformation.isShardInitialized()) {
515                     // If the shard is already initialized then we'll wait enough time for the shard to
516                     // elect a leader, ie 2 times the election timeout.
517                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
518                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
519                 }
520
521                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
522                         shardInformation.getShardName());
523
524                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
525                         timeout, getSelf(),
526                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
527                         getContext().dispatcher(), getSelf());
528
529                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
530
531             } else if (!shardInformation.isShardInitialized()) {
532                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
533                         shardInformation.getShardName());
534                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
535             } else {
536                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
537                         shardInformation.getShardName());
538                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
539             }
540
541             return;
542         }
543
544         getSender().tell(messageSupplier.get(), getSelf());
545     }
546
547     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
548         return new NoShardLeaderException(null, shardId.toString());
549     }
550
551     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
552         return new NotInitializedException(String.format(
553                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
554     }
555
556     private void memberRemoved(ClusterEvent.MemberRemoved message) {
557         String memberName = message.member().roles().head();
558
559         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
560                 message.member().address());
561
562         peerAddressResolver.removePeerAddress(memberName);
563
564         for(ShardInformation info : localShards.values()){
565             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
566         }
567     }
568
569     private void memberExited(ClusterEvent.MemberExited message) {
570         String memberName = message.member().roles().head();
571
572         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
573                 message.member().address());
574
575         peerAddressResolver.removePeerAddress(memberName);
576
577         for(ShardInformation info : localShards.values()){
578             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
579         }
580     }
581
582     private void memberUp(ClusterEvent.MemberUp message) {
583         String memberName = message.member().roles().head();
584
585         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
586                 message.member().address());
587
588         addPeerAddress(memberName, message.member().address());
589
590         checkReady();
591     }
592
593     private void addPeerAddress(String memberName, Address address) {
594         peerAddressResolver.addPeerAddress(memberName, address);
595
596         for(ShardInformation info : localShards.values()){
597             String shardName = info.getShardName();
598             String peerId = getShardIdentifier(memberName, shardName).toString();
599             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
600
601             info.peerUp(memberName, peerId, getSelf());
602         }
603     }
604
605     private void memberReachable(ClusterEvent.ReachableMember message) {
606         String memberName = message.member().roles().head();
607         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
608
609         addPeerAddress(memberName, message.member().address());
610
611         markMemberAvailable(memberName);
612     }
613
614     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
615         String memberName = message.member().roles().head();
616         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
617
618         markMemberUnavailable(memberName);
619     }
620
621     private void markMemberUnavailable(final String memberName) {
622         for(ShardInformation info : localShards.values()){
623             String leaderId = info.getLeaderId();
624             if(leaderId != null && leaderId.contains(memberName)) {
625                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
626                 info.setLeaderAvailable(false);
627
628                 primaryShardInfoCache.remove(info.getShardName());
629             }
630
631             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
632         }
633     }
634
635     private void markMemberAvailable(final String memberName) {
636         for(ShardInformation info : localShards.values()){
637             String leaderId = info.getLeaderId();
638             if(leaderId != null && leaderId.contains(memberName)) {
639                 LOG.debug("Marking Leader {} as available.", leaderId);
640                 info.setLeaderAvailable(true);
641             }
642
643             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
644         }
645     }
646
647     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
648         datastoreContextFactory = factory;
649         for (ShardInformation info : localShards.values()) {
650             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
651         }
652     }
653
654     private void onSwitchShardBehavior(SwitchShardBehavior message) {
655         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
656
657         ShardInformation shardInformation = localShards.get(identifier.getShardName());
658
659         if(shardInformation != null && shardInformation.getActor() != null) {
660             shardInformation.getActor().tell(
661                     new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
662         } else {
663             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
664                     message.getShardName(), message.getNewState());
665         }
666     }
667
668     /**
669      * Notifies all the local shards of a change in the schema context
670      *
671      * @param message
672      */
673     private void updateSchemaContext(final Object message) {
674         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
675
676         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
677
678         for (ShardInformation info : localShards.values()) {
679             if (info.getActor() == null) {
680                 LOG.debug("Creating Shard {}", info.getShardId());
681                 info.setActor(newShardActor(schemaContext, info));
682             } else {
683                 info.getActor().tell(message, getSelf());
684             }
685         }
686     }
687
688     @VisibleForTesting
689     protected ClusterWrapper getCluster() {
690         return cluster;
691     }
692
693     @VisibleForTesting
694     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
695         return getContext().actorOf(info.newProps(schemaContext)
696                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
697     }
698
699     private void findPrimary(FindPrimary message) {
700         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
701
702         final String shardName = message.getShardName();
703         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
704
705         // First see if the there is a local replica for the shard
706         final ShardInformation info = localShards.get(shardName);
707         if (info != null && info.isActiveMember()) {
708             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
709                 @Override
710                 public Object get() {
711                     String primaryPath = info.getSerializedLeaderActor();
712                     Object found = canReturnLocalShardState && info.isLeader() ?
713                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
714                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
715
716                             if(LOG.isDebugEnabled()) {
717                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
718                             }
719
720                             return found;
721                 }
722             });
723
724             return;
725         }
726
727         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
728             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
729                     shardName, address);
730
731             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
732                     message.isWaitUntilReady()), getContext());
733             return;
734         }
735
736         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
737
738         getSender().tell(new PrimaryNotFoundException(
739                 String.format("No primary shard found for %s.", shardName)), getSelf());
740     }
741
742     /**
743      * Construct the name of the shard actor given the name of the member on
744      * which the shard resides and the name of the shard
745      *
746      * @param memberName
747      * @param shardName
748      * @return
749      */
750     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
751         return peerAddressResolver.getShardIdentifier(memberName, shardName);
752     }
753
754     /**
755      * Create shards that are local to the member on which the ShardManager
756      * runs
757      *
758      */
759     private void createLocalShards() {
760         String memberName = this.cluster.getCurrentMemberName();
761         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
762
763         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
764         if(restoreFromSnapshot != null)
765         {
766             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
767                 shardSnapshots.put(snapshot.getName(), snapshot);
768             }
769         }
770
771         restoreFromSnapshot = null; // null out to GC
772
773         for(String shardName : memberShardNames){
774             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
775             Map<String, String> peerAddresses = getPeerAddresses(shardName);
776             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
777                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
778                         shardSnapshots.get(shardName)), peerAddressResolver));
779             mBean.addLocalShard(shardId.toString());
780         }
781     }
782
783     /**
784      * Given the name of the shard find the addresses of all it's peers
785      *
786      * @param shardName
787      */
788     private Map<String, String> getPeerAddresses(String shardName) {
789         Collection<String> members = configuration.getMembersFromShardName(shardName);
790         Map<String, String> peerAddresses = new HashMap<>();
791
792         String currentMemberName = this.cluster.getCurrentMemberName();
793
794         for(String memberName : members) {
795             if(!currentMemberName.equals(memberName)) {
796                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
797                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
798                 peerAddresses.put(shardId.toString(), address);
799             }
800         }
801         return peerAddresses;
802     }
803
804     @Override
805     public SupervisorStrategy supervisorStrategy() {
806
807         return new OneForOneStrategy(10, Duration.create("1 minute"),
808                 new Function<Throwable, SupervisorStrategy.Directive>() {
809             @Override
810             public SupervisorStrategy.Directive apply(Throwable t) {
811                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
812                 return SupervisorStrategy.resume();
813             }
814         }
815                 );
816
817     }
818
819     @Override
820     public String persistenceId() {
821         return id;
822     }
823
824     @VisibleForTesting
825     ShardManagerInfoMBean getMBean(){
826         return mBean;
827     }
828
829     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
830         if (shardReplicaOperationsInProgress.contains(shardName)) {
831             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
832             LOG.debug ("{}: {}", persistenceId(), msg);
833             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
834             return true;
835         }
836
837         return false;
838     }
839
840     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
841         final String shardName = shardReplicaMsg.getShardName();
842
843         LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
844
845         // verify the shard with the specified name is present in the cluster configuration
846         if (!(this.configuration.isShardConfigured(shardName))) {
847             String msg = String.format("No module configuration exists for shard %s", shardName);
848             LOG.debug ("{}: {}", persistenceId(), msg);
849             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
850             return;
851         }
852
853         // Create the localShard
854         if (schemaContext == null) {
855             String msg = String.format(
856                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
857             LOG.debug ("{}: {}", persistenceId(), msg);
858             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
859             return;
860         }
861
862         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
863                 getShardInitializationTimeout().duration().$times(2));
864
865         final ActorRef sender = getSender();
866         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
867         futureObj.onComplete(new OnComplete<Object>() {
868             @Override
869             public void onComplete(Throwable failure, Object response) {
870                 if (failure != null) {
871                     LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
872                     sender.tell(new akka.actor.Status.Failure(new RuntimeException(
873                         String.format("Failed to find leader for shard %s", shardName), failure)), getSelf());
874                 } else {
875                     if(response instanceof RemotePrimaryShardFound) {
876                         RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
877                         addShard (shardName, message, sender);
878                     } else if(response instanceof LocalPrimaryShardFound) {
879                         sendLocalReplicaAlreadyExistsReply(shardName, sender);
880                     } else {
881                         String msg = String.format("Failed to find leader for shard %s: received response: %s",
882                                 shardName, response);
883                         LOG.debug ("{}: {}", persistenceId(), msg);
884                         sender.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable)response :
885                             new RuntimeException(msg)), getSelf());
886                     }
887                 }
888             }
889         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
890     }
891
892     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
893         String msg = String.format("Local shard %s already exists", shardName);
894         LOG.debug ("{}: {}", persistenceId(), msg);
895         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
896     }
897
898     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
899         if(isShardReplicaOperationInProgress(shardName, sender)) {
900             return;
901         }
902
903         shardReplicaOperationsInProgress.add(shardName);
904
905         final ShardInformation shardInfo;
906         final boolean removeShardOnFailure;
907         ShardInformation existingShardInfo = localShards.get(shardName);
908         if(existingShardInfo == null) {
909             removeShardOnFailure = true;
910             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
911
912             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
913                     DisableElectionsRaftPolicy.class.getName()).build();
914
915             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
916                     Shard.builder(), peerAddressResolver);
917             shardInfo.setActiveMember(false);
918             localShards.put(shardName, shardInfo);
919             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
920         } else {
921             removeShardOnFailure = false;
922             shardInfo = existingShardInfo;
923         }
924
925         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
926
927         //inform ShardLeader to add this shard as a replica by sending an AddServer message
928         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
929                 response.getPrimaryPath(), shardInfo.getShardId());
930
931         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
932                 duration());
933         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
934             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
935
936         futureObj.onComplete(new OnComplete<Object>() {
937             @Override
938             public void onComplete(Throwable failure, Object addServerResponse) {
939                 shardReplicaOperationsInProgress.remove(shardName);
940                 if (failure != null) {
941                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
942                             response.getPrimaryPath(), shardName, failure);
943
944                     onAddServerFailure(shardName, String.format("AddServer request to leader %s for shard %s failed",
945                             response.getPrimaryPath(), shardName), failure, sender, removeShardOnFailure);
946                 } else {
947                     AddServerReply reply = (AddServerReply)addServerResponse;
948                     onAddServerReply(shardInfo, reply, sender, response.getPrimaryPath(), removeShardOnFailure);
949                 }
950             }
951         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
952     }
953
954     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
955             boolean removeShardOnFailure) {
956         if(removeShardOnFailure) {
957             ShardInformation shardInfo = localShards.remove(shardName);
958             if (shardInfo.getActor() != null) {
959                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
960             }
961         }
962
963         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
964             new RuntimeException(message, failure)), getSelf());
965     }
966
967     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
968             String leaderPath, boolean removeShardOnFailure) {
969         String shardName = shardInfo.getShardName();
970         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
971
972         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
973             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
974
975             // Make the local shard voting capable
976             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
977             shardInfo.setActiveMember(true);
978             persistShardList();
979
980             mBean.addLocalShard(shardInfo.getShardId().toString());
981             sender.tell(new akka.actor.Status.Success(null), getSelf());
982         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
983             sendLocalReplicaAlreadyExistsReply(shardName, sender);
984         } else {
985             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
986                     persistenceId(), shardName, replyMsg.getStatus());
987
988             Exception failure;
989             switch (replyMsg.getStatus()) {
990                 case TIMEOUT:
991                     failure = new TimeoutException(String.format(
992                             "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
993                             "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
994                             leaderPath, shardName));
995                     break;
996                 case NO_LEADER:
997                     failure = createNoShardLeaderException(shardInfo.getShardId());
998                     break;
999                 default :
1000                     failure = new RuntimeException(String.format(
1001                             "AddServer request to leader %s for shard %s failed with status %s",
1002                             leaderPath, shardName, replyMsg.getStatus()));
1003             }
1004
1005             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1006         }
1007     }
1008
1009     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
1010         String shardName = shardReplicaMsg.getShardName();
1011
1012         // verify the local shard replica is available in the controller node
1013         if (!localShards.containsKey(shardName)) {
1014             String msg = String.format("Local shard %s does not", shardName);
1015             LOG.debug ("{}: {}", persistenceId(), msg);
1016             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1017             return;
1018         }
1019         // call RemoveShard for the shardName
1020         getSender().tell(new akka.actor.Status.Success(true), getSelf());
1021         return;
1022     }
1023
1024     private void persistShardList() {
1025         List<String> shardList = new ArrayList<>(localShards.keySet());
1026         for (ShardInformation shardInfo : localShards.values()) {
1027             if (!shardInfo.isActiveMember()) {
1028                 shardList.remove(shardInfo.getShardName());
1029             }
1030         }
1031         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1032         saveSnapshot(new ShardManagerSnapshot(shardList));
1033     }
1034
1035     private void handleShardRecovery(SnapshotOffer offer) {
1036         LOG.debug ("{}: in handleShardRecovery", persistenceId());
1037         ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
1038         String currentMember = cluster.getCurrentMemberName();
1039         Set<String> configuredShardList =
1040             new HashSet<>(configuration.getMemberShardNames(currentMember));
1041         for (String shard : snapshot.getShardList()) {
1042             if (!configuredShardList.contains(shard)) {
1043                 // add the current member as a replica for the shard
1044                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1045                 configuration.addMemberReplicaForShard(shard, currentMember);
1046             } else {
1047                 configuredShardList.remove(shard);
1048             }
1049         }
1050         for (String shard : configuredShardList) {
1051             // remove the member as a replica for the shard
1052             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1053             configuration.removeMemberReplicaForShard(shard, currentMember);
1054         }
1055     }
1056
1057     @VisibleForTesting
1058     protected static class ShardInformation {
1059         private final ShardIdentifier shardId;
1060         private final String shardName;
1061         private ActorRef actor;
1062         private ActorPath actorPath;
1063         private final Map<String, String> initialPeerAddresses;
1064         private Optional<DataTree> localShardDataTree;
1065         private boolean leaderAvailable = false;
1066
1067         // flag that determines if the actor is ready for business
1068         private boolean actorInitialized = false;
1069
1070         private boolean followerSyncStatus = false;
1071
1072         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1073         private String role ;
1074         private String leaderId;
1075         private short leaderVersion;
1076
1077         private DatastoreContext datastoreContext;
1078         private Shard.AbstractBuilder<?, ?> builder;
1079         private final ShardPeerAddressResolver addressResolver;
1080         private boolean isActiveMember = true;
1081
1082         private ShardInformation(String shardName, ShardIdentifier shardId,
1083                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1084                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1085             this.shardName = shardName;
1086             this.shardId = shardId;
1087             this.initialPeerAddresses = initialPeerAddresses;
1088             this.datastoreContext = datastoreContext;
1089             this.builder = builder;
1090             this.addressResolver = addressResolver;
1091         }
1092
1093         Props newProps(SchemaContext schemaContext) {
1094             Preconditions.checkNotNull(builder);
1095             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1096                     schemaContext(schemaContext).props();
1097             builder = null;
1098             return props;
1099         }
1100
1101         String getShardName() {
1102             return shardName;
1103         }
1104
1105         ActorRef getActor(){
1106             return actor;
1107         }
1108
1109         ActorPath getActorPath() {
1110             return actorPath;
1111         }
1112
1113         void setActor(ActorRef actor) {
1114             this.actor = actor;
1115             this.actorPath = actor.path();
1116         }
1117
1118         ShardIdentifier getShardId() {
1119             return shardId;
1120         }
1121
1122         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1123             this.localShardDataTree = localShardDataTree;
1124         }
1125
1126         Optional<DataTree> getLocalShardDataTree() {
1127             return localShardDataTree;
1128         }
1129
1130         DatastoreContext getDatastoreContext() {
1131             return datastoreContext;
1132         }
1133
1134         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1135             this.datastoreContext = datastoreContext;
1136             if (actor != null) {
1137                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1138                 actor.tell(this.datastoreContext, sender);
1139             }
1140         }
1141
1142         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1143             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1144
1145             if(actor != null) {
1146                 if(LOG.isDebugEnabled()) {
1147                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1148                             peerId, peerAddress, actor.path());
1149                 }
1150
1151                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1152             }
1153
1154             notifyOnShardInitializedCallbacks();
1155         }
1156
1157         void peerDown(String memberName, String peerId, ActorRef sender) {
1158             if(actor != null) {
1159                 actor.tell(new PeerDown(memberName, peerId), sender);
1160             }
1161         }
1162
1163         void peerUp(String memberName, String peerId, ActorRef sender) {
1164             if(actor != null) {
1165                 actor.tell(new PeerUp(memberName, peerId), sender);
1166             }
1167         }
1168
1169         boolean isShardReady() {
1170             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1171         }
1172
1173         boolean isShardReadyWithLeaderId() {
1174             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1175                     (isLeader() || addressResolver.resolve(leaderId) != null);
1176         }
1177
1178         boolean isShardInitialized() {
1179             return getActor() != null && actorInitialized;
1180         }
1181
1182         boolean isLeader() {
1183             return Objects.equal(leaderId, shardId.toString());
1184         }
1185
1186         String getSerializedLeaderActor() {
1187             if(isLeader()) {
1188                 return Serialization.serializedActorPath(getActor());
1189             } else {
1190                 return addressResolver.resolve(leaderId);
1191             }
1192         }
1193
1194         void setActorInitialized() {
1195             LOG.debug("Shard {} is initialized", shardId);
1196
1197             this.actorInitialized = true;
1198
1199             notifyOnShardInitializedCallbacks();
1200         }
1201
1202         private void notifyOnShardInitializedCallbacks() {
1203             if(onShardInitializedSet.isEmpty()) {
1204                 return;
1205             }
1206
1207             boolean ready = isShardReadyWithLeaderId();
1208
1209             if(LOG.isDebugEnabled()) {
1210                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1211                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1212             }
1213
1214             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1215             while(iter.hasNext()) {
1216                 OnShardInitialized onShardInitialized = iter.next();
1217                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1218                     iter.remove();
1219                     onShardInitialized.getTimeoutSchedule().cancel();
1220                     onShardInitialized.getReplyRunnable().run();
1221                 }
1222             }
1223         }
1224
1225         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1226             onShardInitializedSet.add(onShardInitialized);
1227         }
1228
1229         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1230             onShardInitializedSet.remove(onShardInitialized);
1231         }
1232
1233         void setRole(String newRole) {
1234             this.role = newRole;
1235
1236             notifyOnShardInitializedCallbacks();
1237         }
1238
1239         void setFollowerSyncStatus(boolean syncStatus){
1240             this.followerSyncStatus = syncStatus;
1241         }
1242
1243         boolean isInSync(){
1244             if(RaftState.Follower.name().equals(this.role)){
1245                 return followerSyncStatus;
1246             } else if(RaftState.Leader.name().equals(this.role)){
1247                 return true;
1248             }
1249
1250             return false;
1251         }
1252
1253         boolean setLeaderId(String leaderId) {
1254             boolean changed = !Objects.equal(this.leaderId, leaderId);
1255             this.leaderId = leaderId;
1256             if(leaderId != null) {
1257                 this.leaderAvailable = true;
1258             }
1259             notifyOnShardInitializedCallbacks();
1260
1261             return changed;
1262         }
1263
1264         String getLeaderId() {
1265             return leaderId;
1266         }
1267
1268         void setLeaderAvailable(boolean leaderAvailable) {
1269             this.leaderAvailable = leaderAvailable;
1270         }
1271
1272         short getLeaderVersion() {
1273             return leaderVersion;
1274         }
1275
1276         void setLeaderVersion(short leaderVersion) {
1277             this.leaderVersion = leaderVersion;
1278         }
1279
1280         boolean isActiveMember() {
1281             return isActiveMember;
1282         }
1283
1284         void setActiveMember(boolean isActiveMember) {
1285             this.isActiveMember = isActiveMember;
1286         }
1287     }
1288
1289     private static class OnShardInitialized {
1290         private final Runnable replyRunnable;
1291         private Cancellable timeoutSchedule;
1292
1293         OnShardInitialized(Runnable replyRunnable) {
1294             this.replyRunnable = replyRunnable;
1295         }
1296
1297         Runnable getReplyRunnable() {
1298             return replyRunnable;
1299         }
1300
1301         Cancellable getTimeoutSchedule() {
1302             return timeoutSchedule;
1303         }
1304
1305         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1306             this.timeoutSchedule = timeoutSchedule;
1307         }
1308     }
1309
1310     private static class OnShardReady extends OnShardInitialized {
1311         OnShardReady(Runnable replyRunnable) {
1312             super(replyRunnable);
1313         }
1314     }
1315
1316     private static class ShardNotInitializedTimeout {
1317         private final ActorRef sender;
1318         private final ShardInformation shardInfo;
1319         private final OnShardInitialized onShardInitialized;
1320
1321         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1322             this.sender = sender;
1323             this.shardInfo = shardInfo;
1324             this.onShardInitialized = onShardInitialized;
1325         }
1326
1327         ActorRef getSender() {
1328             return sender;
1329         }
1330
1331         ShardInformation getShardInfo() {
1332             return shardInfo;
1333         }
1334
1335         OnShardInitialized getOnShardInitialized() {
1336             return onShardInitialized;
1337         }
1338     }
1339
1340     /**
1341      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1342      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1343      */
1344     @Deprecated
1345     static class SchemaContextModules implements Serializable {
1346         private static final long serialVersionUID = -8884620101025936590L;
1347
1348         private final Set<String> modules;
1349
1350         SchemaContextModules(Set<String> modules){
1351             this.modules = modules;
1352         }
1353
1354         public Set<String> getModules() {
1355             return modules;
1356         }
1357     }
1358
1359     public static Builder builder() {
1360         return new Builder();
1361     }
1362
1363     public static class Builder {
1364         private ClusterWrapper cluster;
1365         private Configuration configuration;
1366         private DatastoreContextFactory datastoreContextFactory;
1367         private CountDownLatch waitTillReadyCountdownLatch;
1368         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1369         private DatastoreSnapshot restoreFromSnapshot;
1370         private volatile boolean sealed;
1371
1372         protected void checkSealed() {
1373             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1374         }
1375
1376         public Builder cluster(ClusterWrapper cluster) {
1377             checkSealed();
1378             this.cluster = cluster;
1379             return this;
1380         }
1381
1382         public Builder configuration(Configuration configuration) {
1383             checkSealed();
1384             this.configuration = configuration;
1385             return this;
1386         }
1387
1388         public Builder datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1389             checkSealed();
1390             this.datastoreContextFactory = datastoreContextFactory;
1391             return this;
1392         }
1393
1394         public Builder waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1395             checkSealed();
1396             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1397             return this;
1398         }
1399
1400         public Builder primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1401             checkSealed();
1402             this.primaryShardInfoCache = primaryShardInfoCache;
1403             return this;
1404         }
1405
1406         public Builder restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1407             checkSealed();
1408             this.restoreFromSnapshot = restoreFromSnapshot;
1409             return this;
1410         }
1411
1412         protected void verify() {
1413             sealed = true;
1414             Preconditions.checkNotNull(cluster, "cluster should not be null");
1415             Preconditions.checkNotNull(configuration, "configuration should not be null");
1416             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1417             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1418             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1419         }
1420
1421         public Props props() {
1422             verify();
1423             return Props.create(ShardManager.class, this);
1424         }
1425     }
1426 }
1427
1428
1429