Bug 3020: Use leader version in LeaderStateChanged
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.Props;
17 import akka.actor.SupervisorStrategy;
18 import akka.cluster.ClusterEvent;
19 import akka.japi.Creator;
20 import akka.japi.Function;
21 import akka.persistence.RecoveryCompleted;
22 import akka.serialization.Serialization;
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Objects;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Strings;
28 import com.google.common.base.Supplier;
29 import com.google.common.collect.Sets;
30 import java.io.Serializable;
31 import java.util.ArrayList;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CountDownLatch;
38 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
43 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
44 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
45 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
46 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
47 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
48 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
49 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
50 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
51 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
52 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
53 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
54 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
56 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
57 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
58 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
59 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
60 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
61 import org.opendaylight.controller.cluster.raft.RaftState;
62 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
63 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
64 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67 import scala.concurrent.duration.Duration;
68
69 /**
70  * The ShardManager has the following jobs,
71  * <ul>
72  * <li> Create all the local shard replicas that belong on this cluster member
73  * <li> Find the address of the local shard
74  * <li> Find the primary replica for any given shard
75  * <li> Monitor the cluster members and store their addresses
76  * </ul>
77  */
78 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
79
80     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
81
82     // Stores a mapping between a member name and the address of the member
83     // Member names look like "member-1", "member-2" etc and are as specified
84     // in configuration
85     private final Map<String, Address> memberNameToAddress = new HashMap<>();
86
87     // Stores a mapping between a shard name and its corresponding information
88     // Shard names look like inventory, topology etc and are as specified in
89     // configuration
90     private final Map<String, ShardInformation> localShards = new HashMap<>();
91
92     // The type of a ShardManager reflects the type of the datastore itself
93     // A data store could be of type config/operational
94     private final String type;
95
96     private final String shardManagerIdentifierString;
97
98     private final ClusterWrapper cluster;
99
100     private final Configuration configuration;
101
102     private final String shardDispatcherPath;
103
104     private ShardManagerInfo mBean;
105
106     private DatastoreContext datastoreContext;
107
108     private final CountDownLatch waitTillReadyCountdownLatch;
109
110     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
111
112     /**
113      */
114     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
115             DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
116             PrimaryShardInfoFutureCache primaryShardInfoCache) {
117
118         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
119         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
120         this.datastoreContext = datastoreContext;
121         this.type = datastoreContext.getDataStoreType();
122         this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
123         this.shardDispatcherPath =
124                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
125         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
126         this.primaryShardInfoCache = primaryShardInfoCache;
127
128         // Subscribe this actor to cluster member events
129         cluster.subscribeToMemberEvents(getSelf());
130
131         createLocalShards();
132     }
133
134     public static Props props(
135         final ClusterWrapper cluster,
136         final Configuration configuration,
137         final DatastoreContext datastoreContext,
138         final CountDownLatch waitTillReadyCountdownLatch,
139         final PrimaryShardInfoFutureCache primaryShardInfoCache) {
140
141         Preconditions.checkNotNull(cluster, "cluster should not be null");
142         Preconditions.checkNotNull(configuration, "configuration should not be null");
143         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
144         Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
145
146         return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
147                 waitTillReadyCountdownLatch, primaryShardInfoCache));
148     }
149
150     @Override
151     public void postStop() {
152         LOG.info("Stopping ShardManager");
153
154         mBean.unregisterMBean();
155     }
156
157     @Override
158     public void handleCommand(Object message) throws Exception {
159         if (message  instanceof FindPrimary) {
160             findPrimary((FindPrimary)message);
161         } else if(message instanceof FindLocalShard){
162             findLocalShard((FindLocalShard) message);
163         } else if (message instanceof UpdateSchemaContext) {
164             updateSchemaContext(message);
165         } else if(message instanceof ActorInitialized) {
166             onActorInitialized(message);
167         } else if (message instanceof ClusterEvent.MemberUp){
168             memberUp((ClusterEvent.MemberUp) message);
169         } else if(message instanceof ClusterEvent.MemberRemoved) {
170             memberRemoved((ClusterEvent.MemberRemoved) message);
171         } else if(message instanceof ClusterEvent.UnreachableMember) {
172             memberUnreachable((ClusterEvent.UnreachableMember)message);
173         } else if(message instanceof ClusterEvent.ReachableMember) {
174             memberReachable((ClusterEvent.ReachableMember) message);
175         } else if(message instanceof DatastoreContext) {
176             onDatastoreContext((DatastoreContext)message);
177         } else if(message instanceof RoleChangeNotification) {
178             onRoleChangeNotification((RoleChangeNotification) message);
179         } else if(message instanceof FollowerInitialSyncUpStatus){
180             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
181         } else if(message instanceof ShardNotInitializedTimeout) {
182             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
183         } else if(message instanceof ShardLeaderStateChanged) {
184             onLeaderStateChanged((ShardLeaderStateChanged)message);
185         } else {
186             unknownMessage(message);
187         }
188
189     }
190
191     private void checkReady(){
192         if (isReadyWithLeaderId()) {
193             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
194                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
195
196             waitTillReadyCountdownLatch.countDown();
197         }
198     }
199
200     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
201         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
202
203         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
204         if(shardInformation != null) {
205             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
206             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
207             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
208                 primaryShardInfoCache.remove(shardInformation.getShardName());
209             }
210
211             checkReady();
212         } else {
213             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
214         }
215     }
216
217     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
218         ShardInformation shardInfo = message.getShardInfo();
219
220         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
221                 shardInfo.getShardName());
222
223         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
224
225         if(!shardInfo.isShardInitialized()) {
226             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
227             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
228         } else {
229             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
230             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
231         }
232     }
233
234     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
235         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
236                 status.getName(), status.isInitialSyncDone());
237
238         ShardInformation shardInformation = findShardInformation(status.getName());
239
240         if(shardInformation != null) {
241             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
242
243             mBean.setSyncStatus(isInSync());
244         }
245
246     }
247
248     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
249         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
250                 roleChanged.getOldRole(), roleChanged.getNewRole());
251
252         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
253         if(shardInformation != null) {
254             shardInformation.setRole(roleChanged.getNewRole());
255             checkReady();
256             mBean.setSyncStatus(isInSync());
257         }
258     }
259
260
261     private ShardInformation findShardInformation(String memberId) {
262         for(ShardInformation info : localShards.values()){
263             if(info.getShardId().toString().equals(memberId)){
264                 return info;
265             }
266         }
267
268         return null;
269     }
270
271     private boolean isReadyWithLeaderId() {
272         boolean isReady = true;
273         for (ShardInformation info : localShards.values()) {
274             if(!info.isShardReadyWithLeaderId()){
275                 isReady = false;
276                 break;
277             }
278         }
279         return isReady;
280     }
281
282     private boolean isInSync(){
283         for (ShardInformation info : localShards.values()) {
284             if(!info.isInSync()){
285                 return false;
286             }
287         }
288         return true;
289     }
290
291     private void onActorInitialized(Object message) {
292         final ActorRef sender = getSender();
293
294         if (sender == null) {
295             return; //why is a non-actor sending this message? Just ignore.
296         }
297
298         String actorName = sender.path().name();
299         //find shard name from actor name; actor name is stringified shardId
300         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
301
302         if (shardId.getShardName() == null) {
303             return;
304         }
305
306         markShardAsInitialized(shardId.getShardName());
307     }
308
309     private void markShardAsInitialized(String shardName) {
310         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
311
312         ShardInformation shardInformation = localShards.get(shardName);
313         if (shardInformation != null) {
314             shardInformation.setActorInitialized();
315
316             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
317         }
318     }
319
320     @Override
321     protected void handleRecover(Object message) throws Exception {
322         if (message instanceof RecoveryCompleted) {
323             LOG.info("Recovery complete : {}", persistenceId());
324
325             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
326             // journal on upgrade from Helium.
327             deleteMessages(lastSequenceNr());
328         }
329     }
330
331     private void findLocalShard(FindLocalShard message) {
332         final ShardInformation shardInformation = localShards.get(message.getShardName());
333
334         if(shardInformation == null){
335             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
336             return;
337         }
338
339         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
340             @Override
341             public Object get() {
342                 return new LocalShardFound(shardInformation.getActor());
343             }
344         });
345     }
346
347     private void sendResponse(ShardInformation shardInformation, boolean doWait,
348             boolean wantShardReady, final Supplier<Object> messageSupplier) {
349         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
350             if(doWait) {
351                 final ActorRef sender = getSender();
352                 final ActorRef self = self();
353
354                 Runnable replyRunnable = new Runnable() {
355                     @Override
356                     public void run() {
357                         sender.tell(messageSupplier.get(), self);
358                     }
359                 };
360
361                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
362                     new OnShardInitialized(replyRunnable);
363
364                 shardInformation.addOnShardInitialized(onShardInitialized);
365
366                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
367
368                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
369                         datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
370                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
371                         getContext().dispatcher(), getSelf());
372
373                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
374
375             } else if (!shardInformation.isShardInitialized()) {
376                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
377                         shardInformation.getShardName());
378                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
379             } else {
380                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
381                         shardInformation.getShardName());
382                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
383             }
384
385             return;
386         }
387
388         getSender().tell(messageSupplier.get(), getSelf());
389     }
390
391     private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
392         return new NoShardLeaderException(String.format(
393                 "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
394                 "recovering and a leader is being elected. Try again later.", shardId));
395     }
396
397     private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
398         return new NotInitializedException(String.format(
399                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
400     }
401
402     private void memberRemoved(ClusterEvent.MemberRemoved message) {
403         String memberName = message.member().roles().head();
404
405         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
406                 message.member().address());
407
408         memberNameToAddress.remove(message.member().roles().head());
409     }
410
411     private void memberUp(ClusterEvent.MemberUp message) {
412         String memberName = message.member().roles().head();
413
414         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
415                 message.member().address());
416
417         memberNameToAddress.put(memberName, message.member().address());
418
419         for(ShardInformation info : localShards.values()){
420             String shardName = info.getShardName();
421             info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
422                 getShardActorPath(shardName, memberName), getSelf());
423         }
424
425         checkReady();
426     }
427
428     private void memberReachable(ClusterEvent.ReachableMember message) {
429         String memberName = message.member().roles().head();
430         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
431
432         markMemberAvailable(memberName);
433     }
434
435     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
436         String memberName = message.member().roles().head();
437         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
438
439         markMemberUnavailable(memberName);
440     }
441
442     private void markMemberUnavailable(final String memberName) {
443         for(ShardInformation info : localShards.values()){
444             String leaderId = info.getLeaderId();
445             if(leaderId != null && leaderId.contains(memberName)) {
446                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
447                 info.setLeaderAvailable(false);
448
449                 primaryShardInfoCache.remove(info.getShardName());
450             }
451         }
452     }
453
454     private void markMemberAvailable(final String memberName) {
455         for(ShardInformation info : localShards.values()){
456             String leaderId = info.getLeaderId();
457             if(leaderId != null && leaderId.contains(memberName)) {
458                 LOG.debug("Marking Leader {} as available.", leaderId);
459                 info.setLeaderAvailable(true);
460             }
461         }
462     }
463
464     private void onDatastoreContext(DatastoreContext context) {
465         datastoreContext = context;
466         for (ShardInformation info : localShards.values()) {
467             if (info.getActor() != null) {
468                 info.getActor().tell(datastoreContext, getSelf());
469             }
470         }
471     }
472
473     /**
474      * Notifies all the local shards of a change in the schema context
475      *
476      * @param message
477      */
478     private void updateSchemaContext(final Object message) {
479         final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
480
481         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
482
483         for (ShardInformation info : localShards.values()) {
484             if (info.getActor() == null) {
485                 LOG.debug("Creating Shard {}", info.getShardId());
486                 info.setActor(newShardActor(schemaContext, info));
487             } else {
488                 info.getActor().tell(message, getSelf());
489             }
490         }
491     }
492
493     @VisibleForTesting
494     protected ClusterWrapper getCluster() {
495         return cluster;
496     }
497
498     @VisibleForTesting
499     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
500         return getContext().actorOf(Shard.props(info.getShardId(),
501                 info.getPeerAddresses(), datastoreContext, schemaContext)
502                         .withDispatcher(shardDispatcherPath), info.getShardId().toString());
503     }
504
505     private void findPrimary(FindPrimary message) {
506         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
507
508         final String shardName = message.getShardName();
509         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
510
511         // First see if the there is a local replica for the shard
512         final ShardInformation info = localShards.get(shardName);
513         if (info != null) {
514             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
515                 @Override
516                 public Object get() {
517                     String primaryPath = info.getSerializedLeaderActor();
518                     Object found = canReturnLocalShardState && info.isLeader() ?
519                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
520                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
521
522                     if(LOG.isDebugEnabled()) {
523                         LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
524                     }
525
526                     return found;
527                 }
528             });
529
530             return;
531         }
532
533         for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
534             if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
535                 String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
536
537                 LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
538                         shardName, path);
539
540                 getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
541                         message.isWaitUntilReady()), getContext());
542                 return;
543             }
544         }
545
546         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
547
548         getSender().tell(new PrimaryNotFoundException(
549                 String.format("No primary shard found for %s.", shardName)), getSelf());
550     }
551
552     private StringBuilder getShardManagerActorPathBuilder(Address address) {
553         StringBuilder builder = new StringBuilder();
554         builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
555         return builder;
556     }
557
558     private String getShardActorPath(String shardName, String memberName) {
559         Address address = memberNameToAddress.get(memberName);
560         if(address != null) {
561             StringBuilder builder = getShardManagerActorPathBuilder(address);
562             builder.append("/")
563                 .append(getShardIdentifier(memberName, shardName));
564             return builder.toString();
565         }
566         return null;
567     }
568
569     /**
570      * Construct the name of the shard actor given the name of the member on
571      * which the shard resides and the name of the shard
572      *
573      * @param memberName
574      * @param shardName
575      * @return
576      */
577     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
578         return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
579     }
580
581     /**
582      * Create shards that are local to the member on which the ShardManager
583      * runs
584      *
585      */
586     private void createLocalShards() {
587         String memberName = this.cluster.getCurrentMemberName();
588         List<String> memberShardNames =
589             this.configuration.getMemberShardNames(memberName);
590
591         List<String> localShardActorNames = new ArrayList<>();
592         for(String shardName : memberShardNames){
593             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
594             Map<String, String> peerAddresses = getPeerAddresses(shardName);
595             localShardActorNames.add(shardId.toString());
596             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
597         }
598
599         mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
600                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
601     }
602
603     /**
604      * Given the name of the shard find the addresses of all it's peers
605      *
606      * @param shardName
607      * @return
608      */
609     private Map<String, String> getPeerAddresses(String shardName){
610
611         Map<String, String> peerAddresses = new HashMap<>();
612
613         List<String> members = this.configuration.getMembersFromShardName(shardName);
614
615         String currentMemberName = this.cluster.getCurrentMemberName();
616
617         for(String memberName : members){
618             if(!currentMemberName.equals(memberName)){
619                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
620                 String path = getShardActorPath(shardName, currentMemberName);
621                 peerAddresses.put(shardId.toString(), path);
622             }
623         }
624         return peerAddresses;
625     }
626
627     @Override
628     public SupervisorStrategy supervisorStrategy() {
629
630         return new OneForOneStrategy(10, Duration.create("1 minute"),
631             new Function<Throwable, SupervisorStrategy.Directive>() {
632                 @Override
633                 public SupervisorStrategy.Directive apply(Throwable t) {
634                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
635                     return SupervisorStrategy.resume();
636                 }
637             }
638         );
639
640     }
641
642     @Override
643     public String persistenceId() {
644         return "shard-manager-" + type;
645     }
646
647     @VisibleForTesting
648     ShardManagerInfoMBean getMBean(){
649         return mBean;
650     }
651
652     @VisibleForTesting
653     protected static class ShardInformation {
654         private final ShardIdentifier shardId;
655         private final String shardName;
656         private ActorRef actor;
657         private ActorPath actorPath;
658         private final Map<String, String> peerAddresses;
659         private Optional<DataTree> localShardDataTree;
660         private boolean leaderAvailable = false;
661
662         // flag that determines if the actor is ready for business
663         private boolean actorInitialized = false;
664
665         private boolean followerSyncStatus = false;
666
667         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
668         private String role ;
669         private String leaderId;
670         private short leaderVersion;
671
672         private ShardInformation(String shardName, ShardIdentifier shardId,
673                 Map<String, String> peerAddresses) {
674             this.shardName = shardName;
675             this.shardId = shardId;
676             this.peerAddresses = peerAddresses;
677         }
678
679         String getShardName() {
680             return shardName;
681         }
682
683         ActorRef getActor(){
684             return actor;
685         }
686
687         ActorPath getActorPath() {
688             return actorPath;
689         }
690
691         void setActor(ActorRef actor) {
692             this.actor = actor;
693             this.actorPath = actor.path();
694         }
695
696         ShardIdentifier getShardId() {
697             return shardId;
698         }
699
700         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
701             this.localShardDataTree = localShardDataTree;
702         }
703
704         Optional<DataTree> getLocalShardDataTree() {
705             return localShardDataTree;
706         }
707
708         Map<String, String> getPeerAddresses() {
709             return peerAddresses;
710         }
711
712         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
713             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
714                 peerAddress);
715             if(peerAddresses.containsKey(peerId)){
716                 peerAddresses.put(peerId, peerAddress);
717
718                 if(actor != null) {
719                     if(LOG.isDebugEnabled()) {
720                         LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
721                                 peerId, peerAddress, actor.path());
722                     }
723
724                     actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
725                 }
726
727                 notifyOnShardInitializedCallbacks();
728             }
729         }
730
731         boolean isShardReady() {
732             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
733         }
734
735         boolean isShardReadyWithLeaderId() {
736             return leaderAvailable && isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
737         }
738
739         boolean isShardInitialized() {
740             return getActor() != null && actorInitialized;
741         }
742
743         boolean isLeader() {
744             return Objects.equal(leaderId, shardId.toString());
745         }
746
747         String getSerializedLeaderActor() {
748             if(isLeader()) {
749                 return Serialization.serializedActorPath(getActor());
750             } else {
751                 return peerAddresses.get(leaderId);
752             }
753         }
754
755         void setActorInitialized() {
756             LOG.debug("Shard {} is initialized", shardId);
757
758             this.actorInitialized = true;
759
760             notifyOnShardInitializedCallbacks();
761         }
762
763         private void notifyOnShardInitializedCallbacks() {
764             if(onShardInitializedSet.isEmpty()) {
765                 return;
766             }
767
768             boolean ready = isShardReadyWithLeaderId();
769
770             if(LOG.isDebugEnabled()) {
771                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
772                         ready ? "ready" : "initialized", onShardInitializedSet.size());
773             }
774
775             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
776             while(iter.hasNext()) {
777                 OnShardInitialized onShardInitialized = iter.next();
778                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
779                     iter.remove();
780                     onShardInitialized.getTimeoutSchedule().cancel();
781                     onShardInitialized.getReplyRunnable().run();
782                 }
783             }
784         }
785
786         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
787             onShardInitializedSet.add(onShardInitialized);
788         }
789
790         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
791             onShardInitializedSet.remove(onShardInitialized);
792         }
793
794         void setRole(String newRole) {
795             this.role = newRole;
796
797             notifyOnShardInitializedCallbacks();
798         }
799
800         void setFollowerSyncStatus(boolean syncStatus){
801             this.followerSyncStatus = syncStatus;
802         }
803
804         boolean isInSync(){
805             if(RaftState.Follower.name().equals(this.role)){
806                 return followerSyncStatus;
807             } else if(RaftState.Leader.name().equals(this.role)){
808                 return true;
809             }
810
811             return false;
812         }
813
814         boolean setLeaderId(String leaderId) {
815             boolean changed = !Objects.equal(this.leaderId, leaderId);
816             this.leaderId = leaderId;
817             if(leaderId != null) {
818                 this.leaderAvailable = true;
819             }
820             notifyOnShardInitializedCallbacks();
821
822             return changed;
823         }
824
825         String getLeaderId() {
826             return leaderId;
827         }
828
829         void setLeaderAvailable(boolean leaderAvailable) {
830             this.leaderAvailable = leaderAvailable;
831         }
832
833         short getLeaderVersion() {
834             return leaderVersion;
835         }
836
837         void setLeaderVersion(short leaderVersion) {
838             this.leaderVersion = leaderVersion;
839         }
840     }
841
842     private static class ShardManagerCreator implements Creator<ShardManager> {
843         private static final long serialVersionUID = 1L;
844
845         final ClusterWrapper cluster;
846         final Configuration configuration;
847         final DatastoreContext datastoreContext;
848         private final CountDownLatch waitTillReadyCountdownLatch;
849         private final PrimaryShardInfoFutureCache primaryShardInfoCache;
850
851         ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
852                 CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
853             this.cluster = cluster;
854             this.configuration = configuration;
855             this.datastoreContext = datastoreContext;
856             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
857             this.primaryShardInfoCache = primaryShardInfoCache;
858         }
859
860         @Override
861         public ShardManager create() throws Exception {
862             return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
863                     primaryShardInfoCache);
864         }
865     }
866
867     private static class OnShardInitialized {
868         private final Runnable replyRunnable;
869         private Cancellable timeoutSchedule;
870
871         OnShardInitialized(Runnable replyRunnable) {
872             this.replyRunnable = replyRunnable;
873         }
874
875         Runnable getReplyRunnable() {
876             return replyRunnable;
877         }
878
879         Cancellable getTimeoutSchedule() {
880             return timeoutSchedule;
881         }
882
883         void setTimeoutSchedule(Cancellable timeoutSchedule) {
884             this.timeoutSchedule = timeoutSchedule;
885         }
886     }
887
888     private static class OnShardReady extends OnShardInitialized {
889         OnShardReady(Runnable replyRunnable) {
890             super(replyRunnable);
891         }
892     }
893
894     private static class ShardNotInitializedTimeout {
895         private final ActorRef sender;
896         private final ShardInformation shardInfo;
897         private final OnShardInitialized onShardInitialized;
898
899         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
900             this.sender = sender;
901             this.shardInfo = shardInfo;
902             this.onShardInitialized = onShardInitialized;
903         }
904
905         ActorRef getSender() {
906             return sender;
907         }
908
909         ShardInformation getShardInfo() {
910             return shardInfo;
911         }
912
913         OnShardInitialized getOnShardInitialized() {
914             return onShardInitialized;
915         }
916     }
917
918     /**
919      * We no longer persist SchemaContextModules but keep this class around for now for backwards
920      * compatibility so we don't get de-serialization failures on upgrade from Helium.
921      */
922     @Deprecated
923     static class SchemaContextModules implements Serializable {
924         private static final long serialVersionUID = -8884620101025936590L;
925
926         private final Set<String> modules;
927
928         SchemaContextModules(Set<String> modules){
929             this.modules = modules;
930         }
931
932         public Set<String> getModules() {
933             return modules;
934         }
935     }
936 }
937
938
939