Merge "BUG 2820 - LLDP refactor"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.Props;
17 import akka.actor.SupervisorStrategy;
18 import akka.cluster.ClusterEvent;
19 import akka.japi.Creator;
20 import akka.japi.Function;
21 import akka.japi.Procedure;
22 import akka.persistence.RecoveryCompleted;
23 import akka.persistence.RecoveryFailure;
24 import akka.serialization.Serialization;
25 import com.google.common.annotations.VisibleForTesting;
26 import com.google.common.base.Objects;
27 import com.google.common.base.Optional;
28 import com.google.common.base.Preconditions;
29 import com.google.common.base.Strings;
30 import com.google.common.base.Supplier;
31 import com.google.common.collect.ImmutableSet;
32 import com.google.common.collect.Sets;
33 import java.io.Serializable;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashMap;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.Set;
43 import java.util.concurrent.CountDownLatch;
44 import org.opendaylight.controller.cluster.DataPersistenceProvider;
45 import org.opendaylight.controller.cluster.NonPersistentDataProvider;
46 import org.opendaylight.controller.cluster.PersistentDataProvider;
47 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
51 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
52 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
53 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
54 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
55 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
56 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
57 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
58 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
59 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
60 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
61 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
62 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
63 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
64 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
65 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
66 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
67 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
68 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
69 import org.opendaylight.controller.cluster.raft.RaftState;
70 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
72 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
73 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
74 import org.slf4j.Logger;
75 import org.slf4j.LoggerFactory;
76 import scala.concurrent.duration.Duration;
77
78 /**
79  * The ShardManager has the following jobs,
80  * <ul>
81  * <li> Create all the local shard replicas that belong on this cluster member
82  * <li> Find the address of the local shard
83  * <li> Find the primary replica for any given shard
84  * <li> Monitor the cluster members and store their addresses
85  * </ul>
86  */
87 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
88
89     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
90
91     // Stores a mapping between a member name and the address of the member
92     // Member names look like "member-1", "member-2" etc and are as specified
93     // in configuration
94     private final Map<String, Address> memberNameToAddress = new HashMap<>();
95
96     // Stores a mapping between a shard name and its corresponding information
97     // Shard names look like inventory, topology etc and are as specified in
98     // configuration
99     private final Map<String, ShardInformation> localShards = new HashMap<>();
100
101     // The type of a ShardManager reflects the type of the datastore itself
102     // A data store could be of type config/operational
103     private final String type;
104
105     private final String shardManagerIdentifierString;
106
107     private final ClusterWrapper cluster;
108
109     private final Configuration configuration;
110
111     private final String shardDispatcherPath;
112
113     private ShardManagerInfo mBean;
114
115     private DatastoreContext datastoreContext;
116
117     private Collection<String> knownModules = Collections.emptySet();
118
119     private final DataPersistenceProvider dataPersistenceProvider;
120
121     private final CountDownLatch waitTillReadyCountdownLatch;
122
123     /**
124      */
125     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
126             DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
127
128         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
129         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
130         this.datastoreContext = datastoreContext;
131         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
132         this.type = datastoreContext.getDataStoreType();
133         this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
134         this.shardDispatcherPath =
135                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
136         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
137
138         // Subscribe this actor to cluster member events
139         cluster.subscribeToMemberEvents(getSelf());
140
141         createLocalShards();
142     }
143
144     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
145         return (persistent) ? new PersistentDataProvider(this) : new NonPersistentDataProvider();
146     }
147
148     public static Props props(
149         final ClusterWrapper cluster,
150         final Configuration configuration,
151         final DatastoreContext datastoreContext,
152         final CountDownLatch waitTillReadyCountdownLatch) {
153
154         Preconditions.checkNotNull(cluster, "cluster should not be null");
155         Preconditions.checkNotNull(configuration, "configuration should not be null");
156         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
157
158         return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
159     }
160
161     @Override
162     public void postStop() {
163         LOG.info("Stopping ShardManager");
164
165         mBean.unregisterMBean();
166     }
167
168     @Override
169     public void handleCommand(Object message) throws Exception {
170         if (message  instanceof FindPrimary) {
171             findPrimary((FindPrimary)message);
172         } else if(message instanceof FindLocalShard){
173             findLocalShard((FindLocalShard) message);
174         } else if (message instanceof UpdateSchemaContext) {
175             updateSchemaContext(message);
176         } else if(message instanceof ActorInitialized) {
177             onActorInitialized(message);
178         } else if (message instanceof ClusterEvent.MemberUp){
179             memberUp((ClusterEvent.MemberUp) message);
180         } else if(message instanceof ClusterEvent.MemberRemoved) {
181             memberRemoved((ClusterEvent.MemberRemoved) message);
182         } else if(message instanceof ClusterEvent.UnreachableMember) {
183             ignoreMessage(message);
184         } else if(message instanceof DatastoreContext) {
185             onDatastoreContext((DatastoreContext)message);
186         } else if(message instanceof RoleChangeNotification) {
187             onRoleChangeNotification((RoleChangeNotification) message);
188         } else if(message instanceof FollowerInitialSyncUpStatus){
189             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
190         } else if(message instanceof ShardNotInitializedTimeout) {
191             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
192         } else if(message instanceof ShardLeaderStateChanged) {
193             onLeaderStateChanged((ShardLeaderStateChanged)message);
194         } else {
195             unknownMessage(message);
196         }
197
198     }
199
200     private void checkReady(){
201         if (isReadyWithLeaderId()) {
202             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
203                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
204
205             waitTillReadyCountdownLatch.countDown();
206         }
207     }
208
209     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
210         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
211
212         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
213         if(shardInformation != null) {
214             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
215             shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
216             checkReady();
217         } else {
218             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
219         }
220     }
221
222     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
223         ShardInformation shardInfo = message.getShardInfo();
224
225         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
226                 shardInfo.getShardName());
227
228         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
229
230         if(!shardInfo.isShardInitialized()) {
231             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
232             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
233         } else {
234             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
235             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
236         }
237     }
238
239     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
240         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
241                 status.getName(), status.isInitialSyncDone());
242
243         ShardInformation shardInformation = findShardInformation(status.getName());
244
245         if(shardInformation != null) {
246             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
247
248             mBean.setSyncStatus(isInSync());
249         }
250
251     }
252
253     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
254         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
255                 roleChanged.getOldRole(), roleChanged.getNewRole());
256
257         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
258         if(shardInformation != null) {
259             shardInformation.setRole(roleChanged.getNewRole());
260             checkReady();
261             mBean.setSyncStatus(isInSync());
262         }
263     }
264
265
266     private ShardInformation findShardInformation(String memberId) {
267         for(ShardInformation info : localShards.values()){
268             if(info.getShardId().toString().equals(memberId)){
269                 return info;
270             }
271         }
272
273         return null;
274     }
275
276     private boolean isReadyWithLeaderId() {
277         boolean isReady = true;
278         for (ShardInformation info : localShards.values()) {
279             if(!info.isShardReadyWithLeaderId()){
280                 isReady = false;
281                 break;
282             }
283         }
284         return isReady;
285     }
286
287     private boolean isInSync(){
288         for (ShardInformation info : localShards.values()) {
289             if(!info.isInSync()){
290                 return false;
291             }
292         }
293         return true;
294     }
295
296     private void onActorInitialized(Object message) {
297         final ActorRef sender = getSender();
298
299         if (sender == null) {
300             return; //why is a non-actor sending this message? Just ignore.
301         }
302
303         String actorName = sender.path().name();
304         //find shard name from actor name; actor name is stringified shardId
305         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
306
307         if (shardId.getShardName() == null) {
308             return;
309         }
310
311         markShardAsInitialized(shardId.getShardName());
312     }
313
314     private void markShardAsInitialized(String shardName) {
315         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
316
317         ShardInformation shardInformation = localShards.get(shardName);
318         if (shardInformation != null) {
319             shardInformation.setActorInitialized();
320
321             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
322         }
323     }
324
325     @Override
326     protected void handleRecover(Object message) throws Exception {
327         if(dataPersistenceProvider.isRecoveryApplicable()) {
328             if (message instanceof SchemaContextModules) {
329                 SchemaContextModules msg = (SchemaContextModules) message;
330                 knownModules = ImmutableSet.copyOf(msg.getModules());
331             } else if (message instanceof RecoveryFailure) {
332                 RecoveryFailure failure = (RecoveryFailure) message;
333                 LOG.error("Recovery failed", failure.cause());
334             } else if (message instanceof RecoveryCompleted) {
335                 LOG.info("Recovery complete : {}", persistenceId());
336
337                 // Delete all the messages from the akka journal except the last one
338                 deleteMessages(lastSequenceNr() - 1);
339             }
340         } else {
341             if (message instanceof RecoveryCompleted) {
342                 LOG.info("Recovery complete : {}", persistenceId());
343
344                 // Delete all the messages from the akka journal
345                 deleteMessages(lastSequenceNr());
346             }
347         }
348     }
349
350     private void findLocalShard(FindLocalShard message) {
351         final ShardInformation shardInformation = localShards.get(message.getShardName());
352
353         if(shardInformation == null){
354             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
355             return;
356         }
357
358         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
359             @Override
360             public Object get() {
361                 return new LocalShardFound(shardInformation.getActor());
362             }
363         });
364     }
365
366     private void sendResponse(ShardInformation shardInformation, boolean doWait,
367             boolean wantShardReady, final Supplier<Object> messageSupplier) {
368         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
369             if(doWait) {
370                 final ActorRef sender = getSender();
371                 final ActorRef self = self();
372
373                 Runnable replyRunnable = new Runnable() {
374                     @Override
375                     public void run() {
376                         sender.tell(messageSupplier.get(), self);
377                     }
378                 };
379
380                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
381                     new OnShardInitialized(replyRunnable);
382
383                 shardInformation.addOnShardInitialized(onShardInitialized);
384
385                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
386
387                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
388                         datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
389                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
390                         getContext().dispatcher(), getSelf());
391
392                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
393
394             } else if (!shardInformation.isShardInitialized()) {
395                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
396                         shardInformation.getShardName());
397                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
398             } else {
399                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
400                         shardInformation.getShardName());
401                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
402             }
403
404             return;
405         }
406
407         getSender().tell(messageSupplier.get(), getSelf());
408     }
409
410     private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
411         return new NoShardLeaderException(String.format(
412                 "Could not find a leader for shard %s. This typically happens when the system is coming up or " +
413                 "recovering and a leader is being elected. Try again later.", shardId));
414     }
415
416     private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
417         return new NotInitializedException(String.format(
418                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
419     }
420
421     private void memberRemoved(ClusterEvent.MemberRemoved message) {
422         String memberName = message.member().roles().head();
423
424         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
425                 message.member().address());
426
427         memberNameToAddress.remove(message.member().roles().head());
428     }
429
430     private void memberUp(ClusterEvent.MemberUp message) {
431         String memberName = message.member().roles().head();
432
433         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
434                 message.member().address());
435
436         memberNameToAddress.put(memberName, message.member().address());
437
438         for(ShardInformation info : localShards.values()){
439             String shardName = info.getShardName();
440             info.updatePeerAddress(getShardIdentifier(memberName, shardName).toString(),
441                 getShardActorPath(shardName, memberName), getSelf());
442         }
443
444         checkReady();
445     }
446
447     private void onDatastoreContext(DatastoreContext context) {
448         datastoreContext = context;
449         for (ShardInformation info : localShards.values()) {
450             if (info.getActor() != null) {
451                 info.getActor().tell(datastoreContext, getSelf());
452             }
453         }
454     }
455
456     /**
457      * Notifies all the local shards of a change in the schema context
458      *
459      * @param message
460      */
461     private void updateSchemaContext(final Object message) {
462         final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
463
464         Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
465         Set<String> newModules = new HashSet<>(128);
466
467         for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
468             String s = moduleIdentifier.getNamespace().toString();
469             newModules.add(s);
470         }
471
472         if(newModules.containsAll(knownModules)) {
473
474             LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
475
476             knownModules = ImmutableSet.copyOf(newModules);
477
478             dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
479
480                 @Override
481                 public void apply(SchemaContextModules param) throws Exception {
482                     LOG.debug("Sending new SchemaContext to Shards");
483                     for (ShardInformation info : localShards.values()) {
484                         if (info.getActor() == null) {
485                             info.setActor(newShardActor(schemaContext, info));
486                         } else {
487                             info.getActor().tell(message, getSelf());
488                         }
489                     }
490                 }
491
492             });
493         } else {
494             LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}",
495                     newModules, knownModules);
496         }
497
498     }
499
500     @VisibleForTesting
501     protected ClusterWrapper getCluster() {
502         return cluster;
503     }
504
505     @VisibleForTesting
506     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
507         return getContext().actorOf(Shard.props(info.getShardId(),
508                 info.getPeerAddresses(), datastoreContext, schemaContext)
509                         .withDispatcher(shardDispatcherPath), info.getShardId().toString());
510     }
511
512     private void findPrimary(FindPrimary message) {
513         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
514
515         final String shardName = message.getShardName();
516         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
517
518         // First see if the there is a local replica for the shard
519         final ShardInformation info = localShards.get(shardName);
520         if (info != null) {
521             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
522                 @Override
523                 public Object get() {
524                     String primaryPath = info.getSerializedLeaderActor();
525                     Object found = canReturnLocalShardState && info.isLeader() ?
526                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
527                                 new RemotePrimaryShardFound(primaryPath);
528
529                     if(LOG.isDebugEnabled()) {
530                         LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
531                     }
532
533                     return found;
534                 }
535             });
536
537             return;
538         }
539
540         for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
541             if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
542                 String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
543
544                 LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
545                         shardName, path);
546
547                 getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
548                         message.isWaitUntilReady()), getContext());
549                 return;
550             }
551         }
552
553         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
554
555         getSender().tell(new PrimaryNotFoundException(
556                 String.format("No primary shard found for %s.", shardName)), getSelf());
557     }
558
559     private StringBuilder getShardManagerActorPathBuilder(Address address) {
560         StringBuilder builder = new StringBuilder();
561         builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
562         return builder;
563     }
564
565     private String getShardActorPath(String shardName, String memberName) {
566         Address address = memberNameToAddress.get(memberName);
567         if(address != null) {
568             StringBuilder builder = getShardManagerActorPathBuilder(address);
569             builder.append("/")
570                 .append(getShardIdentifier(memberName, shardName));
571             return builder.toString();
572         }
573         return null;
574     }
575
576     /**
577      * Construct the name of the shard actor given the name of the member on
578      * which the shard resides and the name of the shard
579      *
580      * @param memberName
581      * @param shardName
582      * @return
583      */
584     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
585         return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
586     }
587
588     /**
589      * Create shards that are local to the member on which the ShardManager
590      * runs
591      *
592      */
593     private void createLocalShards() {
594         String memberName = this.cluster.getCurrentMemberName();
595         List<String> memberShardNames =
596             this.configuration.getMemberShardNames(memberName);
597
598         List<String> localShardActorNames = new ArrayList<>();
599         for(String shardName : memberShardNames){
600             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
601             Map<String, String> peerAddresses = getPeerAddresses(shardName);
602             localShardActorNames.add(shardId.toString());
603             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
604         }
605
606         mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
607                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
608     }
609
610     /**
611      * Given the name of the shard find the addresses of all it's peers
612      *
613      * @param shardName
614      * @return
615      */
616     private Map<String, String> getPeerAddresses(String shardName){
617
618         Map<String, String> peerAddresses = new HashMap<>();
619
620         List<String> members = this.configuration.getMembersFromShardName(shardName);
621
622         String currentMemberName = this.cluster.getCurrentMemberName();
623
624         for(String memberName : members){
625             if(!currentMemberName.equals(memberName)){
626                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
627                 String path = getShardActorPath(shardName, currentMemberName);
628                 peerAddresses.put(shardId.toString(), path);
629             }
630         }
631         return peerAddresses;
632     }
633
634     @Override
635     public SupervisorStrategy supervisorStrategy() {
636
637         return new OneForOneStrategy(10, Duration.create("1 minute"),
638             new Function<Throwable, SupervisorStrategy.Directive>() {
639                 @Override
640                 public SupervisorStrategy.Directive apply(Throwable t) {
641                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
642                     return SupervisorStrategy.resume();
643                 }
644             }
645         );
646
647     }
648
649     @Override
650     public String persistenceId() {
651         return "shard-manager-" + type;
652     }
653
654     @VisibleForTesting
655     Collection<String> getKnownModules() {
656         return knownModules;
657     }
658
659     @VisibleForTesting
660     DataPersistenceProvider getDataPersistenceProvider() {
661         return dataPersistenceProvider;
662     }
663
664     @VisibleForTesting
665     ShardManagerInfoMBean getMBean(){
666         return mBean;
667     }
668
669     @VisibleForTesting
670     protected static class ShardInformation {
671         private final ShardIdentifier shardId;
672         private final String shardName;
673         private ActorRef actor;
674         private ActorPath actorPath;
675         private final Map<String, String> peerAddresses;
676         private Optional<DataTree> localShardDataTree;
677
678         // flag that determines if the actor is ready for business
679         private boolean actorInitialized = false;
680
681         private boolean followerSyncStatus = false;
682
683         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
684         private String role ;
685         private String leaderId;
686
687         private ShardInformation(String shardName, ShardIdentifier shardId,
688                 Map<String, String> peerAddresses) {
689             this.shardName = shardName;
690             this.shardId = shardId;
691             this.peerAddresses = peerAddresses;
692         }
693
694         String getShardName() {
695             return shardName;
696         }
697
698         ActorRef getActor(){
699             return actor;
700         }
701
702         ActorPath getActorPath() {
703             return actorPath;
704         }
705
706         void setActor(ActorRef actor) {
707             this.actor = actor;
708             this.actorPath = actor.path();
709         }
710
711         ShardIdentifier getShardId() {
712             return shardId;
713         }
714
715         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
716             this.localShardDataTree = localShardDataTree;
717         }
718
719         Optional<DataTree> getLocalShardDataTree() {
720             return localShardDataTree;
721         }
722
723         Map<String, String> getPeerAddresses() {
724             return peerAddresses;
725         }
726
727         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
728             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
729                 peerAddress);
730             if(peerAddresses.containsKey(peerId)){
731                 peerAddresses.put(peerId, peerAddress);
732
733                 if(actor != null) {
734                     if(LOG.isDebugEnabled()) {
735                         LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
736                                 peerId, peerAddress, actor.path());
737                     }
738
739                     actor.tell(new PeerAddressResolved(peerId.toString(), peerAddress), sender);
740                 }
741
742                 notifyOnShardInitializedCallbacks();
743             }
744         }
745
746         boolean isShardReady() {
747             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
748         }
749
750         boolean isShardReadyWithLeaderId() {
751             return isShardReady() && (isLeader() || peerAddresses.get(leaderId) != null);
752         }
753
754         boolean isShardInitialized() {
755             return getActor() != null && actorInitialized;
756         }
757
758         boolean isLeader() {
759             return Objects.equal(leaderId, shardId.toString());
760         }
761
762         String getSerializedLeaderActor() {
763             if(isLeader()) {
764                 return Serialization.serializedActorPath(getActor());
765             } else {
766                 return peerAddresses.get(leaderId);
767             }
768         }
769
770         void setActorInitialized() {
771             LOG.debug("Shard {} is initialized", shardId);
772
773             this.actorInitialized = true;
774
775             notifyOnShardInitializedCallbacks();
776         }
777
778         private void notifyOnShardInitializedCallbacks() {
779             if(onShardInitializedSet.isEmpty()) {
780                 return;
781             }
782
783             boolean ready = isShardReadyWithLeaderId();
784
785             if(LOG.isDebugEnabled()) {
786                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
787                         ready ? "ready" : "initialized", onShardInitializedSet.size());
788             }
789
790             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
791             while(iter.hasNext()) {
792                 OnShardInitialized onShardInitialized = iter.next();
793                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
794                     iter.remove();
795                     onShardInitialized.getTimeoutSchedule().cancel();
796                     onShardInitialized.getReplyRunnable().run();
797                 }
798             }
799         }
800
801         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
802             onShardInitializedSet.add(onShardInitialized);
803         }
804
805         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
806             onShardInitializedSet.remove(onShardInitialized);
807         }
808
809         void setRole(String newRole) {
810             this.role = newRole;
811
812             notifyOnShardInitializedCallbacks();
813         }
814
815         void setFollowerSyncStatus(boolean syncStatus){
816             this.followerSyncStatus = syncStatus;
817         }
818
819         boolean isInSync(){
820             if(RaftState.Follower.name().equals(this.role)){
821                 return followerSyncStatus;
822             } else if(RaftState.Leader.name().equals(this.role)){
823                 return true;
824             }
825
826             return false;
827         }
828
829         void setLeaderId(String leaderId) {
830             this.leaderId = leaderId;
831
832             notifyOnShardInitializedCallbacks();
833         }
834     }
835
836     private static class ShardManagerCreator implements Creator<ShardManager> {
837         private static final long serialVersionUID = 1L;
838
839         final ClusterWrapper cluster;
840         final Configuration configuration;
841         final DatastoreContext datastoreContext;
842         private final CountDownLatch waitTillReadyCountdownLatch;
843
844         ShardManagerCreator(ClusterWrapper cluster,
845                             Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
846             this.cluster = cluster;
847             this.configuration = configuration;
848             this.datastoreContext = datastoreContext;
849             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
850         }
851
852         @Override
853         public ShardManager create() throws Exception {
854             return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
855         }
856     }
857
858     private static class OnShardInitialized {
859         private final Runnable replyRunnable;
860         private Cancellable timeoutSchedule;
861
862         OnShardInitialized(Runnable replyRunnable) {
863             this.replyRunnable = replyRunnable;
864         }
865
866         Runnable getReplyRunnable() {
867             return replyRunnable;
868         }
869
870         Cancellable getTimeoutSchedule() {
871             return timeoutSchedule;
872         }
873
874         void setTimeoutSchedule(Cancellable timeoutSchedule) {
875             this.timeoutSchedule = timeoutSchedule;
876         }
877     }
878
879     private static class OnShardReady extends OnShardInitialized {
880         OnShardReady(Runnable replyRunnable) {
881             super(replyRunnable);
882         }
883     }
884
885     private static class ShardNotInitializedTimeout {
886         private final ActorRef sender;
887         private final ShardInformation shardInfo;
888         private final OnShardInitialized onShardInitialized;
889
890         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
891             this.sender = sender;
892             this.shardInfo = shardInfo;
893             this.onShardInitialized = onShardInitialized;
894         }
895
896         ActorRef getSender() {
897             return sender;
898         }
899
900         ShardInformation getShardInfo() {
901             return shardInfo;
902         }
903
904         OnShardInitialized getOnShardInitialized() {
905             return onShardInitialized;
906         }
907     }
908
909     static class SchemaContextModules implements Serializable {
910         private static final long serialVersionUID = -8884620101025936590L;
911
912         private final Set<String> modules;
913
914         SchemaContextModules(Set<String> modules){
915             this.modules = modules;
916         }
917
918         public Set<String> getModules() {
919             return modules;
920         }
921     }
922 }
923
924
925