24d808cd91bf76839bcb9bdbb08e61e49a17db9b
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Cancellable;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import akka.persistence.RecoveryCompleted;
21 import akka.serialization.Serialization;
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Objects;
24 import com.google.common.base.Optional;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import com.google.common.base.Supplier;
28 import com.google.common.collect.Sets;
29 import java.io.Serializable;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.HashMap;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
40 import org.opendaylight.controller.cluster.datastore.config.Configuration;
41 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
42 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
43 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
44 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
45 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
46 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
47 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
48 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
49 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
50 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
51 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
52 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
58 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
59 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
60 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
61 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
62 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
63 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
64 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
65 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
66 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
67 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
68 import org.opendaylight.controller.cluster.raft.RaftState;
69 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
70 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
71 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
72 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
73 import org.slf4j.Logger;
74 import org.slf4j.LoggerFactory;
75 import scala.concurrent.duration.Duration;
76 import scala.concurrent.duration.FiniteDuration;
77
/**
 * The ShardManager has the following jobs:
 * <ul>
 * <li> Create all the local shard replicas that belong on this cluster member
 * <li> Find the address of the local shard
 * <li> Find the primary replica for any given shard
 * <li> Monitor the cluster members and store their addresses
 * </ul>
 */
87 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
88
    private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);

    // Stores a mapping between a shard name and its corresponding information.
    // Shard names look like inventory, topology etc. and are as specified in
    // configuration.
    private final Map<String, ShardInformation> localShards = new HashMap<>();

    // The type of a ShardManager reflects the type of the datastore itself.
    // A data store could be of type config/operational.
    private final String type;

    // Cluster facade: provides the current member name and delivers member events to this actor.
    private final ClusterWrapper cluster;

    // Module/shard configuration: source of the local shard names and of each shard's member list.
    private final Configuration configuration;

    // Akka dispatcher path used when creating shard actors (see newShardActor).
    private final String shardDispatcherPath;

    // JMX bean exposing the local shards and the sync status; created in createLocalShards().
    private ShardManagerInfo mBean;

    // Current datastore context, always rebuilt with peerAddressResolver attached;
    // replaced whenever a new DatastoreContext message arrives.
    private DatastoreContext datastoreContext;

    // Counted down by checkReady() once every local shard is ready with a known leader.
    private final CountDownLatch waitTillReadyCountdownLatch;

    // Cache of primary shard lookups; entries are evicted when a shard's leader changes
    // or the leader's member becomes unreachable.
    private final PrimaryShardInfoFutureCache primaryShardInfoCache;

    // Maps member names to peer addresses for this datastore type.
    private final ShardPeerAddressResolver peerAddressResolver;

    // Latest SchemaContext received; null until the first UpdateSchemaContext.
    // Shard actors are only created once this is set.
    private SchemaContext schemaContext;
