Bug 3194: Dynamically update PrimaryShardInfo cache when leader changes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.Props;
17 import akka.actor.SupervisorStrategy;
18 import akka.cluster.ClusterEvent;
19 import akka.japi.Creator;
20 import akka.japi.Function;
21 import akka.japi.Procedure;
22 import akka.persistence.RecoveryCompleted;
23 import akka.persistence.RecoveryFailure;
24 import akka.serialization.Serialization;
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Objects;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Preconditions;
29 import com.google.common.base.Strings;
30 import com.google.common.base.Supplier;
31 import com.google.common.collect.ImmutableSet;
32 import com.google.common.collect.Sets;
33 import java.io.Serializable;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.Set;
43 import java.util.concurrent.CountDownLatch;
44 import org.opendaylight.controller.cluster.DataPersistenceProvider;
45 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
46 import org.opendaylight.controller.cluster.PersistentDataProvider;
47 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
51 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
52 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
53 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
54 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
55 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
56 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
57 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
58 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
59 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
60 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
61 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
62 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
63 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
64 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
65 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
66 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
67 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
68 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
69 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
70 import org.opendaylight.controller.cluster.raft.RaftState;
71 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
72 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
73 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
74 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 import org.slf4j.Logger;
76 import org.slf4j.LoggerFactory;
77 import scala.concurrent.duration.Duration;
78
/**
 * The ShardManager has the following jobs:
 * <ul>
 * <li> Create all the local shard replicas that belong on this cluster member
 * <li> Find the address of the local shard
 * <li> Find the primary replica for any given shard
 * <li> Monitor the cluster members and store their addresses
 * </ul>
 */
