d61e12e1cb2fd2bc97e7f6b34429c22adaf72669
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Function;
23 import akka.persistence.RecoveryCompleted;
24 import akka.persistence.SaveSnapshotFailure;
25 import akka.persistence.SaveSnapshotSuccess;
26 import akka.persistence.SnapshotOffer;
27 import akka.persistence.SnapshotSelectionCriteria;
28 import akka.serialization.Serialization;
29 import akka.util.Timeout;
30 import com.google.common.annotations.VisibleForTesting;
31 import com.google.common.base.Objects;
32 import com.google.common.base.Optional;
33 import com.google.common.base.Preconditions;
34 import com.google.common.base.Strings;
35 import com.google.common.base.Supplier;
36 import com.google.common.collect.Sets;
37 import java.io.ByteArrayInputStream;
38 import java.io.ObjectInputStream;
39 import java.io.Serializable;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.HashMap;
44 import java.util.HashSet;
45 import java.util.Iterator;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.Set;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import javax.annotation.Nonnull;
53 import javax.annotation.Nullable;
54 import org.apache.commons.lang3.SerializationUtils;
55 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
56 import org.opendaylight.controller.cluster.datastore.config.Configuration;
57 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
58 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
59 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
60 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
64 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
65 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
66 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
67 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
68 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
69 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
70 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
71 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
72 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
73 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
74 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
75 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
76 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
77 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
78 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
79 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
80 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
81 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
82 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
83 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
84 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
87 import org.opendaylight.controller.cluster.raft.RaftState;
88 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
89 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
90 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
91 import org.opendaylight.controller.cluster.raft.messages.AddServer;
92 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
93 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
94 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
95 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
96 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
97 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
98 import org.slf4j.Logger;
99 import org.slf4j.LoggerFactory;
100 import scala.concurrent.Future;
101 import scala.concurrent.duration.Duration;
102 import scala.concurrent.duration.FiniteDuration;
103
/**
 * The ShardManager has the following jobs:
 * <ul>
 * <li> Create all the local shard replicas that belong on this cluster member
 * <li> Find the address of the local shard
 * <li> Find the primary replica for any given shard
 * <li> Monitor the cluster members and store their addresses
 * </ul>
 */
