eb39a34dc0dc508b3543e5d3f37976c749672bfe
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.Props;
17 import akka.actor.SupervisorStrategy;
18 import akka.cluster.ClusterEvent;
19 import akka.japi.Creator;
20 import akka.japi.Function;
21 import akka.persistence.RecoveryCompleted;
22 import akka.serialization.Serialization;
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Objects;
25 import com.google.common.base.Optional;
26 import com.google.common.base.Preconditions;
27 import com.google.common.base.Strings;
28 import com.google.common.base.Supplier;
29 import com.google.common.collect.Sets;
30 import java.io.Serializable;
31 import java.util.ArrayList;
32 import java.util.Collection;
33 import java.util.HashMap;
34 import java.util.Iterator;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.CountDownLatch;
39 import java.util.concurrent.TimeUnit;
40 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
41 import org.opendaylight.controller.cluster.datastore.config.Configuration;
42 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
43 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
44 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
45 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
46 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
47 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
48 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
49 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
50 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
51 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
52 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
53 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
58 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
59 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
60 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
61 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
62 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
63 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
64 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
65 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
66 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
67 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
68 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
69 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
70 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
71 import org.opendaylight.controller.cluster.raft.RaftState;
72 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
73 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
74 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
75 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
76 import org.slf4j.Logger;
77 import org.slf4j.LoggerFactory;
78 import scala.concurrent.duration.Duration;
79 import scala.concurrent.duration.FiniteDuration;
80
/**
 * The ShardManager has the following jobs:
 * <ul>
 * <li> Create all the local shard replicas that belong on this cluster member
 * <li> Find the address of the local shard
 * <li> Find the primary replica for any given shard
 * <li> Monitor the cluster members and store their addresses
 * </ul>
 */