117
118     /**
119      */
120     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
121             DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
122             PrimaryShardInfoFutureCache primaryShardInfoCache) {
123
124         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
125         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
126         this.datastoreContext = datastoreContext;
127         this.type = datastoreContext.getDataStoreType();
128         this.shardDispatcherPath =
129                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
130         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
131         this.primaryShardInfoCache = primaryShardInfoCache;
132
133         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
134         this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
135                 peerAddressResolver).build();
136
137         // Subscribe this actor to cluster member events
138         cluster.subscribeToMemberEvents(getSelf());
139
140         createLocalShards();
141     }
142
143     public static Props props(
144             final ClusterWrapper cluster,
145             final Configuration configuration,
146             final DatastoreContext datastoreContext,
147             final CountDownLatch waitTillReadyCountdownLatch,
148             final PrimaryShardInfoFutureCache primaryShardInfoCache) {
149
150         Preconditions.checkNotNull(cluster, "cluster should not be null");
151         Preconditions.checkNotNull(configuration, "configuration should not be null");
152         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
153         Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
154
155         return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
156                 waitTillReadyCountdownLatch, primaryShardInfoCache));
157     }
158
159     @Override
160     public void postStop() {
161         LOG.info("Stopping ShardManager");
162
163         mBean.unregisterMBean();
164     }
165
166     @Override
167     public void handleCommand(Object message) throws Exception {
168         if (message  instanceof FindPrimary) {
169             findPrimary((FindPrimary)message);
170         } else if(message instanceof FindLocalShard){
171             findLocalShard((FindLocalShard) message);
172         } else if (message instanceof UpdateSchemaContext) {
173             updateSchemaContext(message);
174         } else if(message instanceof ActorInitialized) {
175             onActorInitialized(message);
176         } else if (message instanceof ClusterEvent.MemberUp){
177             memberUp((ClusterEvent.MemberUp) message);
178         } else if (message instanceof ClusterEvent.MemberExited){
179             memberExited((ClusterEvent.MemberExited) message);
180         } else if(message instanceof ClusterEvent.MemberRemoved) {
181             memberRemoved((ClusterEvent.MemberRemoved) message);
182         } else if(message instanceof ClusterEvent.UnreachableMember) {
183             memberUnreachable((ClusterEvent.UnreachableMember)message);
184         } else if(message instanceof ClusterEvent.ReachableMember) {
185             memberReachable((ClusterEvent.ReachableMember) message);
186         } else if(message instanceof DatastoreContext) {
187             onDatastoreContext((DatastoreContext)message);
188         } else if(message instanceof RoleChangeNotification) {
189             onRoleChangeNotification((RoleChangeNotification) message);
190         } else if(message instanceof FollowerInitialSyncUpStatus){
191             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
192         } else if(message instanceof ShardNotInitializedTimeout) {
193             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
194         } else if(message instanceof ShardLeaderStateChanged) {
195             onLeaderStateChanged((ShardLeaderStateChanged) message);
196         } else if(message instanceof SwitchShardBehavior){
197             onSwitchShardBehavior((SwitchShardBehavior) message);
198         } else if(message instanceof CreateShard) {
199             onCreateShard((CreateShard)message);
200         } else {
201             unknownMessage(message);
202         }
203
204     }
205
206     private void onCreateShard(CreateShard createShard) {
207         Object reply;
208         try {
209             ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
210             if(localShards.containsKey(moduleShardConfig.getShardName())) {
211                 throw new IllegalStateException(String.format("Shard with name %s already exists",
212                         moduleShardConfig.getShardName()));
213             }
214
215             configuration.addModuleShardConfiguration(moduleShardConfig);
216
217             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
218             Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
219                     moduleShardConfig.getShardMemberNames()*/);
220
221             LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
222                     moduleShardConfig.getShardMemberNames(), peerAddresses);
223
224             DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
225             if(shardDatastoreContext == null) {
226                 shardDatastoreContext = datastoreContext;
227             } else {
228                 shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
229                         peerAddressResolver).build();
230             }
231
232             ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
233                     shardDatastoreContext, createShard.getShardPropsCreator());
234             localShards.put(info.getShardName(), info);
235
236             mBean.addLocalShard(shardId.toString());
237
238             if(schemaContext != null) {
239                 info.setActor(newShardActor(schemaContext, info));
240             }
241
242             reply = new CreateShardReply();
243         } catch (Exception e) {
244             LOG.error("onCreateShard failed", e);
245             reply = new akka.actor.Status.Failure(e);
246         }
247
248         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
249             getSender().tell(reply, getSelf());
250         }
251     }
252
253     private void checkReady(){
254         if (isReadyWithLeaderId()) {
255             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
256                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
257
258             waitTillReadyCountdownLatch.countDown();
259         }
260     }
261
262     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
263         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
264
265         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
266         if(shardInformation != null) {
267             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
268             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
269             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
270                 primaryShardInfoCache.remove(shardInformation.getShardName());
271             }
272
273             checkReady();
274         } else {
275             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
276         }
277     }
278
279     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
280         ShardInformation shardInfo = message.getShardInfo();
281
282         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
283                 shardInfo.getShardName());
284
285         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
286
287         if(!shardInfo.isShardInitialized()) {
288             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
289             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
290         } else {
291             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
292             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
293         }
294     }
295
296     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
297         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
298                 status.getName(), status.isInitialSyncDone());
299
300         ShardInformation shardInformation = findShardInformation(status.getName());
301
302         if(shardInformation != null) {
303             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
304
305             mBean.setSyncStatus(isInSync());
306         }
307
308     }
309
310     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
311         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
312                 roleChanged.getOldRole(), roleChanged.getNewRole());
313
314         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
315         if(shardInformation != null) {
316             shardInformation.setRole(roleChanged.getNewRole());
317             checkReady();
318             mBean.setSyncStatus(isInSync());
319         }
320     }
321
322
323     private ShardInformation findShardInformation(String memberId) {
324         for(ShardInformation info : localShards.values()){
325             if(info.getShardId().toString().equals(memberId)){
326                 return info;
327             }
328         }
329
330         return null;
331     }
332
333     private boolean isReadyWithLeaderId() {
334         boolean isReady = true;
335         for (ShardInformation info : localShards.values()) {
336             if(!info.isShardReadyWithLeaderId()){
337                 isReady = false;
338                 break;
339             }
340         }
341         return isReady;
342     }
343
344     private boolean isInSync(){
345         for (ShardInformation info : localShards.values()) {
346             if(!info.isInSync()){
347                 return false;
348             }
349         }
350         return true;
351     }
352
353     private void onActorInitialized(Object message) {
354         final ActorRef sender = getSender();
355
356         if (sender == null) {
357             return; //why is a non-actor sending this message? Just ignore.
358         }
359
360         String actorName = sender.path().name();
361         //find shard name from actor name; actor name is stringified shardId
362         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
363
364         if (shardId.getShardName() == null) {
365             return;
366         }
367
368         markShardAsInitialized(shardId.getShardName());
369     }
370
371     private void markShardAsInitialized(String shardName) {
372         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
373
374         ShardInformation shardInformation = localShards.get(shardName);
375         if (shardInformation != null) {
376             shardInformation.setActorInitialized();
377
378             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
379         }
380     }
381
382     @Override
383     protected void handleRecover(Object message) throws Exception {
384         if (message instanceof RecoveryCompleted) {
385             LOG.info("Recovery complete : {}", persistenceId());
386
387             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
388             // journal on upgrade from Helium.
389             deleteMessages(lastSequenceNr());
390         }
391     }
392
393     private void findLocalShard(FindLocalShard message) {
394         final ShardInformation shardInformation = localShards.get(message.getShardName());
395
396         if(shardInformation == null){
397             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
398             return;
399         }
400
401         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
402             @Override
403             public Object get() {
404                 return new LocalShardFound(shardInformation.getActor());
405             }
406         });
407     }
408
    /**
     * Replies to the current sender with the supplied message once the shard is in the required
     * state.
     *
     * <p>The required state is "initialized", plus "ready with a leader id" when
     * {@code wantShardReady} is true. If that state is already reached, the reply is sent
     * immediately. Otherwise there are two cases:
     * <ul>
     * <li>{@code doWait}: the reply is deferred via an OnShardInitialized/OnShardReady callback
     * registered on the shard, and a ShardNotInitializedTimeout is scheduled so the requester gets
     * a failure if the shard does not get there in time;
     * <li>otherwise: the sender immediately gets a NotInitializedException (shard not initialized)
     * or a NoShardLeaderException (initialized but no leader yet).
     * </ul>
     *
     * @param shardInformation the shard the request is about
     * @param doWait whether to wait for the shard instead of failing immediately
     * @param wantShardReady whether the shard must also be ready with a leader id
     * @param messageSupplier produces the success reply; evaluated only when the reply is sent
     */
    private void sendResponse(ShardInformation shardInformation, boolean doWait,
            boolean wantShardReady, final Supplier<Object> messageSupplier) {
        if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
            if(doWait) {
                // Capture sender/self now: getSender() is only valid while processing this message.
                final ActorRef sender = getSender();
                final ActorRef self = self();

                Runnable replyRunnable = new Runnable() {
                    @Override
                    public void run() {
                        sender.tell(messageSupplier.get(), self);
                    }
                };

                OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
                    new OnShardInitialized(replyRunnable);

                shardInformation.addOnShardInitialized(onShardInitialized);

                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());

                FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
                if(shardInformation.isShardInitialized()) {
                    // If the shard is already initialized then we'll wait enough time for the shard to
                    // elect a leader, ie 2 times the election timeout.
                    timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
                            .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
                }

                // The timeout message comes back to this actor and is handled by
                // onShardNotInitializedTimeout, which fails the request if still pending.
                Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                        timeout, getSelf(),
                        new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                        getContext().dispatcher(), getSelf());

                // Kept on the callback, presumably so it can cancel the timer when the shard
                // becomes ready first - TODO confirm in OnShardInitialized.
                onShardInitialized.setTimeoutSchedule(timeoutSchedule);

            } else if (!shardInformation.isShardInitialized()) {
                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
                        shardInformation.getShardName());
                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
            } else {
                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
                        shardInformation.getShardName());
                getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
            }

            return;
        }

        getSender().tell(messageSupplier.get(), getSelf());
    }