113 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
114
115     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
116
117     // Stores a mapping between a shard name and it's corresponding information
118     // Shard names look like inventory, topology etc and are as specified in
119     // configuration
120     private final Map<String, ShardInformation> localShards = new HashMap<>();
121
122     // The type of a ShardManager reflects the type of the datastore itself
123     // A data store could be of type config/operational
124     private final String type;
125
126     private final ClusterWrapper cluster;
127
128     private final Configuration configuration;
129
130     private final String shardDispatcherPath;
131
132     private final ShardManagerInfo mBean;
133
134     private DatastoreContextFactory datastoreContextFactory;
135
136     private final CountDownLatch waitTillReadyCountdownLatch;
137
138     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
139
140     private final ShardPeerAddressResolver peerAddressResolver;
141
142     private SchemaContext schemaContext;
143
144     private DatastoreSnapshot restoreFromSnapshot;
145
146     private ShardManagerSnapshot currentSnapshot;
147
148     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
149
150     private final String persistenceId;
151
152     /**
153      */
154     protected ShardManager(AbstractBuilder<?> builder) {
155
156         this.cluster = builder.cluster;
157         this.configuration = builder.configuration;
158         this.datastoreContextFactory = builder.datastoreContextFactory;
159         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
160         this.shardDispatcherPath =
161                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
162         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
163         this.primaryShardInfoCache = builder.primaryShardInfoCache;
164         this.restoreFromSnapshot = builder.restoreFromSnapshot;
165
166         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
167         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
168
169         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
170
171         // Subscribe this actor to cluster member events
172         cluster.subscribeToMemberEvents(getSelf());
173
174         List<String> localShardActorNames = new ArrayList<>();
175         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
176                 "shard-manager-" + this.type,
177                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
178                 localShardActorNames);
179         mBean.setShardManager(this);
180     }
181
182     @Override
183     public void postStop() {
184         LOG.info("Stopping ShardManager");
185
186         mBean.unregisterMBean();
187     }
188
189     @Override
190     public void handleCommand(Object message) throws Exception {
191         if (message  instanceof FindPrimary) {
192             findPrimary((FindPrimary)message);
193         } else if(message instanceof FindLocalShard){
194             findLocalShard((FindLocalShard) message);
195         } else if (message instanceof UpdateSchemaContext) {
196             updateSchemaContext(message);
197         } else if(message instanceof ActorInitialized) {
198             onActorInitialized(message);
199         } else if (message instanceof ClusterEvent.MemberUp){
200             memberUp((ClusterEvent.MemberUp) message);
201         } else if (message instanceof ClusterEvent.MemberExited){
202             memberExited((ClusterEvent.MemberExited) message);
203         } else if(message instanceof ClusterEvent.MemberRemoved) {
204             memberRemoved((ClusterEvent.MemberRemoved) message);
205         } else if(message instanceof ClusterEvent.UnreachableMember) {
206             memberUnreachable((ClusterEvent.UnreachableMember)message);
207         } else if(message instanceof ClusterEvent.ReachableMember) {
208             memberReachable((ClusterEvent.ReachableMember) message);
209         } else if(message instanceof DatastoreContextFactory) {
210             onDatastoreContextFactory((DatastoreContextFactory)message);
211         } else if(message instanceof RoleChangeNotification) {
212             onRoleChangeNotification((RoleChangeNotification) message);
213         } else if(message instanceof FollowerInitialSyncUpStatus){
214             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
215         } else if(message instanceof ShardNotInitializedTimeout) {
216             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
217         } else if(message instanceof ShardLeaderStateChanged) {
218             onLeaderStateChanged((ShardLeaderStateChanged) message);
219         } else if(message instanceof SwitchShardBehavior){
220             onSwitchShardBehavior((SwitchShardBehavior) message);
221         } else if(message instanceof CreateShard) {
222             onCreateShard((CreateShard)message);
223         } else if(message instanceof AddShardReplica){
224             onAddShardReplica((AddShardReplica)message);
225         } else if(message instanceof ForwardedAddServerReply) {
226             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
227             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
228                     msg.removeShardOnFailure);
229         } else if(message instanceof ForwardedAddServerFailure) {
230             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
231             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
232         } else if(message instanceof PrimaryShardFoundForContext) {
233             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
234             onPrimaryShardFoundContext(primaryShardFoundContext);
235         } else if(message instanceof RemoveShardReplica){
236             onRemoveShardReplica((RemoveShardReplica)message);
237         } else if(message instanceof GetSnapshot) {
238             onGetSnapshot();
239         } else if(message instanceof ServerRemoved){
240             onShardReplicaRemoved((ServerRemoved) message);
241         } else if (message instanceof SaveSnapshotSuccess) {
242             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
243         } else if (message instanceof SaveSnapshotFailure) {
244             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
245                     persistenceId(), ((SaveSnapshotFailure) message).cause());
246         } else {
247             unknownMessage(message);
248         }
249     }
250
251     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
252         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
253             addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender());
254         }
255     }
256
257     private void onShardReplicaRemoved(ServerRemoved message) {
258         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
259         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
260         if(shardInformation == null) {
261             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
262             return;
263         } else if(shardInformation.getActor() != null) {
264             LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
265             shardInformation.getActor().tell(PoisonPill.getInstance(), self());
266         }
267         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
268         persistShardList();
269     }
270
271     private void onGetSnapshot() {
272         LOG.debug("{}: onGetSnapshot", persistenceId());
273
274         List<String> notInitialized = null;
275         for(ShardInformation shardInfo: localShards.values()) {
276             if(!shardInfo.isShardInitialized()) {
277                 if(notInitialized == null) {
278                     notInitialized = new ArrayList<>();
279                 }
280
281                 notInitialized.add(shardInfo.getShardName());
282             }
283         }
284
285         if(notInitialized != null) {
286             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
287                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
288             return;
289         }
290
291         byte[] shardManagerSnapshot = null;
292         if(currentSnapshot != null) {
293             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
294         }
295
296         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
297                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
298                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
299
300         for(ShardInformation shardInfo: localShards.values()) {
301             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
302         }
303     }
304
305     private void onCreateShard(CreateShard createShard) {
306         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
307
308         Object reply;
309         try {
310             String shardName = createShard.getModuleShardConfig().getShardName();
311             if(localShards.containsKey(shardName)) {
312                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
313                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
314             } else {
315                 doCreateShard(createShard);
316                 reply = new akka.actor.Status.Success(null);
317             }
318         } catch (Exception e) {
319             LOG.error("{}: onCreateShard failed", persistenceId(), e);
320             reply = new akka.actor.Status.Failure(e);
321         }
322
323         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
324             getSender().tell(reply, getSelf());
325         }
326     }
327
328     private void doCreateShard(CreateShard createShard) {
329         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
330         String shardName = moduleShardConfig.getShardName();
331
332         configuration.addModuleShardConfiguration(moduleShardConfig);
333
334         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
335         if(shardDatastoreContext == null) {
336             shardDatastoreContext = newShardDatastoreContext(shardName);
337         } else {
338             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
339                     peerAddressResolver).build();
340         }
341
342         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
343
344         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
345                 currentSnapshot.getShardList().contains(shardName);
346
347         Map<String, String> peerAddresses;
348         boolean isActiveMember;
349         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
350                 contains(cluster.getCurrentMemberName())) {
351             peerAddresses = getPeerAddresses(shardName);
352             isActiveMember = true;
353         } else {
354             // The local member is not in the static shard member configuration and the shard did not
355             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
356             // the shard with no peers and with elections disabled so it stays as follower. A
357             // subsequent AddServer request will be needed to make it an active member.
358             isActiveMember = false;
359             peerAddresses = Collections.emptyMap();
360             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
361                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
362         }
363
364         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
365                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
366                 isActiveMember);
367
368         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
369                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
370         info.setActiveMember(isActiveMember);
371         localShards.put(info.getShardName(), info);
372
373         mBean.addLocalShard(shardId.toString());
374
375         if(schemaContext != null) {
376             info.setActor(newShardActor(schemaContext, info));
377         }
378     }
379
380     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
381         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
382                 shardPeerAddressResolver(peerAddressResolver);
383     }
384
385     private DatastoreContext newShardDatastoreContext(String shardName) {
386         return newShardDatastoreContextBuilder(shardName).build();
387     }
388
389     private void checkReady(){
390         if (isReadyWithLeaderId()) {
391             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
392                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
393
394             waitTillReadyCountdownLatch.countDown();
395         }
396     }
397
398     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
399         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
400
401         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
402         if(shardInformation != null) {
403             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
404             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
405             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
406                 primaryShardInfoCache.remove(shardInformation.getShardName());
407             }
408
409             checkReady();
410         } else {
411             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
412         }
413     }
414
415     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
416         ShardInformation shardInfo = message.getShardInfo();
417
418         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
419                 shardInfo.getShardName());
420
421         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
422
423         if(!shardInfo.isShardInitialized()) {
424             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
425             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
426         } else {
427             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
428             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
429         }
430     }
431
432     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
433         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
434                 status.getName(), status.isInitialSyncDone());
435
436         ShardInformation shardInformation = findShardInformation(status.getName());
437
438         if(shardInformation != null) {
439             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
440
441             mBean.setSyncStatus(isInSync());
442         }
443
444     }
445
446     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
447         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
448                 roleChanged.getOldRole(), roleChanged.getNewRole());
449
450         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
451         if(shardInformation != null) {
452             shardInformation.setRole(roleChanged.getNewRole());
453             checkReady();
454             mBean.setSyncStatus(isInSync());
455         }
456     }
457
458
459     private ShardInformation findShardInformation(String memberId) {
460         for(ShardInformation info : localShards.values()){
461             if(info.getShardId().toString().equals(memberId)){
462                 return info;
463             }
464         }
465
466         return null;
467     }
468
469     private boolean isReadyWithLeaderId() {
470         boolean isReady = true;
471         for (ShardInformation info : localShards.values()) {
472             if(!info.isShardReadyWithLeaderId()){
473                 isReady = false;
474                 break;
475             }
476         }
477         return isReady;
478     }
479
480     private boolean isInSync(){
481         for (ShardInformation info : localShards.values()) {
482             if(!info.isInSync()){
483                 return false;
484             }
485         }
486         return true;
487     }
488
489     private void onActorInitialized(Object message) {
490         final ActorRef sender = getSender();
491
492         if (sender == null) {
493             return; //why is a non-actor sending this message? Just ignore.
494         }
495
496         String actorName = sender.path().name();
497         //find shard name from actor name; actor name is stringified shardId
498         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
499
500         if (shardId.getShardName() == null) {
501             return;
502         }
503
504         markShardAsInitialized(shardId.getShardName());
505     }
506
507     private void markShardAsInitialized(String shardName) {
508         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
509
510         ShardInformation shardInformation = localShards.get(shardName);
511         if (shardInformation != null) {
512             shardInformation.setActorInitialized();
513
514             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
515         }
516     }
517
518     @Override
519     protected void handleRecover(Object message) throws Exception {
520         if (message instanceof RecoveryCompleted) {
521             onRecoveryCompleted();
522         } else if (message instanceof SnapshotOffer) {
523             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
524         }
525     }
526
527     private void onRecoveryCompleted() {
528         LOG.info("Recovery complete : {}", persistenceId());
529
530         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
531         // journal on upgrade from Helium.
532         deleteMessages(lastSequenceNr());
533
534         if(currentSnapshot == null && restoreFromSnapshot != null &&
535                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
536             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
537                     restoreFromSnapshot.getShardManagerSnapshot()))) {
538                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
539
540                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
541
542                 applyShardManagerSnapshot(snapshot);
543             } catch(Exception e) {
544                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
545             }
546         }
547
548         createLocalShards();
549     }
550
551     private void findLocalShard(FindLocalShard message) {
552         final ShardInformation shardInformation = localShards.get(message.getShardName());
553
554         if(shardInformation == null){
555             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
556             return;
557         }
558
559         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
560             @Override
561             public Object get() {
562                 return new LocalShardFound(shardInformation.getActor());
563             }
564         });
565     }
566
567     private void sendResponse(ShardInformation shardInformation, boolean doWait,
568             boolean wantShardReady, final Supplier<Object> messageSupplier) {
569         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
570             if(doWait) {
571                 final ActorRef sender = getSender();
572                 final ActorRef self = self();
573
574                 Runnable replyRunnable = new Runnable() {
575                     @Override
576                     public void run() {
577                         sender.tell(messageSupplier.get(), self);
578                     }
579                 };
580
581                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
582                     new OnShardInitialized(replyRunnable);
583
584                 shardInformation.addOnShardInitialized(onShardInitialized);
585
586                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
587                 if(shardInformation.isShardInitialized()) {
588                     // If the shard is already initialized then we'll wait enough time for the shard to
589                     // elect a leader, ie 2 times the election timeout.
590                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
591                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
592                 }
593
594                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
595                         shardInformation.getShardName());
596
597                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
598                         timeout, getSelf(),
599                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
600                         getContext().dispatcher(), getSelf());
601
602                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
603
604             } else if (!shardInformation.isShardInitialized()) {
605                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
606                         shardInformation.getShardName());
607                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
608             } else {
609                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
610                         shardInformation.getShardName());
611                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
612             }
613
614             return;
615         }
616
617         getSender().tell(messageSupplier.get(), getSelf());
618     }
619
620     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
621         return new NoShardLeaderException(null, shardId.toString());
622     }
623
624     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
625         return new NotInitializedException(String.format(
626                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
627     }
628
629     private void memberRemoved(ClusterEvent.MemberRemoved message) {
630         String memberName = message.member().roles().head();
631
632         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
633                 message.member().address());
634
635         peerAddressResolver.removePeerAddress(memberName);
636
637         for(ShardInformation info : localShards.values()){
638             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
639         }
640     }
641
642     private void memberExited(ClusterEvent.MemberExited message) {
643         String memberName = message.member().roles().head();
644
645         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
646                 message.member().address());
647
648         peerAddressResolver.removePeerAddress(memberName);
649
650         for(ShardInformation info : localShards.values()){
651             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
652         }
653     }
654
655     private void memberUp(ClusterEvent.MemberUp message) {
656         String memberName = message.member().roles().head();
657
658         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
659                 message.member().address());
660
661         addPeerAddress(memberName, message.member().address());
662
663         checkReady();
664     }
665
666     private void addPeerAddress(String memberName, Address address) {
667         peerAddressResolver.addPeerAddress(memberName, address);
668
669         for(ShardInformation info : localShards.values()){
670             String shardName = info.getShardName();
671             String peerId = getShardIdentifier(memberName, shardName).toString();
672             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
673
674             info.peerUp(memberName, peerId, getSelf());
675         }
676     }
677
678     private void memberReachable(ClusterEvent.ReachableMember message) {
679         String memberName = message.member().roles().head();
680         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
681
682         addPeerAddress(memberName, message.member().address());
683
684         markMemberAvailable(memberName);
685     }
686
687     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
688         String memberName = message.member().roles().head();
689         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
690
691         markMemberUnavailable(memberName);
692     }
693
694     private void markMemberUnavailable(final String memberName) {
695         for(ShardInformation info : localShards.values()){
696             String leaderId = info.getLeaderId();
697             if(leaderId != null && leaderId.contains(memberName)) {
698                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
699                 info.setLeaderAvailable(false);
700
701                 primaryShardInfoCache.remove(info.getShardName());
702             }
703
704             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
705         }
706     }
707
708     private void markMemberAvailable(final String memberName) {
709         for(ShardInformation info : localShards.values()){
710             String leaderId = info.getLeaderId();
711             if(leaderId != null && leaderId.contains(memberName)) {
712                 LOG.debug("Marking Leader {} as available.", leaderId);
713                 info.setLeaderAvailable(true);
714             }
715
716             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
717         }
718     }
719
720     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
721         datastoreContextFactory = factory;
722         for (ShardInformation info : localShards.values()) {
723             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
724         }
725     }
726
727     private void onSwitchShardBehavior(SwitchShardBehavior message) {
728         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
729
730         ShardInformation shardInformation = localShards.get(identifier.getShardName());
731
732         if(shardInformation != null && shardInformation.getActor() != null) {
733             shardInformation.getActor().tell(
734                     new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
735         } else {
736             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
737                     message.getShardName(), message.getNewState());
738         }
739     }
740
741     /**
742      * Notifies all the local shards of a change in the schema context
743      *
744      * @param message
745      */
746     private void updateSchemaContext(final Object message) {
747         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
748
749         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
750
751         for (ShardInformation info : localShards.values()) {
752             if (info.getActor() == null) {
753                 LOG.debug("Creating Shard {}", info.getShardId());
754                 info.setActor(newShardActor(schemaContext, info));
755             } else {
756                 info.getActor().tell(message, getSelf());
757             }
758         }
759     }
760
761     @VisibleForTesting
762     protected ClusterWrapper getCluster() {
763         return cluster;
764     }
765
766     @VisibleForTesting
767     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
768         return getContext().actorOf(info.newProps(schemaContext)
769                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
770     }
771
772     private void findPrimary(FindPrimary message) {
773         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
774
775         final String shardName = message.getShardName();
776         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
777
778         // First see if the there is a local replica for the shard
779         final ShardInformation info = localShards.get(shardName);
780         if (info != null && info.isActiveMember()) {
781             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
782                 @Override
783                 public Object get() {
784                     String primaryPath = info.getSerializedLeaderActor();
785                     Object found = canReturnLocalShardState && info.isLeader() ?
786                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
787                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
788
789                             if(LOG.isDebugEnabled()) {
790                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
791                             }
792
793                             return found;
794                 }
795             });
796
797             return;
798         }
799
800         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
801             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
802                     shardName, address);
803
804             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
805                     message.isWaitUntilReady()), getContext());
806             return;
807         }
808
809         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
810
811         getSender().tell(new PrimaryNotFoundException(
812                 String.format("No primary shard found for %s.", shardName)), getSelf());
813     }
814
815     /**
816      * Construct the name of the shard actor given the name of the member on
817      * which the shard resides and the name of the shard
818      *
819      * @param memberName
820      * @param shardName
821      * @return
822      */
823     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
824         return peerAddressResolver.getShardIdentifier(memberName, shardName);
825     }
826
827     /**
828      * Create shards that are local to the member on which the ShardManager
829      * runs
830      *
831      */
832     private void createLocalShards() {
833         String memberName = this.cluster.getCurrentMemberName();
834         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
835
836         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
837         if(restoreFromSnapshot != null)
838         {
839             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
840                 shardSnapshots.put(snapshot.getName(), snapshot);
841             }
842         }
843
844         restoreFromSnapshot = null; // null out to GC
845
846         for(String shardName : memberShardNames){
847             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
848
849             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
850
851             Map<String, String> peerAddresses = getPeerAddresses(shardName);
852             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
853                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
854                         shardSnapshots.get(shardName)), peerAddressResolver));
855             mBean.addLocalShard(shardId.toString());
856         }
857     }
858
859     /**
860      * Given the name of the shard find the addresses of all it's peers
861      *
862      * @param shardName
863      */
864     private Map<String, String> getPeerAddresses(String shardName) {
865         Collection<String> members = configuration.getMembersFromShardName(shardName);
866         Map<String, String> peerAddresses = new HashMap<>();
867
868         String currentMemberName = this.cluster.getCurrentMemberName();
869
870         for(String memberName : members) {
871             if(!currentMemberName.equals(memberName)) {
872                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
873                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
874                 peerAddresses.put(shardId.toString(), address);
875             }
876         }
877         return peerAddresses;
878     }
879
880     @Override
881     public SupervisorStrategy supervisorStrategy() {
882
883         return new OneForOneStrategy(10, Duration.create("1 minute"),
884                 new Function<Throwable, SupervisorStrategy.Directive>() {
885             @Override
886             public SupervisorStrategy.Directive apply(Throwable t) {
887                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
888                 return SupervisorStrategy.resume();
889             }
890         }
891                 );
892
893     }
894
895     @Override
896     public String persistenceId() {
897         return persistenceId;
898     }
899
900     @VisibleForTesting
901     ShardManagerInfoMBean getMBean(){
902         return mBean;
903     }
904
905     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
906         if (shardReplicaOperationsInProgress.contains(shardName)) {
907             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
908             LOG.debug ("{}: {}", persistenceId(), msg);
909             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
910             return true;
911         }
912
913         return false;
914     }
915
916     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
917         final String shardName = shardReplicaMsg.getShardName();
918
919         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
920
921         // verify the shard with the specified name is present in the cluster configuration
922         if (!(this.configuration.isShardConfigured(shardName))) {
923             String msg = String.format("No module configuration exists for shard %s", shardName);
924             LOG.debug ("{}: {}", persistenceId(), msg);
925             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
926             return;
927         }
928
929         // Create the localShard
930         if (schemaContext == null) {
931             String msg = String.format(
932                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
933             LOG.debug ("{}: {}", persistenceId(), msg);
934             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
935             return;
936         }
937
938         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
939             @Override
940             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
941                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
942
943             }
944
945             @Override
946             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
947                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
948             }
949
950         });
951     }
952
953     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
954         String msg = String.format("Local shard %s already exists", shardName);
955         LOG.debug ("{}: {}", persistenceId(), msg);
956         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
957     }
958
959     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
960         if(isShardReplicaOperationInProgress(shardName, sender)) {
961             return;
962         }
963
964         shardReplicaOperationsInProgress.add(shardName);
965
966         final ShardInformation shardInfo;
967         final boolean removeShardOnFailure;
968         ShardInformation existingShardInfo = localShards.get(shardName);
969         if(existingShardInfo == null) {
970             removeShardOnFailure = true;
971             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
972
973             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
974                     DisableElectionsRaftPolicy.class.getName()).build();
975
976             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
977                     Shard.builder(), peerAddressResolver);
978             shardInfo.setActiveMember(false);
979             localShards.put(shardName, shardInfo);
980             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
981         } else {
982             removeShardOnFailure = false;
983             shardInfo = existingShardInfo;
984         }
985
986         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
987
988         //inform ShardLeader to add this shard as a replica by sending an AddServer message
989         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
990                 response.getPrimaryPath(), shardInfo.getShardId());
991
992         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
993                 duration());
994         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
995             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
996
997         futureObj.onComplete(new OnComplete<Object>() {
998             @Override
999             public void onComplete(Throwable failure, Object addServerResponse) {
1000                 if (failure != null) {
1001                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1002                             response.getPrimaryPath(), shardName, failure);
1003
1004                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1005                             response.getPrimaryPath(), shardName);
1006                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1007                 } else {
1008                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1009                             response.getPrimaryPath(), removeShardOnFailure), sender);
1010                 }
1011             }
1012         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1013     }
1014
1015     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1016             boolean removeShardOnFailure) {
1017         shardReplicaOperationsInProgress.remove(shardName);
1018
1019         if(removeShardOnFailure) {
1020             ShardInformation shardInfo = localShards.remove(shardName);
1021             if (shardInfo.getActor() != null) {
1022                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1023             }
1024         }
1025
1026         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1027             new RuntimeException(message, failure)), getSelf());
1028     }
1029
1030     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1031             String leaderPath, boolean removeShardOnFailure) {
1032         String shardName = shardInfo.getShardName();
1033         shardReplicaOperationsInProgress.remove(shardName);
1034
1035         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1036
1037         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1038             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1039
1040             // Make the local shard voting capable
1041             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1042             shardInfo.setActiveMember(true);
1043             persistShardList();
1044
1045             mBean.addLocalShard(shardInfo.getShardId().toString());
1046             sender.tell(new akka.actor.Status.Success(null), getSelf());
1047         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1048             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1049         } else {
1050             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1051                     persistenceId(), shardName, replyMsg.getStatus());
1052
1053             Exception failure;
1054             switch (replyMsg.getStatus()) {
1055                 case TIMEOUT:
1056                     failure = new TimeoutException(String.format(
1057                             "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1058                             "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1059                             leaderPath, shardName));
1060                     break;
1061                 case NO_LEADER:
1062                     failure = createNoShardLeaderException(shardInfo.getShardId());
1063                     break;
1064                 default :
1065                     failure = new RuntimeException(String.format(
1066                             "AddServer request to leader %s for shard %s failed with status %s",
1067                             leaderPath, shardName, replyMsg.getStatus()));
1068             }
1069
1070             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1071         }
1072     }
1073
1074     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
1075         String shardName = shardReplicaMsg.getShardName();
1076
1077         // verify the local shard replica is available in the controller node
1078         if (!localShards.containsKey(shardName)) {
1079             String msg = String.format("Local shard %s does not", shardName);
1080             LOG.debug ("{}: {}", persistenceId(), msg);
1081             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1082             return;
1083         }
1084         // call RemoveShard for the shardName
1085         getSender().tell(new akka.actor.Status.Success(true), getSelf());
1086         return;
1087     }
1088
1089     private void persistShardList() {
1090         List<String> shardList = new ArrayList<>(localShards.keySet());
1091         for (ShardInformation shardInfo : localShards.values()) {
1092             if (!shardInfo.isActiveMember()) {
1093                 shardList.remove(shardInfo.getShardName());
1094             }
1095         }
1096         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1097         saveSnapshot(updateShardManagerSnapshot(shardList));
1098     }
1099
1100     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1101         currentSnapshot = new ShardManagerSnapshot(shardList);
1102         return currentSnapshot;
1103     }
1104
1105     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1106         currentSnapshot = snapshot;
1107
1108         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1109
1110         String currentMember = cluster.getCurrentMemberName();
1111         Set<String> configuredShardList =
1112             new HashSet<>(configuration.getMemberShardNames(currentMember));
1113         for (String shard : currentSnapshot.getShardList()) {
1114             if (!configuredShardList.contains(shard)) {
1115                 // add the current member as a replica for the shard
1116                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1117                 configuration.addMemberReplicaForShard(shard, currentMember);
1118             } else {
1119                 configuredShardList.remove(shard);
1120             }
1121         }
1122         for (String shard : configuredShardList) {
1123             // remove the member as a replica for the shard
1124             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1125             configuration.removeMemberReplicaForShard(shard, currentMember);
1126         }
1127     }
1128
1129     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1130         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1131             persistenceId());
1132         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
1133     }
1134
1135     private static class ForwardedAddServerReply {
1136         ShardInformation shardInfo;
1137         AddServerReply addServerReply;
1138         String leaderPath;
1139         boolean removeShardOnFailure;
1140
1141         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1142                 boolean removeShardOnFailure) {
1143             this.shardInfo = shardInfo;
1144             this.addServerReply = addServerReply;
1145             this.leaderPath = leaderPath;
1146             this.removeShardOnFailure = removeShardOnFailure;
1147         }
1148     }
1149
1150     private static class ForwardedAddServerFailure {
1151         String shardName;
1152         String failureMessage;
1153         Throwable failure;
1154         boolean removeShardOnFailure;
1155
1156         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1157                 boolean removeShardOnFailure) {
1158             this.shardName = shardName;
1159             this.failureMessage = failureMessage;
1160             this.failure = failure;
1161             this.removeShardOnFailure = removeShardOnFailure;
1162         }
1163     }
1164
1165     @VisibleForTesting
1166     protected static class ShardInformation {
1167         private final ShardIdentifier shardId;
1168         private final String shardName;
1169         private ActorRef actor;
1170         private ActorPath actorPath;
1171         private final Map<String, String> initialPeerAddresses;
1172         private Optional<DataTree> localShardDataTree;
1173         private boolean leaderAvailable = false;
1174
1175         // flag that determines if the actor is ready for business
1176         private boolean actorInitialized = false;
1177
1178         private boolean followerSyncStatus = false;
1179
1180         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1181         private String role ;
1182         private String leaderId;
1183         private short leaderVersion;
1184
1185         private DatastoreContext datastoreContext;
1186         private Shard.AbstractBuilder<?, ?> builder;
1187         private final ShardPeerAddressResolver addressResolver;
1188         private boolean isActiveMember = true;
1189
1190         private ShardInformation(String shardName, ShardIdentifier shardId,
1191                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1192                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1193             this.shardName = shardName;
1194             this.shardId = shardId;
1195             this.initialPeerAddresses = initialPeerAddresses;
1196             this.datastoreContext = datastoreContext;
1197             this.builder = builder;
1198             this.addressResolver = addressResolver;
1199         }
1200
1201         Props newProps(SchemaContext schemaContext) {
1202             Preconditions.checkNotNull(builder);
1203             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1204                     schemaContext(schemaContext).props();
1205             builder = null;
1206             return props;
1207         }
1208
1209         String getShardName() {
1210             return shardName;
1211         }
1212
1213         @Nullable
1214         ActorRef getActor(){
1215             return actor;
1216         }
1217
1218         ActorPath getActorPath() {
1219             return actorPath;
1220         }
1221
1222         void setActor(ActorRef actor) {
1223             this.actor = actor;
1224             this.actorPath = actor.path();
1225         }
1226
1227         ShardIdentifier getShardId() {
1228             return shardId;
1229         }
1230
1231         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1232             this.localShardDataTree = localShardDataTree;
1233         }
1234
1235         Optional<DataTree> getLocalShardDataTree() {
1236             return localShardDataTree;
1237         }
1238
1239         DatastoreContext getDatastoreContext() {
1240             return datastoreContext;
1241         }
1242
1243         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1244             this.datastoreContext = datastoreContext;
1245             if (actor != null) {
1246                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1247                 actor.tell(this.datastoreContext, sender);
1248             }
1249         }
1250
1251         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1252             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1253
1254             if(actor != null) {
1255                 if(LOG.isDebugEnabled()) {
1256                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1257                             peerId, peerAddress, actor.path());
1258                 }
1259
1260                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1261             }
1262
1263             notifyOnShardInitializedCallbacks();
1264         }
1265
1266         void peerDown(String memberName, String peerId, ActorRef sender) {
1267             if(actor != null) {
1268                 actor.tell(new PeerDown(memberName, peerId), sender);
1269             }
1270         }
1271
1272         void peerUp(String memberName, String peerId, ActorRef sender) {
1273             if(actor != null) {
1274                 actor.tell(new PeerUp(memberName, peerId), sender);
1275             }
1276         }
1277
1278         boolean isShardReady() {
1279             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1280         }
1281
1282         boolean isShardReadyWithLeaderId() {
1283             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1284                     (isLeader() || addressResolver.resolve(leaderId) != null);
1285         }
1286
1287         boolean isShardInitialized() {
1288             return getActor() != null && actorInitialized;
1289         }
1290
1291         boolean isLeader() {
1292             return Objects.equal(leaderId, shardId.toString());
1293         }
1294
1295         String getSerializedLeaderActor() {
1296             if(isLeader()) {
1297                 return Serialization.serializedActorPath(getActor());
1298             } else {
1299                 return addressResolver.resolve(leaderId);
1300             }
1301         }
1302
1303         void setActorInitialized() {
1304             LOG.debug("Shard {} is initialized", shardId);
1305
1306             this.actorInitialized = true;
1307
1308             notifyOnShardInitializedCallbacks();
1309         }
1310
1311         private void notifyOnShardInitializedCallbacks() {
1312             if(onShardInitializedSet.isEmpty()) {
1313                 return;
1314             }
1315
1316             boolean ready = isShardReadyWithLeaderId();
1317
1318             if(LOG.isDebugEnabled()) {
1319                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1320                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1321             }
1322
1323             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1324             while(iter.hasNext()) {
1325                 OnShardInitialized onShardInitialized = iter.next();
1326                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1327                     iter.remove();
1328                     onShardInitialized.getTimeoutSchedule().cancel();
1329                     onShardInitialized.getReplyRunnable().run();
1330                 }
1331             }
1332         }
1333
1334         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1335             onShardInitializedSet.add(onShardInitialized);
1336         }
1337
1338         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1339             onShardInitializedSet.remove(onShardInitialized);
1340         }
1341
1342         void setRole(String newRole) {
1343             this.role = newRole;
1344
1345             notifyOnShardInitializedCallbacks();
1346         }
1347
1348         void setFollowerSyncStatus(boolean syncStatus){
1349             this.followerSyncStatus = syncStatus;
1350         }
1351
1352         boolean isInSync(){
1353             if(RaftState.Follower.name().equals(this.role)){
1354                 return followerSyncStatus;
1355             } else if(RaftState.Leader.name().equals(this.role)){
1356                 return true;
1357             }
1358
1359             return false;
1360         }
1361
1362         boolean setLeaderId(String leaderId) {
1363             boolean changed = !Objects.equal(this.leaderId, leaderId);
1364             this.leaderId = leaderId;
1365             if(leaderId != null) {
1366                 this.leaderAvailable = true;
1367             }
1368             notifyOnShardInitializedCallbacks();
1369
1370             return changed;
1371         }
1372
1373         String getLeaderId() {
1374             return leaderId;
1375         }
1376
1377         void setLeaderAvailable(boolean leaderAvailable) {
1378             this.leaderAvailable = leaderAvailable;
1379         }
1380
1381         short getLeaderVersion() {
1382             return leaderVersion;
1383         }
1384
1385         void setLeaderVersion(short leaderVersion) {
1386             this.leaderVersion = leaderVersion;
1387         }
1388
1389         boolean isActiveMember() {
1390             return isActiveMember;
1391         }
1392
1393         void setActiveMember(boolean isActiveMember) {
1394             this.isActiveMember = isActiveMember;
1395         }
1396     }
1397
1398     private static class OnShardInitialized {
1399         private final Runnable replyRunnable;
1400         private Cancellable timeoutSchedule;
1401
1402         OnShardInitialized(Runnable replyRunnable) {
1403             this.replyRunnable = replyRunnable;
1404         }
1405
1406         Runnable getReplyRunnable() {
1407             return replyRunnable;
1408         }
1409
1410         Cancellable getTimeoutSchedule() {
1411             return timeoutSchedule;
1412         }
1413
1414         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1415             this.timeoutSchedule = timeoutSchedule;
1416         }
1417     }
1418
1419     private static class OnShardReady extends OnShardInitialized {
1420         OnShardReady(Runnable replyRunnable) {
1421             super(replyRunnable);
1422         }
1423     }
1424
1425     private static class ShardNotInitializedTimeout {
1426         private final ActorRef sender;
1427         private final ShardInformation shardInfo;
1428         private final OnShardInitialized onShardInitialized;
1429
1430         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1431             this.sender = sender;
1432             this.shardInfo = shardInfo;
1433             this.onShardInitialized = onShardInitialized;
1434         }
1435
1436         ActorRef getSender() {
1437             return sender;
1438         }
1439
1440         ShardInformation getShardInfo() {
1441             return shardInfo;
1442         }
1443
1444         OnShardInitialized getOnShardInitialized() {
1445             return onShardInitialized;
1446         }
1447     }
1448
1449     /**
1450      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1451      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1452      */
1453     @Deprecated
1454     static class SchemaContextModules implements Serializable {
1455         private static final long serialVersionUID = -8884620101025936590L;
1456
1457         private final Set<String> modules;
1458
1459         SchemaContextModules(Set<String> modules){
1460             this.modules = modules;
1461         }
1462
1463         public Set<String> getModules() {
1464             return modules;
1465         }
1466     }
1467
1468     public static Builder builder() {
1469         return new Builder();
1470     }
1471
1472     public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
1473         private ClusterWrapper cluster;
1474         private Configuration configuration;
1475         private DatastoreContextFactory datastoreContextFactory;
1476         private CountDownLatch waitTillReadyCountdownLatch;
1477         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1478         private DatastoreSnapshot restoreFromSnapshot;
1479         private volatile boolean sealed;
1480
1481         @SuppressWarnings("unchecked")
1482         private T self() {
1483             return (T) this;
1484         }
1485
1486         protected void checkSealed() {
1487             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1488         }
1489
1490         public T cluster(ClusterWrapper cluster) {
1491             checkSealed();
1492             this.cluster = cluster;
1493             return self();
1494         }
1495
1496         public T configuration(Configuration configuration) {
1497             checkSealed();
1498             this.configuration = configuration;
1499             return self();
1500         }
1501
1502         public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1503             checkSealed();
1504             this.datastoreContextFactory = datastoreContextFactory;
1505             return self();
1506         }
1507
1508         public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1509             checkSealed();
1510             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1511             return self();
1512         }
1513
1514         public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1515             checkSealed();
1516             this.primaryShardInfoCache = primaryShardInfoCache;
1517             return self();
1518         }
1519
1520         public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1521             checkSealed();
1522             this.restoreFromSnapshot = restoreFromSnapshot;
1523             return self();
1524         }
1525
1526         protected void verify() {
1527             sealed = true;
1528             Preconditions.checkNotNull(cluster, "cluster should not be null");
1529             Preconditions.checkNotNull(configuration, "configuration should not be null");
1530             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1531             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1532             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1533         }
1534
1535         public Props props() {
1536             verify();
1537             return Props.create(ShardManager.class, this);
1538         }
1539     }
1540
1541     public static class Builder extends AbstractBuilder<Builder> {
1542     }
1543
1544     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1545         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1546                 getShardInitializationTimeout().duration().$times(2));
1547
1548
1549         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1550         futureObj.onComplete(new OnComplete<Object>() {
1551             @Override
1552             public void onComplete(Throwable failure, Object response) {
1553                 if (failure != null) {
1554                     handler.onFailure(failure);
1555                 } else {
1556                     if(response instanceof RemotePrimaryShardFound) {
1557                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1558                     } else if(response instanceof LocalPrimaryShardFound) {
1559                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1560                     } else {
1561                         handler.onUnknownResponse(response);
1562                     }
1563                 }
1564             }
1565         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1566     }
1567
1568     /**
1569      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1570      * a remote or local find primary message is processed
1571      */
1572     private static interface FindPrimaryResponseHandler {
1573         /**
1574          * Invoked when a Failure message is received as a response
1575          *
1576          * @param failure
1577          */
1578         void onFailure(Throwable failure);
1579
1580         /**
1581          * Invoked when a RemotePrimaryShardFound response is received
1582          *
1583          * @param response
1584          */
1585         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1586
1587         /**
1588          * Invoked when a LocalPrimaryShardFound response is received
1589          * @param response
1590          */
1591         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1592
1593         /**
1594          * Invoked when an unknown response is received. This is another type of failure.
1595          *
1596          * @param response
1597          */
1598         void onUnknownResponse(Object response);
1599     }
1600
1601     /**
1602      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1603      * replica and sends a wrapped Failure response to some targetActor
1604      */
1605     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1606         private final ActorRef targetActor;
1607         private final String shardName;
1608         private final String persistenceId;
1609         private final ActorRef shardManagerActor;
1610
1611         /**
1612          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1613          * @param shardName The name of the shard for which the primary replica had to be found
1614          * @param persistenceId The persistenceId for the ShardManager
1615          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1616          */
1617         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1618             this.targetActor = Preconditions.checkNotNull(targetActor);
1619             this.shardName = Preconditions.checkNotNull(shardName);
1620             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1621             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1622         }
1623
1624         public ActorRef getTargetActor() {
1625             return targetActor;
1626         }
1627
1628         public String getShardName() {
1629             return shardName;
1630         }
1631
1632         public ActorRef getShardManagerActor() {
1633             return shardManagerActor;
1634         }
1635
1636         @Override
1637         public void onFailure(Throwable failure) {
1638             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1639             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1640                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1641         }
1642
1643         @Override
1644         public void onUnknownResponse(Object response) {
1645             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1646                     shardName, response);
1647             LOG.debug ("{}: {}", persistenceId, msg);
1648             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1649                     new RuntimeException(msg)), shardManagerActor);
1650         }
1651     }
1652
1653
1654     /**
1655      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1656      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1657      * as a successful response to find primary.
1658      */
1659     private static class PrimaryShardFoundForContext {
1660         private final String shardName;
1661         private final Object contextMessage;
1662         private final RemotePrimaryShardFound remotePrimaryShardFound;
1663         private final LocalPrimaryShardFound localPrimaryShardFound;
1664
1665         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) {
1666             this.shardName = Preconditions.checkNotNull(shardName);
1667             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1668             Preconditions.checkNotNull(primaryFoundMessage);
1669             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null;
1670             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null;
1671         }
1672
1673         @Nonnull
1674         public String getPrimaryPath(){
1675             if(remotePrimaryShardFound != null){
1676                 return remotePrimaryShardFound.getPrimaryPath();
1677             }
1678             return localPrimaryShardFound.getPrimaryPath();
1679         }
1680
1681         @Nonnull
1682         public Object getContextMessage() {
1683             return contextMessage;
1684         }
1685
1686         @Nullable
1687         public RemotePrimaryShardFound getRemotePrimaryShardFound(){
1688             return remotePrimaryShardFound;
1689         }
1690
1691         @Nullable
1692         public LocalPrimaryShardFound getLocalPrimaryShardFound(){
1693             return localPrimaryShardFound;
1694         }
1695
1696         boolean isPrimaryLocal(){
1697             return (remotePrimaryShardFound == null);
1698         }
1699
1700         @Nonnull
1701         public String getShardName() {
1702             return shardName;
1703         }
1704     }
1705 }
1706
1707
1708