90 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
91
92     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
93
94     // Stores a mapping between a shard name and it's corresponding information
95     // Shard names look like inventory, topology etc and are as specified in
96     // configuration
97     private final Map<String, ShardInformation> localShards = new HashMap<>();
98
99     // The type of a ShardManager reflects the type of the datastore itself
100     // A data store could be of type config/operational
101     private final String type;
102
103     private final ClusterWrapper cluster;
104
105     private final Configuration configuration;
106
107     private final String shardDispatcherPath;
108
109     private ShardManagerInfo mBean;
110
111     private DatastoreContext datastoreContext;
112
113     private final CountDownLatch waitTillReadyCountdownLatch;
114
115     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
116
117     private final ShardPeerAddressResolver peerAddressResolver;
118
119     private SchemaContext schemaContext;
120
121     /**
122      */
123     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
124             DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
125             PrimaryShardInfoFutureCache primaryShardInfoCache) {
126
127         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
128         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
129         this.datastoreContext = datastoreContext;
130         this.type = datastoreContext.getDataStoreType();
131         this.shardDispatcherPath =
132                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
133         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
134         this.primaryShardInfoCache = primaryShardInfoCache;
135
136         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
137         this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
138                 peerAddressResolver).build();
139
140         // Subscribe this actor to cluster member events
141         cluster.subscribeToMemberEvents(getSelf());
142
143         createLocalShards();
144     }
145
146     public static Props props(
147             final ClusterWrapper cluster,
148             final Configuration configuration,
149             final DatastoreContext datastoreContext,
150             final CountDownLatch waitTillReadyCountdownLatch,
151             final PrimaryShardInfoFutureCache primaryShardInfoCache) {
152
153         Preconditions.checkNotNull(cluster, "cluster should not be null");
154         Preconditions.checkNotNull(configuration, "configuration should not be null");
155         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
156         Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
157
158         return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
159                 waitTillReadyCountdownLatch, primaryShardInfoCache));
160     }
161
162     @Override
163     public void postStop() {
164         LOG.info("Stopping ShardManager");
165
166         mBean.unregisterMBean();
167     }
168
169     @Override
170     public void handleCommand(Object message) throws Exception {
171         if (message  instanceof FindPrimary) {
172             findPrimary((FindPrimary)message);
173         } else if(message instanceof FindLocalShard){
174             findLocalShard((FindLocalShard) message);
175         } else if (message instanceof UpdateSchemaContext) {
176             updateSchemaContext(message);
177         } else if(message instanceof ActorInitialized) {
178             onActorInitialized(message);
179         } else if (message instanceof ClusterEvent.MemberUp){
180             memberUp((ClusterEvent.MemberUp) message);
181         } else if (message instanceof ClusterEvent.MemberExited){
182             memberExited((ClusterEvent.MemberExited) message);
183         } else if(message instanceof ClusterEvent.MemberRemoved) {
184             memberRemoved((ClusterEvent.MemberRemoved) message);
185         } else if(message instanceof ClusterEvent.UnreachableMember) {
186             memberUnreachable((ClusterEvent.UnreachableMember)message);
187         } else if(message instanceof ClusterEvent.ReachableMember) {
188             memberReachable((ClusterEvent.ReachableMember) message);
189         } else if(message instanceof DatastoreContext) {
190             onDatastoreContext((DatastoreContext)message);
191         } else if(message instanceof RoleChangeNotification) {
192             onRoleChangeNotification((RoleChangeNotification) message);
193         } else if(message instanceof FollowerInitialSyncUpStatus){
194             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
195         } else if(message instanceof ShardNotInitializedTimeout) {
196             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
197         } else if(message instanceof ShardLeaderStateChanged) {
198             onLeaderStateChanged((ShardLeaderStateChanged) message);
199         } else if(message instanceof SwitchShardBehavior){
200             onSwitchShardBehavior((SwitchShardBehavior) message);
201         } else if(message instanceof CreateShard) {
202             onCreateShard((CreateShard)message);
203         } else if(message instanceof AddShardReplica){
204             onAddShardReplica((AddShardReplica)message);
205         } else if(message instanceof RemoveShardReplica){
206             onRemoveShardReplica((RemoveShardReplica)message);
207         } else {
208             unknownMessage(message);
209         }
210
211     }
212
213     private void onCreateShard(CreateShard createShard) {
214         Object reply;
215         try {
216             ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
217             if(localShards.containsKey(moduleShardConfig.getShardName())) {
218                 throw new IllegalStateException(String.format("Shard with name %s already exists",
219                         moduleShardConfig.getShardName()));
220             }
221
222             configuration.addModuleShardConfiguration(moduleShardConfig);
223
224             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), moduleShardConfig.getShardName());
225             Map<String, String> peerAddresses = getPeerAddresses(moduleShardConfig.getShardName()/*,
226                     moduleShardConfig.getShardMemberNames()*/);
227
228             LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
229                     moduleShardConfig.getShardMemberNames(), peerAddresses);
230
231             DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
232             if(shardDatastoreContext == null) {
233                 shardDatastoreContext = datastoreContext;
234             } else {
235                 shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
236                         peerAddressResolver).build();
237             }
238
239             ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
240                     shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
241             localShards.put(info.getShardName(), info);
242
243             mBean.addLocalShard(shardId.toString());
244
245             if(schemaContext != null) {
246                 info.setActor(newShardActor(schemaContext, info));
247             }
248
249             reply = new CreateShardReply();
250         } catch (Exception e) {
251             LOG.error("onCreateShard failed", e);
252             reply = new akka.actor.Status.Failure(e);
253         }
254
255         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
256             getSender().tell(reply, getSelf());
257         }
258     }
259
260     private void checkReady(){
261         if (isReadyWithLeaderId()) {
262             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
263                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
264
265             waitTillReadyCountdownLatch.countDown();
266         }
267     }
268
269     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
270         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
271
272         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
273         if(shardInformation != null) {
274             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
275             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
276             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
277                 primaryShardInfoCache.remove(shardInformation.getShardName());
278             }
279
280             checkReady();
281         } else {
282             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
283         }
284     }
285
286     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
287         ShardInformation shardInfo = message.getShardInfo();
288
289         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
290                 shardInfo.getShardName());
291
292         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
293
294         if(!shardInfo.isShardInitialized()) {
295             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
296             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
297         } else {
298             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
299             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
300         }
301     }
302
303     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
304         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
305                 status.getName(), status.isInitialSyncDone());
306
307         ShardInformation shardInformation = findShardInformation(status.getName());
308
309         if(shardInformation != null) {
310             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
311
312             mBean.setSyncStatus(isInSync());
313         }
314
315     }
316
317     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
318         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
319                 roleChanged.getOldRole(), roleChanged.getNewRole());
320
321         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
322         if(shardInformation != null) {
323             shardInformation.setRole(roleChanged.getNewRole());
324             checkReady();
325             mBean.setSyncStatus(isInSync());
326         }
327     }
328
329
330     private ShardInformation findShardInformation(String memberId) {
331         for(ShardInformation info : localShards.values()){
332             if(info.getShardId().toString().equals(memberId)){
333                 return info;
334             }
335         }
336
337         return null;
338     }
339
340     private boolean isReadyWithLeaderId() {
341         boolean isReady = true;
342         for (ShardInformation info : localShards.values()) {
343             if(!info.isShardReadyWithLeaderId()){
344                 isReady = false;
345                 break;
346             }
347         }
348         return isReady;
349     }
350
351     private boolean isInSync(){
352         for (ShardInformation info : localShards.values()) {
353             if(!info.isInSync()){
354                 return false;
355             }
356         }
357         return true;
358     }
359
360     private void onActorInitialized(Object message) {
361         final ActorRef sender = getSender();
362
363         if (sender == null) {
364             return; //why is a non-actor sending this message? Just ignore.
365         }
366
367         String actorName = sender.path().name();
368         //find shard name from actor name; actor name is stringified shardId
369         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
370
371         if (shardId.getShardName() == null) {
372             return;
373         }
374
375         markShardAsInitialized(shardId.getShardName());
376     }
377
378     private void markShardAsInitialized(String shardName) {
379         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
380
381         ShardInformation shardInformation = localShards.get(shardName);
382         if (shardInformation != null) {
383             shardInformation.setActorInitialized();
384
385             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
386         }
387     }
388
389     @Override
390     protected void handleRecover(Object message) throws Exception {
391         if (message instanceof RecoveryCompleted) {
392             LOG.info("Recovery complete : {}", persistenceId());
393
394             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
395             // journal on upgrade from Helium.
396             deleteMessages(lastSequenceNr());
397         }
398     }
399
400     private void findLocalShard(FindLocalShard message) {
401         final ShardInformation shardInformation = localShards.get(message.getShardName());
402
403         if(shardInformation == null){
404             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
405             return;
406         }
407
408         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
409             @Override
410             public Object get() {
411                 return new LocalShardFound(shardInformation.getActor());
412             }
413         });
414     }
415
416     private void sendResponse(ShardInformation shardInformation, boolean doWait,
417             boolean wantShardReady, final Supplier<Object> messageSupplier) {
418         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
419             if(doWait) {
420                 final ActorRef sender = getSender();
421                 final ActorRef self = self();
422
423                 Runnable replyRunnable = new Runnable() {
424                     @Override
425                     public void run() {
426                         sender.tell(messageSupplier.get(), self);
427                     }
428                 };
429
430                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
431                     new OnShardInitialized(replyRunnable);
432
433                 shardInformation.addOnShardInitialized(onShardInitialized);
434
435                 LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
436
437                 FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
438                 if(shardInformation.isShardInitialized()) {
439                     // If the shard is already initialized then we'll wait enough time for the shard to
440                     // elect a leader, ie 2 times the election timeout.
441                     timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
442                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
443                 }
444
445                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
446                         timeout, getSelf(),
447                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
448                         getContext().dispatcher(), getSelf());
449
450                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
451
452             } else if (!shardInformation.isShardInitialized()) {
453                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
454                         shardInformation.getShardName());
455                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
456             } else {
457                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
458                         shardInformation.getShardName());
459                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
460             }
461
462             return;
463         }
464
465         getSender().tell(messageSupplier.get(), getSelf());
466     }
467
468     private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
469         return new NoShardLeaderException(null, shardId.toString());
470     }
471
472     private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
473         return new NotInitializedException(String.format(
474                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
475     }
476
477     private void memberRemoved(ClusterEvent.MemberRemoved message) {
478         String memberName = message.member().roles().head();
479
480         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
481                 message.member().address());
482
483         peerAddressResolver.removePeerAddress(memberName);
484
485         for(ShardInformation info : localShards.values()){
486             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
487         }
488     }
489
490     private void memberExited(ClusterEvent.MemberExited message) {
491         String memberName = message.member().roles().head();
492
493         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
494                 message.member().address());
495
496         peerAddressResolver.removePeerAddress(memberName);
497
498         for(ShardInformation info : localShards.values()){
499             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
500         }
501     }
502
503     private void memberUp(ClusterEvent.MemberUp message) {
504         String memberName = message.member().roles().head();
505
506         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
507                 message.member().address());
508
509         addPeerAddress(memberName, message.member().address());
510
511         checkReady();
512     }
513
514     private void addPeerAddress(String memberName, Address address) {
515         peerAddressResolver.addPeerAddress(memberName, address);
516
517         for(ShardInformation info : localShards.values()){
518             String shardName = info.getShardName();
519             String peerId = getShardIdentifier(memberName, shardName).toString();
520             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
521
522             info.peerUp(memberName, peerId, getSelf());
523         }
524     }
525
526     private void memberReachable(ClusterEvent.ReachableMember message) {
527         String memberName = message.member().roles().head();
528         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
529
530         addPeerAddress(memberName, message.member().address());
531
532         markMemberAvailable(memberName);
533     }
534
535     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
536         String memberName = message.member().roles().head();
537         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
538
539         markMemberUnavailable(memberName);
540     }
541
542     private void markMemberUnavailable(final String memberName) {
543         for(ShardInformation info : localShards.values()){
544             String leaderId = info.getLeaderId();
545             if(leaderId != null && leaderId.contains(memberName)) {
546                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
547                 info.setLeaderAvailable(false);
548
549                 primaryShardInfoCache.remove(info.getShardName());
550             }
551
552             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
553         }
554     }
555
556     private void markMemberAvailable(final String memberName) {
557         for(ShardInformation info : localShards.values()){
558             String leaderId = info.getLeaderId();
559             if(leaderId != null && leaderId.contains(memberName)) {
560                 LOG.debug("Marking Leader {} as available.", leaderId);
561                 info.setLeaderAvailable(true);
562             }
563
564             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
565         }
566     }
567
568     private void onDatastoreContext(DatastoreContext context) {
569         datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
570                 peerAddressResolver).build();
571         for (ShardInformation info : localShards.values()) {
572             if (info.getActor() != null) {
573                 info.getActor().tell(datastoreContext, getSelf());
574             }
575         }
576     }
577
578     private void onSwitchShardBehavior(SwitchShardBehavior message) {
579         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
580
581         ShardInformation shardInformation = localShards.get(identifier.getShardName());
582
583         if(shardInformation != null && shardInformation.getActor() != null) {
584             shardInformation.getActor().tell(
585                     new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
586         } else {
587             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
588                     message.getShardName(), message.getNewState());
589         }
590     }
591
592     /**
593      * Notifies all the local shards of a change in the schema context
594      *
595      * @param message
596      */
597     private void updateSchemaContext(final Object message) {
598         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
599
600         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
601
602         for (ShardInformation info : localShards.values()) {
603             if (info.getActor() == null) {
604                 LOG.debug("Creating Shard {}", info.getShardId());
605                 info.setActor(newShardActor(schemaContext, info));
606             } else {
607                 info.getActor().tell(message, getSelf());
608             }
609         }
610     }
611
612     @VisibleForTesting
613     protected ClusterWrapper getCluster() {
614         return cluster;
615     }
616
617     @VisibleForTesting
618     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
619         return getContext().actorOf(info.newProps(schemaContext)
620                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
621     }
622
623     private void findPrimary(FindPrimary message) {
624         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
625
626         final String shardName = message.getShardName();
627         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
628
629         // First see if the there is a local replica for the shard
630         final ShardInformation info = localShards.get(shardName);
631         if (info != null) {
632             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
633                 @Override
634                 public Object get() {
635                     String primaryPath = info.getSerializedLeaderActor();
636                     Object found = canReturnLocalShardState && info.isLeader() ?
637                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
638                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
639
640                             if(LOG.isDebugEnabled()) {
641                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
642                             }
643
644                             return found;
645                 }
646             });
647
648             return;
649         }
650
651         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
652             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
653                     shardName, address);
654
655             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
656                     message.isWaitUntilReady()), getContext());
657             return;
658         }
659
660         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
661
662         getSender().tell(new PrimaryNotFoundException(
663                 String.format("No primary shard found for %s.", shardName)), getSelf());
664     }
665
666     /**
667      * Construct the name of the shard actor given the name of the member on
668      * which the shard resides and the name of the shard
669      *
670      * @param memberName
671      * @param shardName
672      * @return
673      */
674     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
675         return peerAddressResolver.getShardIdentifier(memberName, shardName);
676     }
677
678     /**
679      * Create shards that are local to the member on which the ShardManager
680      * runs
681      *
682      */
683     private void createLocalShards() {
684         String memberName = this.cluster.getCurrentMemberName();
685         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
686
687         ShardPropsCreator shardPropsCreator = new DefaultShardPropsCreator();
688         List<String> localShardActorNames = new ArrayList<>();
689         for(String shardName : memberShardNames){
690             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
691             Map<String, String> peerAddresses = getPeerAddresses(shardName);
692             localShardActorNames.add(shardId.toString());
693             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
694                     shardPropsCreator, peerAddressResolver));
695         }
696
697         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
698                 datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
699
700         mBean.setShardManager(this);
701     }
702
703     /**
704      * Given the name of the shard find the addresses of all it's peers
705      *
706      * @param shardName
707      */
708     private Map<String, String> getPeerAddresses(String shardName) {
709         Collection<String> members = configuration.getMembersFromShardName(shardName);
710         Map<String, String> peerAddresses = new HashMap<>();
711
712         String currentMemberName = this.cluster.getCurrentMemberName();
713
714         for(String memberName : members) {
715             if(!currentMemberName.equals(memberName)) {
716                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
717                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
718                 peerAddresses.put(shardId.toString(), address);
719             }
720         }
721         return peerAddresses;
722     }
723
724     @Override
725     public SupervisorStrategy supervisorStrategy() {
726
727         return new OneForOneStrategy(10, Duration.create("1 minute"),
728                 new Function<Throwable, SupervisorStrategy.Directive>() {
729             @Override
730             public SupervisorStrategy.Directive apply(Throwable t) {
731                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
732                 return SupervisorStrategy.resume();
733             }
734         }
735                 );
736
737     }
738
739     @Override
740     public String persistenceId() {
741         return "shard-manager-" + type;
742     }
743
744     @VisibleForTesting
745     ShardManagerInfoMBean getMBean(){
746         return mBean;
747     }
748
749     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
750         String shardName = shardReplicaMsg.getShardName();
751
752         // verify the local shard replica is already available in the controller node
753         if (localShards.containsKey(shardName)) {
754             LOG.debug ("Local shard {} already available in the controller node", shardName);
755             getSender().tell(new akka.actor.Status.Failure(
756                    new IllegalArgumentException(String.format("Local shard %s already exists",
757                                                  shardName))), getSelf());
758             return;
759         }
760         // verify the shard with the specified name is present in the cluster configuration
761         if (!(this.configuration.isShardConfigured(shardName))) {
762             LOG.debug ("No module configuration exists for shard {}", shardName);
763             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(
764                         String.format("No module configuration exists for shard %s",
765                                        shardName))), getSelf());
766             return;
767         }
768
769         // Create the localShard
770         getSender().tell(new akka.actor.Status.Success(true), getSelf());
771         return;
772     }
773
774     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
775         String shardName = shardReplicaMsg.getShardName();
776         boolean deleteStatus = false;
777
778         // verify the local shard replica is available in the controller node
779         if (!localShards.containsKey(shardName)) {
780             LOG.debug ("Local shard replica {} is not available in the controller node", shardName);
781             getSender().tell(new akka.actor.Status.Failure(
782                    new IllegalArgumentException(String.format("Local shard %s not available",
783                                                 shardName))), getSelf());
784             return;
785         }
786         // call RemoveShard for the shardName
787         getSender().tell(new akka.actor.Status.Success(true), getSelf());
788         return;
789     }
790
791     @VisibleForTesting
792     protected static class ShardInformation {
793         private final ShardIdentifier shardId;
794         private final String shardName;
795         private ActorRef actor;
796         private ActorPath actorPath;
797         private final Map<String, String> initialPeerAddresses;
798         private Optional<DataTree> localShardDataTree;
799         private boolean leaderAvailable = false;
800
801         // flag that determines if the actor is ready for business
802         private boolean actorInitialized = false;
803
804         private boolean followerSyncStatus = false;
805
806         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
807         private String role ;
808         private String leaderId;
809         private short leaderVersion;
810
811         private final DatastoreContext datastoreContext;
812         private final ShardPropsCreator shardPropsCreator;
813         private final ShardPeerAddressResolver addressResolver;
814
815         private ShardInformation(String shardName, ShardIdentifier shardId,
816                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
817                 ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
818             this.shardName = shardName;
819             this.shardId = shardId;
820             this.initialPeerAddresses = initialPeerAddresses;
821             this.datastoreContext = datastoreContext;
822             this.shardPropsCreator = shardPropsCreator;
823             this.addressResolver = addressResolver;
824         }
825
826         Props newProps(SchemaContext schemaContext) {
827             return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
828         }
829
830         String getShardName() {
831             return shardName;
832         }
833
834         ActorRef getActor(){
835             return actor;
836         }
837
838         ActorPath getActorPath() {
839             return actorPath;
840         }
841
842         void setActor(ActorRef actor) {
843             this.actor = actor;
844             this.actorPath = actor.path();
845         }
846
847         ShardIdentifier getShardId() {
848             return shardId;
849         }
850
851         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
852             this.localShardDataTree = localShardDataTree;
853         }
854
855         Optional<DataTree> getLocalShardDataTree() {
856             return localShardDataTree;
857         }
858
859         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
860             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
861
862             if(actor != null) {
863                 if(LOG.isDebugEnabled()) {
864                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
865                             peerId, peerAddress, actor.path());
866                 }
867
868                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
869             }
870
871             notifyOnShardInitializedCallbacks();
872         }
873
874         void peerDown(String memberName, String peerId, ActorRef sender) {
875             if(actor != null) {
876                 actor.tell(new PeerDown(memberName, peerId), sender);
877             }
878         }
879
880         void peerUp(String memberName, String peerId, ActorRef sender) {
881             if(actor != null) {
882                 actor.tell(new PeerUp(memberName, peerId), sender);
883             }
884         }
885
886         boolean isShardReady() {
887             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
888         }
889
890         boolean isShardReadyWithLeaderId() {
891             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
892                     (isLeader() || addressResolver.resolve(leaderId) != null);
893         }
894
895         boolean isShardInitialized() {
896             return getActor() != null && actorInitialized;
897         }
898
899         boolean isLeader() {
900             return Objects.equal(leaderId, shardId.toString());
901         }
902
903         String getSerializedLeaderActor() {
904             if(isLeader()) {
905                 return Serialization.serializedActorPath(getActor());
906             } else {
907                 return addressResolver.resolve(leaderId);
908             }
909         }
910
911         void setActorInitialized() {
912             LOG.debug("Shard {} is initialized", shardId);
913
914             this.actorInitialized = true;
915
916             notifyOnShardInitializedCallbacks();
917         }
918
919         private void notifyOnShardInitializedCallbacks() {
920             if(onShardInitializedSet.isEmpty()) {
921                 return;
922             }
923
924             boolean ready = isShardReadyWithLeaderId();
925
926             if(LOG.isDebugEnabled()) {
927                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
928                         ready ? "ready" : "initialized", onShardInitializedSet.size());
929             }
930
931             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
932             while(iter.hasNext()) {
933                 OnShardInitialized onShardInitialized = iter.next();
934                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
935                     iter.remove();
936                     onShardInitialized.getTimeoutSchedule().cancel();
937                     onShardInitialized.getReplyRunnable().run();
938                 }
939             }
940         }
941
942         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
943             onShardInitializedSet.add(onShardInitialized);
944         }
945
946         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
947             onShardInitializedSet.remove(onShardInitialized);
948         }
949
950         void setRole(String newRole) {
951             this.role = newRole;
952
953             notifyOnShardInitializedCallbacks();
954         }
955
956         void setFollowerSyncStatus(boolean syncStatus){
957             this.followerSyncStatus = syncStatus;
958         }
959
960         boolean isInSync(){
961             if(RaftState.Follower.name().equals(this.role)){
962                 return followerSyncStatus;
963             } else if(RaftState.Leader.name().equals(this.role)){
964                 return true;
965             }
966
967             return false;
968         }
969
970         boolean setLeaderId(String leaderId) {
971             boolean changed = !Objects.equal(this.leaderId, leaderId);
972             this.leaderId = leaderId;
973             if(leaderId != null) {
974                 this.leaderAvailable = true;
975             }
976             notifyOnShardInitializedCallbacks();
977
978             return changed;
979         }
980
981         String getLeaderId() {
982             return leaderId;
983         }
984
985         void setLeaderAvailable(boolean leaderAvailable) {
986             this.leaderAvailable = leaderAvailable;
987         }
988
989         short getLeaderVersion() {
990             return leaderVersion;
991         }
992
993         void setLeaderVersion(short leaderVersion) {
994             this.leaderVersion = leaderVersion;
995         }
996     }
997
998     private static class ShardManagerCreator implements Creator<ShardManager> {
999         private static final long serialVersionUID = 1L;
1000
1001         final ClusterWrapper cluster;
1002         final Configuration configuration;
1003         final DatastoreContext datastoreContext;
1004         private final CountDownLatch waitTillReadyCountdownLatch;
1005         private final PrimaryShardInfoFutureCache primaryShardInfoCache;
1006
1007         ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
1008                 CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
1009             this.cluster = cluster;
1010             this.configuration = configuration;
1011             this.datastoreContext = datastoreContext;
1012             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1013             this.primaryShardInfoCache = primaryShardInfoCache;
1014         }
1015
1016         @Override
1017         public ShardManager create() throws Exception {
1018             return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
1019                     primaryShardInfoCache);
1020         }
1021     }
1022
1023     private static class OnShardInitialized {
1024         private final Runnable replyRunnable;
1025         private Cancellable timeoutSchedule;
1026
1027         OnShardInitialized(Runnable replyRunnable) {
1028             this.replyRunnable = replyRunnable;
1029         }
1030
1031         Runnable getReplyRunnable() {
1032             return replyRunnable;
1033         }
1034
1035         Cancellable getTimeoutSchedule() {
1036             return timeoutSchedule;
1037         }
1038
1039         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1040             this.timeoutSchedule = timeoutSchedule;
1041         }
1042     }
1043
1044     private static class OnShardReady extends OnShardInitialized {
1045         OnShardReady(Runnable replyRunnable) {
1046             super(replyRunnable);
1047         }
1048     }
1049
1050     private static class ShardNotInitializedTimeout {
1051         private final ActorRef sender;
1052         private final ShardInformation shardInfo;
1053         private final OnShardInitialized onShardInitialized;
1054
1055         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1056             this.sender = sender;
1057             this.shardInfo = shardInfo;
1058             this.onShardInitialized = onShardInitialized;
1059         }
1060
1061         ActorRef getSender() {
1062             return sender;
1063         }
1064
1065         ShardInformation getShardInfo() {
1066             return shardInfo;
1067         }
1068
1069         OnShardInitialized getOnShardInitialized() {
1070             return onShardInitialized;
1071         }
1072     }
1073
1074     /**
1075      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1076      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1077      */
1078     @Deprecated
1079     static class SchemaContextModules implements Serializable {
1080         private static final long serialVersionUID = -8884620101025936590L;
1081
1082         private final Set<String> modules;
1083
1084         SchemaContextModules(Set<String> modules){
1085             this.modules = modules;
1086         }
1087
1088         public Set<String> getModules() {
1089             return modules;
1090         }
1091     }
1092 }
1093
1094
1095