460
461     private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
462         return new NoShardLeaderException(null, shardId.toString());
463     }
464
465     private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
466         return new NotInitializedException(String.format(
467                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
468     }
469
470     private void memberRemoved(ClusterEvent.MemberRemoved message) {
471         String memberName = message.member().roles().head();
472
473         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
474                 message.member().address());
475
476         peerAddressResolver.removePeerAddress(memberName);
477
478         for(ShardInformation info : localShards.values()){
479             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
480         }
481     }
482
483     private void memberExited(ClusterEvent.MemberExited message) {
484         String memberName = message.member().roles().head();
485
486         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
487                 message.member().address());
488
489         peerAddressResolver.removePeerAddress(memberName);
490
491         for(ShardInformation info : localShards.values()){
492             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
493         }
494     }
495
496     private void memberUp(ClusterEvent.MemberUp message) {
497         String memberName = message.member().roles().head();
498
499         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
500                 message.member().address());
501
502         peerAddressResolver.addPeerAddress(memberName, message.member().address());
503
504         for(ShardInformation info : localShards.values()){
505             String shardName = info.getShardName();
506             String peerId = getShardIdentifier(memberName, shardName).toString();
507             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
508
509             info.peerUp(memberName, peerId, getSelf());
510         }
511
512         checkReady();
513     }
514
515     private void memberReachable(ClusterEvent.ReachableMember message) {
516         String memberName = message.member().roles().head();
517         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
518
519         markMemberAvailable(memberName);
520     }
521
522     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
523         String memberName = message.member().roles().head();
524         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
525
526         markMemberUnavailable(memberName);
527     }
528
529     private void markMemberUnavailable(final String memberName) {
530         for(ShardInformation info : localShards.values()){
531             String leaderId = info.getLeaderId();
532             if(leaderId != null && leaderId.contains(memberName)) {
533                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
534                 info.setLeaderAvailable(false);
535
536                 primaryShardInfoCache.remove(info.getShardName());
537             }
538
539             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
540         }
541     }
542
543     private void markMemberAvailable(final String memberName) {
544         for(ShardInformation info : localShards.values()){
545             String leaderId = info.getLeaderId();
546             if(leaderId != null && leaderId.contains(memberName)) {
547                 LOG.debug("Marking Leader {} as available.", leaderId);
548                 info.setLeaderAvailable(true);
549             }
550
551             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
552         }
553     }
554
555     private void onDatastoreContext(DatastoreContext context) {
556         datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
557                 peerAddressResolver).build();
558         for (ShardInformation info : localShards.values()) {
559             if (info.getActor() != null) {
560                 info.getActor().tell(datastoreContext, getSelf());
561             }
562         }
563     }
564
565     private void onSwitchShardBehavior(SwitchShardBehavior message) {
566         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
567
568         ShardInformation shardInformation = localShards.get(identifier.getShardName());
569
570         if(shardInformation != null && shardInformation.getActor() != null) {
571             shardInformation.getActor().tell(
572                     new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
573         } else {
574             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
575                     message.getShardName(), message.getNewState());
576         }
577     }
578
579     /**
580      * Notifies all the local shards of a change in the schema context
581      *
582      * @param message
583      */
584     private void updateSchemaContext(final Object message) {
585         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
586
587         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
588
589         for (ShardInformation info : localShards.values()) {
590             if (info.getActor() == null) {
591                 LOG.debug("Creating Shard {}", info.getShardId());
592                 info.setActor(newShardActor(schemaContext, info));
593             } else {
594                 info.getActor().tell(message, getSelf());
595             }
596         }
597     }
598
599     @VisibleForTesting
600     protected ClusterWrapper getCluster() {
601         return cluster;
602     }
603
604     @VisibleForTesting
605     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
606         return getContext().actorOf(info.newProps(schemaContext)
607                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
608     }
609
610     private void findPrimary(FindPrimary message) {
611         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
612
613         final String shardName = message.getShardName();
614         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
615
616         // First see if the there is a local replica for the shard
617         final ShardInformation info = localShards.get(shardName);
618         if (info != null) {
619             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
620                 @Override
621                 public Object get() {
622                     String primaryPath = info.getSerializedLeaderActor();
623                     Object found = canReturnLocalShardState && info.isLeader() ?
624                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
625                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
626
627                             if(LOG.isDebugEnabled()) {
628                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
629                             }
630
631                             return found;
632                 }
633             });
634
635             return;
636         }
637
638         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
639             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
640                     shardName, address);
641
642             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
643                     message.isWaitUntilReady()), getContext());
644             return;
645         }
646
647         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
648
649         getSender().tell(new PrimaryNotFoundException(
650                 String.format("No primary shard found for %s.", shardName)), getSelf());
651     }
652
653     /**
654      * Construct the name of the shard actor given the name of the member on
655      * which the shard resides and the name of the shard
656      *
657      * @param memberName
658      * @param shardName
659      * @return
660      */
661     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
662         return peerAddressResolver.getShardIdentifier(memberName, shardName);
663     }
664
665     /**
666      * Create shards that are local to the member on which the ShardManager
667      * runs
668      *
669      */
670     private void createLocalShards() {
671         String memberName = this.cluster.getCurrentMemberName();
672         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
673
674         ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator();
675         List<String> localShardActorNames = new ArrayList<>();
676         for(String shardName : memberShardNames){
677             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
678             Map<String, String> peerAddresses = getPeerAddresses(shardName);
679             localShardActorNames.add(shardId.toString());
680             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
681                     shardPropsCreator));
682         }
683
684         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
685                 datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
686
687         mBean.setShardManager(this);
688     }
689
690     /**
691      * Given the name of the shard find the addresses of all it's peers
692      *
693      * @param shardName
694      */
695     private Map<String, String> getPeerAddresses(String shardName) {
696         Collection<String> members = configuration.getMembersFromShardName(shardName);
697         Map<String, String> peerAddresses = new HashMap<>();
698
699         String currentMemberName = this.cluster.getCurrentMemberName();
700
701         for(String memberName : members) {
702             if(!currentMemberName.equals(memberName)) {
703                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
704                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
705                 peerAddresses.put(shardId.toString(), address);
706             }
707         }
708         return peerAddresses;
709     }
710
711     @Override
712     public SupervisorStrategy supervisorStrategy() {
713
714         return new OneForOneStrategy(10, Duration.create("1 minute"),
715                 new Function<Throwable, SupervisorStrategy.Directive>() {
716             @Override
717             public SupervisorStrategy.Directive apply(Throwable t) {
718                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
719                 return SupervisorStrategy.resume();
720             }
721         }
722                 );
723
724     }
725
726     @Override
727     public String persistenceId() {
728         return "shard-manager-" + type;
729     }
730
731     @VisibleForTesting
732     ShardManagerInfoMBean getMBean(){
733         return mBean;
734     }
735
736     @VisibleForTesting
737     protected static class ShardInformation {
738         private final ShardIdentifier shardId;
739         private final String shardName;
740         private ActorRef actor;
741         private ActorPath actorPath;
742         private final Map<String, String> peerAddresses;
743         private Optional<DataTree> localShardDataTree;
744         private boolean leaderAvailable = false;
745
746         // flag that determines if the actor is ready for business
747         private boolean actorInitialized = false;
748
749         private boolean followerSyncStatus = false;
750
751         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
752         private String role ;
753         private String leaderId;
754         private short leaderVersion;
755
756         private final DatastoreContext datastoreContext;
757         private final ShardPropsCreator shardPropsCreator;
758
759         private ShardInformation(String shardName, ShardIdentifier shardId,
760                 Map<String, String> peerAddresses, DatastoreContext datastoreContext,
761                 ShardPropsCreator shardPropsCreator) {
762             this.shardName = shardName;
763             this.shardId = shardId;
764             this.peerAddresses = peerAddresses;
765             this.datastoreContext = datastoreContext;
766             this.shardPropsCreator = shardPropsCreator;
767         }
768
769         Props newProps(SchemaContext schemaContext) {
770             return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext);
771         }
772
773         String getShardName() {
774             return shardName;
775         }
776
777         ActorRef getActor(){
778             return actor;
779         }
780
781         ActorPath getActorPath() {
782             return actorPath;
783         }
784
785         void setActor(ActorRef actor) {
786             this.actor = actor;
787             this.actorPath = actor.path();
788         }
789
790         ShardIdentifier getShardId() {
791             return shardId;
792         }
793
794         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
795             this.localShardDataTree = localShardDataTree;
796         }
797
798         Optional<DataTree> getLocalShardDataTree() {
799             return localShardDataTree;
800         }
801
802         Map<String, String> getPeerAddresses() {
803             return peerAddresses;
804         }
805
806         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
807             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
808                     peerAddress);
809             if(peerAddresses.containsKey(peerId)){
810                 peerAddresses.put(peerId, peerAddress);
811
812                 if(actor != null) {
813                     if(LOG.isDebugEnabled()) {
814                         LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
815                                 peerId, peerAddress, actor.path());
816                     }
817
818                     actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
819                 }
820
821                 notifyOnShardInitializedCallbacks();
822             }
823         }
824
825         void peerDown(String memberName, String peerId, ActorRef sender) {
826             if(peerAddresses.containsKey(peerId) && actor != null) {
827                 actor.tell(new PeerDown(memberName, peerId), sender);
828             }
829         }
830
831         void peerUp(String memberName, String peerId, ActorRef sender) {
832             if(peerAddresses.containsKey(peerId) && actor != null) {
833                 actor.tell(new PeerUp(memberName, peerId), sender);
834             }
835         }
836
837         boolean isShardReady() {
838             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
839         }
840
841         boolean isShardReadyWithLeaderId() {
842             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
843                     (isLeader() || peerAddresses.get(leaderId) != null);
844         }
845
846         boolean isShardInitialized() {
847             return getActor() != null && actorInitialized;
848         }
849
850         boolean isLeader() {
851             return Objects.equal(leaderId, shardId.toString());
852         }
853
854         String getSerializedLeaderActor() {
855             if(isLeader()) {
856                 return Serialization.serializedActorPath(getActor());
857             } else {
858                 return peerAddresses.get(leaderId);
859             }
860         }
861
862         void setActorInitialized() {
863             LOG.debug("Shard {} is initialized", shardId);
864
865             this.actorInitialized = true;
866
867             notifyOnShardInitializedCallbacks();
868         }
869
870         private void notifyOnShardInitializedCallbacks() {
871             if(onShardInitializedSet.isEmpty()) {
872                 return;
873             }
874
875             boolean ready = isShardReadyWithLeaderId();
876
877             if(LOG.isDebugEnabled()) {
878                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
879                         ready ? "ready" : "initialized", onShardInitializedSet.size());
880             }
881
882             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
883             while(iter.hasNext()) {
884                 OnShardInitialized onShardInitialized = iter.next();
885                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
886                     iter.remove();
887                     onShardInitialized.getTimeoutSchedule().cancel();
888                     onShardInitialized.getReplyRunnable().run();
889                 }
890             }
891         }
892
893         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
894             onShardInitializedSet.add(onShardInitialized);
895         }
896
897         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
898             onShardInitializedSet.remove(onShardInitialized);
899         }
900
901         void setRole(String newRole) {
902             this.role = newRole;
903
904             notifyOnShardInitializedCallbacks();
905         }
906
907         void setFollowerSyncStatus(boolean syncStatus){
908             this.followerSyncStatus = syncStatus;
909         }
910
911         boolean isInSync(){
912             if(RaftState.Follower.name().equals(this.role)){
913                 return followerSyncStatus;
914             } else if(RaftState.Leader.name().equals(this.role)){
915                 return true;
916             }
917
918             return false;
919         }
920
921         boolean setLeaderId(String leaderId) {
922             boolean changed = !Objects.equal(this.leaderId, leaderId);
923             this.leaderId = leaderId;
924             if(leaderId != null) {
925                 this.leaderAvailable = true;
926             }
927             notifyOnShardInitializedCallbacks();
928
929             return changed;
930         }
931
932         String getLeaderId() {
933             return leaderId;
934         }
935
936         void setLeaderAvailable(boolean leaderAvailable) {
937             this.leaderAvailable = leaderAvailable;
938         }
939
940         short getLeaderVersion() {
941             return leaderVersion;
942         }
943
944         void setLeaderVersion(short leaderVersion) {
945             this.leaderVersion = leaderVersion;
946         }
947     }
948
949     private static class ShardManagerCreator implements Creator<ShardManager> {
950         private static final long serialVersionUID = 1L;
951
952         final ClusterWrapper cluster;
953         final Configuration configuration;
954         final DatastoreContext datastoreContext;
955         private final CountDownLatch waitTillReadyCountdownLatch;
956         private final PrimaryShardInfoFutureCache primaryShardInfoCache;
957
958         ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
959                 CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
960             this.cluster = cluster;
961             this.configuration = configuration;
962             this.datastoreContext = datastoreContext;
963             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
964             this.primaryShardInfoCache = primaryShardInfoCache;
965         }
966
967         @Override
968         public ShardManager create() throws Exception {
969             return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
970                     primaryShardInfoCache);
971         }
972     }
973
974     private static class OnShardInitialized {
975         private final Runnable replyRunnable;
976         private Cancellable timeoutSchedule;
977
978         OnShardInitialized(Runnable replyRunnable) {
979             this.replyRunnable = replyRunnable;
980         }
981
982         Runnable getReplyRunnable() {
983             return replyRunnable;
984         }
985
986         Cancellable getTimeoutSchedule() {
987             return timeoutSchedule;
988         }
989
990         void setTimeoutSchedule(Cancellable timeoutSchedule) {
991             this.timeoutSchedule = timeoutSchedule;
992         }
993     }
994
995     private static class OnShardReady extends OnShardInitialized {
996         OnShardReady(Runnable replyRunnable) {
997             super(replyRunnable);
998         }
999     }
1000
1001     private static class ShardNotInitializedTimeout {
1002         private final ActorRef sender;
1003         private final ShardInformation shardInfo;
1004         private final OnShardInitialized onShardInitialized;
1005
1006         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1007             this.sender = sender;
1008             this.shardInfo = shardInfo;
1009             this.onShardInitialized = onShardInitialized;
1010         }
1011
1012         ActorRef getSender() {
1013             return sender;
1014         }
1015
1016         ShardInformation getShardInfo() {
1017             return shardInfo;
1018         }
1019
1020         OnShardInitialized getOnShardInitialized() {
1021             return onShardInitialized;
1022         }
1023     }
1024
1025     /**
1026      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1027      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1028      */
1029     @Deprecated
1030     static class SchemaContextModules implements Serializable {
1031         private static final long serialVersionUID = -8884620101025936590L;
1032
1033         private final Set<String> modules;
1034
1035         SchemaContextModules(Set<String> modules){
1036             this.modules = modules;
1037         }
1038
1039         public Set<String> getModules() {
1040             return modules;
1041         }
1042     }
1043 }
1044
1045
1046