88 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
89
90     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
91
92     // Stores a mapping between a member name and the address of the member
93     // Member names look like "member-1", "member-2" etc and are as specified
94     // in configuration
95     private final Map<String, Address> memberNameToAddress = new HashMap<>();
96
97     // Stores a mapping between a shard name and it's corresponding information
98     // Shard names look like inventory, topology etc and are as specified in
99     // configuration
100     private final Map<String, ShardInformation> localShards = new HashMap<>();
101
102     // The type of a ShardManager reflects the type of the datastore itself
103     // A data store could be of type config/operational
104     private final String type;
105
106     private final String shardManagerIdentifierString;
107
108     private final ClusterWrapper cluster;
109
110     private final Configuration configuration;
111
112     private final String shardDispatcherPath;
113
114     private ShardManagerInfo mBean;
115
116     private DatastoreContext datastoreContext;
117
118     private Collection<String> knownModules = Collections.emptySet();
119
120     private final DataPersistenceProvider dataPersistenceProvider;
121
122     private final CountDownLatch waitTillReadyCountdownLatch;
123
124     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
125
126     /**
127      */
128     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
129             DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
130             PrimaryShardInfoFutureCache primaryShardInfoCache) {
131
132         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
133         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
134         this.datastoreContext = datastoreContext;
135         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
136         this.type = datastoreContext.getDataStoreType();
137         this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
138         this.shardDispatcherPath =
139                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
140         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
141         this.primaryShardInfoCache = primaryShardInfoCache;
142
143         // Subscribe this actor to cluster member events
144         cluster.subscribeToMemberEvents(getSelf());
145
146         createLocalShards();
147     }
148
149     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
150         return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider();
151     }
152
153     public static Props props(
154         final ClusterWrapper cluster,
155         final Configuration configuration,
156         final DatastoreContext datastoreContext,
157         final CountDownLatch waitTillReadyCountdownLatch,
158         final PrimaryShardInfoFutureCache primaryShardInfoCache) {
159
160         Preconditions.checkNotNull(cluster, "cluster should not be null");
161         Preconditions.checkNotNull(configuration, "configuration should not be null");
162         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
163         Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
164
165         return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
166                 waitTillReadyCountdownLatch, primaryShardInfoCache));
167     }
168
169     @Override
170     public void postStop() {
171         LOG.info("Stopping ShardManager");
172
173         mBean.unregisterMBean();
174     }
175
176     @Override
177     public void handleCommand(Object message) throws Exception {
178         if (message  instanceof FindPrimary) {
179             findPrimary((FindPrimary)message);
180         } else if(message instanceof FindLocalShard){
181             findLocalShard((FindLocalShard) message);
182         } else if (message instanceof UpdateSchemaContext) {
183             updateSchemaContext(message);
184         } else if(message instanceof ActorInitialized) {
185             onActorInitialized(message);
186         } else if (message instanceof ClusterEvent.MemberUp){
187             memberUp((ClusterEvent.MemberUp) message);
188         } else if(message instanceof ClusterEvent.MemberRemoved) {
189             memberRemoved((ClusterEvent.MemberRemoved) message);
190         } else if(message instanceof ClusterEvent.UnreachableMember) {
191             ignoreMessage(message);
192         } else if(message instanceof DatastoreContext) {
193             onDatastoreContext((DatastoreContext)message);
194         } else if(message instanceof RoleChangeNotification) {
195             onRoleChangeNotification((RoleChangeNotification) message);
196         } else if(message instanceof FollowerInitialSyncUpStatus){
197             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
198         } else if(message instanceof ShardNotInitializedTimeout) {
199             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
200         } else if(message instanceof ShardLeaderStateChanged) {
201             onLeaderStateChanged((ShardLeaderStateChanged)message);
202         } else {
203             unknownMessage(message);
204         }
205
206     }
207
208     private void checkReady(){
209         if (isReadyWithLeaderId()) {
210             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
211                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
212
213             waitTillReadyCountdownLatch.countDown();
214         }
215     }
216
217     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
218         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
219
220         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
221         if(shardInformation != null) {
222             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
223             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
224                 primaryShardInfoCache.remove(shardInformation.getShardName());
225             }
226
227             checkReady();
228         } else {
229             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
230         }
231     }
232
233     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
234         ShardInformation shardInfo = message.getShardInfo();
235
236         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
237                 shardInfo.getShardName());
238
239         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
240
241         if(!shardInfo.isShardInitialized()) {
242             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
243             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
244         } else {
245             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
246             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
247         }
248     }
249
250     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
251         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
252                 status.getName(), status.isInitialSyncDone());
253
254         ShardInformation shardInformation = findShardInformation(status.getName());
255
256         if(shardInformation != null) {
257             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
258
259             mBean.setSyncStatus(isInSync());
260         }
261
262     }
263
264     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
265         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
266                 roleChanged.getOldRole(), roleChanged.getNewRole());
267
268         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
269         if(shardInformation != null) {
270             shardInformation.setRole(roleChanged.getNewRole());
271             checkReady();
272             mBean.setSyncStatus(isInSync());
273         }
274     }
275
276
277     private ShardInformation findShardInformation(String memberId) {
278         for(ShardInformation info : localShards.values()){
279             if(info.getShardId().toString().equals(memberId)){
280                 return info;
281             }
282         }
283
284         return null;
285     }
286
287     private boolean isReadyWithLeaderId() {
288         boolean isReady = true;
289         for (ShardInformation info : localShards.values()) {
290             if(!info.isShardReadyWithLeaderId()){
291                 isReady = false;
292                 break;
293             }
294         }
295         return isReady;
296     }
297
298     private boolean isInSync(){
299         for (ShardInformation info : localShards.values()) {
300             if(!info.isInSync()){
301                 return false;
302             }
303         }
304         return true;
305     }
306
307     private void onActorInitialized(Object message) {
308         final ActorRef sender = getSender();
309
310         if (sender == null) {
311             return; //why is a non-actor sending this message? Just ignore.
312         }
313
314         String actorName = sender.path().name();
315         //find shard name from actor name; actor name is stringified shardId
316         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
317
318         if (shardId.getShardName() == null) {
319             return;
320         }
321
322         markShardAsInitialized(shardId.getShardName());
323     }
324
325     private void markShardAsInitialized(String shardName) {
326         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
327
328         ShardInformation shardInformation = localShards.get(shardName);
329         if (shardInformation != null) {
330             shardInformation.setActorInitialized();
331
332             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
333         }
334     }
335
336     @Override
337     protected void handleRecover(Object message) throws Exception {
338         if(dataPersistenceProvider.isRecoveryApplicable()) {
339             if (message instanceof SchemaContextModules) {
340                 SchemaContextModules msg = (SchemaContextModules) message;
341                 knownModules = ImmutableSet.copyOf(msg.getModules());
342             } else if (message instanceof RecoveryFailure) {
343                 RecoveryFailure failure = (RecoveryFailure) message;
344                 LOG.error("Recovery failed", failure.cause());
345             } else if (message instanceof RecoveryCompleted) {
346                 LOG.info("Recovery complete : {}", persistenceId());
347
348                 // Delete all the messages from the akka journal except the last one
349                 deleteMessages(lastSequenceNr() - 1);
350             }
351         } else {
352             if (message instanceof RecoveryCompleted) {
353                 LOG.info("Recovery complete : {}", persistenceId());
354
355                 // Delete all the messages from the akka journal
356                 deleteMessages(lastSequenceNr());
357             }
358         }
359     }
360
361     private void findLocalShard(FindLocalShard message) {
362         final ShardInformation shardInformation = localShards.get(message.getShardName());
363
364         if(shardInformation == null){
365             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
366             return;
367         }
368
369         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
370             @Override
371             public Object get() {
372                 return new LocalShardFound(shardInformation.getActor());
373             }
374         });
375     }
376
377     private void sendResponse(ShardInformation shardInformation, boolean doWait,
378             boolean wantShardReady, final Supplier<Object> messageSupplier) {
379         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
380             if(doWait) {
381                 final ActorRef sender = getSender();
382                 final ActorRef self = self();
383
384                 Runnable replyRunnable = new Runnable() {
385                     @Override
386                     public void run() {
387                         sender.tell(messageSupplier.get(), self);
388                     }
389                 };
390
391                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
392                     new OnShardInitialized(replyRunnable);
393
394                 shardInformation.addOnShardInitialized(onShardInitialized);
395
396                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
397
398                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
399                         datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
400                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
401                         getContext().dispatcher(), getSelf());
402
403                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
404
405             } else if (!shardInformation.isShardInitialized()) {
406                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
407                         shardInformation.getShardName());
408                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
409             } else {
410                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
411                         shardInformation.getShardName());
412                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
413             }
414
415             return;
416         }
417
418         getSender().tell(messageSupplier.get(), getSelf());
419     }
420
421     private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
422         return new NoShardLeaderException(String.format(
423                 "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
424                 "recovering and a leader is being elected. Try again later.", shardId));
425     }
426
427     private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
428         return new NotInitializedException(String.format(
429                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
430     }
431
432     private void memberRemoved(ClusterEvent.MemberRemoved message) {
433         String memberName = message.member().roles().head();
434
435         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
436                 message.member().address());
437
438         memberNameToAddress.remove(message.member().roles().head());
439     }
440
441     private void memberUp(ClusterEvent.MemberUp message) {
442         String memberName = message.member().roles().head();
443
444         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
445                 message.member().address());
446
447         memberNameToAddress.put(memberName, message.member().address());
448
449         for(ShardInformation info : localShards.values()){
450             String shardName = info.getShardName();
451             info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
452                 getShardActorPath(shardName, memberName), getSelf());
453         }
454
455         checkReady();
456     }
457
458     private void onDatastoreContext(DatastoreContext context) {
459         datastoreContext = context;
460         for (ShardInformation info : localShards.values()) {
461             if (info.getActor() != null) {
462                 info.getActor().tell(datastoreContext, getSelf());
463             }
464         }
465     }
466
467     /**
468      * Notifies all the local shards of a change in the schema context
469      *
470      * @param message
471      */
472     private void updateSchemaContext(final Object message) {
473         final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
474
475         Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
476         Set<String> newModules = new HashSet<>(128);
477
478         for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
479             String s = moduleIdentifier.getNamespace().toString();
480             newModules.add(s);
481         }
482
483         if(newModules.containsAll(knownModules)) {
484
485             LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
486
487             knownModules = ImmutableSet.copyOf(newModules);
488
489             dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
490
491                 @Override
492                 public void apply(SchemaContextModules param) throws Exception {
493                     LOG.debug("Sending new SchemaContext to Shards");
494                     for (ShardInformation info : localShards.values()) {
495                         if (info.getActor() == null) {
496                             info.setActor(newShardActor(schemaContext, info));
497                         } else {
498                             info.getActor().tell(message, getSelf());
499                         }
500                     }
501                 }
502
503             });
504         } else {
505             LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}",
506                     newModules, knownModules);
507         }
508
509     }
510
511     @VisibleForTesting
512     protected ClusterWrapper getCluster() {
513         return cluster;
514     }
515
516     @VisibleForTesting
517     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
518         return getContext().actorOf(Shard.props(info.getShardId(),
519                 info.getPeerAddresses(), datastoreContext, schemaContext)
520                         .withDispatcher(shardDispatcherPath), info.getShardId().toString());
521     }
522
523     private void findPrimary(FindPrimary message) {
524         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
525
526         final String shardName = message.getShardName();
527         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
528
529         // First see if the there is a local replica for the shard
530         final ShardInformation info = localShards.get(shardName);
531         if (info != null) {
532             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
533                 @Override
534                 public Object get() {
535                     String primaryPath = info.getSerializedLeaderActor();
536                     Object found = canReturnLocalShardState && info.isLeader() ?
537                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
538                                 new RemotePrimaryShardFound(primaryPath);
539
540                     if(LOG.isDebugEnabled()) {
541                         LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
542                     }
543
544                     return found;
545                 }
546             });
547
548             return;
549         }
550
551         for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
552             if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
553                 String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
554
555                 LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
556                         shardName, path);
557
558                 getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
559                         message.isWaitUntilReady()), getContext());
560                 return;
561             }
562         }
563
564         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
565
566         getSender().tell(new PrimaryNotFoundException(
567                 String.format("No primary shard found for %s.", shardName)), getSelf());
568     }
569
570     private StringBuilder getShardManagerActorPathBuilder(Address address) {
571         StringBuilder builder = new StringBuilder();
572         builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
573         return builder;
574     }
575
576     private String getShardActorPath(String shardName, String memberName) {
577         Address address = memberNameToAddress.get(memberName);
578         if(address != null) {
579             StringBuilder builder = getShardManagerActorPathBuilder(address);
580             builder.append("/")
581                 .append(getShardIdentifier(memberName, shardName));
582             return builder.toString();
583         }
584         return null;
585     }
586
587     /**
588      * Construct the name of the shard actor given the name of the member on
589      * which the shard resides and the name of the shard
590      *
591      * @param memberName
592      * @param shardName
593      * @return
594      */
595     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
596         return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
597     }
598
599     /**
600      * Create shards that are local to the member on which the ShardManager
601      * runs
602      *
603      */
604     private void createLocalShards() {
605         String memberName = this.cluster.getCurrentMemberName();
606         List<String> memberShardNames =
607             this.configuration.getMemberShardNames(memberName);
608
609         List<String> localShardActorNames = new ArrayList<>();
610         for(String shardName : memberShardNames){
611             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
612             Map<String, String> peerAddresses = getPeerAddresses(shardName);
613             localShardActorNames.add(shardId.toString());
614             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
615         }
616
617         mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
618                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
619     }
620
621     /**
622      * Given the name of the shard find the addresses of all it's peers
623      *
624      * @param shardName
625      * @return
626      */
627     private Map<String, String> getPeerAddresses(String shardName){
628
629         Map<String, String> peerAddresses = new HashMap<>();
630
631         List<String> members = this.configuration.getMembersFromShardName(shardName);
632
633         String currentMemberName = this.cluster.getCurrentMemberName();
634
635         for(String memberName : members){
636             if(!currentMemberName.equals(memberName)){
637                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
638                 String path = getShardActorPath(shardName, currentMemberName);
639                 peerAddresses.put(shardId.toString(), path);
640             }
641         }
642         return peerAddresses;
643     }
644
645     @Override
646     public SupervisorStrategy supervisorStrategy() {
647
648         return new OneForOneStrategy(10, Duration.create("1 minute"),
649             new Function<Throwable, SupervisorStrategy.Directive>() {
650                 @Override
651                 public SupervisorStrategy.Directive apply(Throwable t) {
652                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
653                     return SupervisorStrategy.resume();
654                 }
655             }
656         );
657
658     }
659
660     @Override
661     public String persistenceId() {
662         return "shard-manager-" + type;
663     }
664
665     @VisibleForTesting
666     Collection<String> getKnownModules() {
667         return knownModules;
668     }
669
670     @VisibleForTesting
671     DataPersistenceProvider getDataPersistenceProvider() {
672         return dataPersistenceProvider;
673     }
674
675     @VisibleForTesting
676     ShardManagerInfoMBean getMBean(){
677         return mBean;
678     }
679
680     @VisibleForTesting
681     protected static class ShardInformation {
682         private final ShardIdentifier shardId;
683         private final String shardName;
684         private ActorRef actor;
685         private ActorPath actorPath;
686         private final Map<String, String> peerAddresses;
687         private Optional<DataTree> localShardDataTree;
688
689         // flag that determines if the actor is ready for business
690         private boolean actorInitialized = false;
691
692         private boolean followerSyncStatus = false;
693
694         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
695         private String role ;
696         private String leaderId;
697
698         private ShardInformation(String shardName, ShardIdentifier shardId,
699                 Map<String, String> peerAddresses) {
700             this.shardName = shardName;
701             this.shardId = shardId;
702             this.peerAddresses = peerAddresses;
703         }
704
705         String getShardName() {
706             return shardName;
707         }
708
709         ActorRef getActor(){
710             return actor;
711         }
712
713         ActorPath getActorPath() {
714             return actorPath;
715         }
716
717         void setActor(ActorRef actor) {
718             this.actor = actor;
719             this.actorPath = actor.path();
720         }
721
722         ShardIdentifier getShardId() {
723             return shardId;
724         }
725
726         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
727             this.localShardDataTree = localShardDataTree;
728         }
729
730         Optional<DataTree> getLocalShardDataTree() {
731             return localShardDataTree;
732         }
733
734         Map<String, String> getPeerAddresses() {
735             return peerAddresses;
736         }
737
738         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
739             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
740                 peerAddress);
741             if(peerAddresses.containsKey(peerId)){
742                 peerAddresses.put(peerId, peerAddress);
743
744                 if(actor != null) {
745                     if(LOG.isDebugEnabled()) {
746                         LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
747                                 peerId, peerAddress, actor.path());
748                     }
749
750                     actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
751                 }
752
753                 notifyOnShardInitializedCallbacks();
754             }
755         }
756
757         boolean isShardReady() {
758             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
759         }
760
761         boolean isShardReadyWithLeaderId() {
762             return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
763         }
764
765         boolean isShardInitialized() {
766             return getActor() != null && actorInitialized;
767         }
768
769         boolean isLeader() {
770             return Objects.equal(leaderId, shardId.toString());
771         }
772
773         String getSerializedLeaderActor() {
774             if(isLeader()) {
775                 return Serialization.serializedActorPath(getActor());
776             } else {
777                 return peerAddresses.get(leaderId);
778             }
779         }
780
781         void setActorInitialized() {
782             LOG.debug("Shard {} is initialized", shardId);
783
784             this.actorInitialized = true;
785
786             notifyOnShardInitializedCallbacks();
787         }
788
789         private void notifyOnShardInitializedCallbacks() {
790             if(onShardInitializedSet.isEmpty()) {
791                 return;
792             }
793
794             boolean ready = isShardReadyWithLeaderId();
795
796             if(LOG.isDebugEnabled()) {
797                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
798                         ready ? "ready" : "initialized", onShardInitializedSet.size());
799             }
800
801             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
802             while(iter.hasNext()) {
803                 OnShardInitialized onShardInitialized = iter.next();
804                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
805                     iter.remove();
806                     onShardInitialized.getTimeoutSchedule().cancel();
807                     onShardInitialized.getReplyRunnable().run();
808                 }
809             }
810         }
811
812         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
813             onShardInitializedSet.add(onShardInitialized);
814         }
815
816         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
817             onShardInitializedSet.remove(onShardInitialized);
818         }
819
820         void setRole(String newRole) {
821             this.role = newRole;
822
823             notifyOnShardInitializedCallbacks();
824         }
825
826         void setFollowerSyncStatus(boolean syncStatus){
827             this.followerSyncStatus = syncStatus;
828         }
829
830         boolean isInSync(){
831             if(RaftState.Follower.name().equals(this.role)){
832                 return followerSyncStatus;
833             } else if(RaftState.Leader.name().equals(this.role)){
834                 return true;
835             }
836
837             return false;
838         }
839
840         boolean setLeaderId(String leaderId) {
841             boolean changed = !Objects.equal(this.leaderId, leaderId);
842             this.leaderId = leaderId;
843
844             notifyOnShardInitializedCallbacks();
845
846             return changed;
847         }
848     }
849
850     private static class ShardManagerCreator implements Creator<ShardManager> {
851         private static final long serialVersionUID = 1L;
852
853         final ClusterWrapper cluster;
854         final Configuration configuration;
855         final DatastoreContext datastoreContext;
856         private final CountDownLatch waitTillReadyCountdownLatch;
857         private final PrimaryShardInfoFutureCache primaryShardInfoCache;
858
859         ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
860                 CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
861             this.cluster = cluster;
862             this.configuration = configuration;
863             this.datastoreContext = datastoreContext;
864             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
865             this.primaryShardInfoCache = primaryShardInfoCache;
866         }
867
868         @Override
869         public ShardManager create() throws Exception {
870             return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
871                     primaryShardInfoCache);
872         }
873     }
874
875     private static class OnShardInitialized {
876         private final Runnable replyRunnable;
877         private Cancellable timeoutSchedule;
878
879         OnShardInitialized(Runnable replyRunnable) {
880             this.replyRunnable = replyRunnable;
881         }
882
883         Runnable getReplyRunnable() {
884             return replyRunnable;
885         }
886
887         Cancellable getTimeoutSchedule() {
888             return timeoutSchedule;
889         }
890
891         void setTimeoutSchedule(Cancellable timeoutSchedule) {
892             this.timeoutSchedule = timeoutSchedule;
893         }
894     }
895
896     private static class OnShardReady extends OnShardInitialized {
897         OnShardReady(Runnable replyRunnable) {
898             super(replyRunnable);
899         }
900     }
901
902     private static class ShardNotInitializedTimeout {
903         private final ActorRef sender;
904         private final ShardInformation shardInfo;
905         private final OnShardInitialized onShardInitialized;
906
907         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
908             this.sender = sender;
909             this.shardInfo = shardInfo;
910             this.onShardInitialized = onShardInitialized;
911         }
912
913         ActorRef getSender() {
914             return sender;
915         }
916
917         ShardInformation getShardInfo() {
918             return shardInfo;
919         }
920
921         OnShardInitialized getOnShardInitialized() {
922             return onShardInitialized;
923         }
924     }
925
926     static class SchemaContextModules implements Serializable {
927         private static final long serialVersionUID = -8884620101025936590L;
928
929         private final Set<String> modules;
930
931         SchemaContextModules(Set<String> modules){
932             this.modules = modules;
933         }
934
935         public Set<String> getModules() {
936             return modules;
937         }
938     }
939 }
940
941
942