Bug 2187: Implement add-replicas-for-all-shards RPC
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Function;
23 import akka.persistence.RecoveryCompleted;
24 import akka.persistence.SaveSnapshotFailure;
25 import akka.persistence.SaveSnapshotSuccess;
26 import akka.persistence.SnapshotOffer;
27 import akka.persistence.SnapshotSelectionCriteria;
28 import akka.serialization.Serialization;
29 import akka.util.Timeout;
30 import com.google.common.annotations.VisibleForTesting;
31 import com.google.common.base.Objects;
32 import com.google.common.base.Optional;
33 import com.google.common.base.Preconditions;
34 import com.google.common.base.Strings;
35 import com.google.common.base.Supplier;
36 import com.google.common.collect.Sets;
37 import java.io.ByteArrayInputStream;
38 import java.io.ObjectInputStream;
39 import java.io.Serializable;
40 import java.util.ArrayList;
41 import java.util.Collection;
42 import java.util.Collections;
43 import java.util.HashMap;
44 import java.util.HashSet;
45 import java.util.Iterator;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.Set;
49 import java.util.concurrent.CountDownLatch;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import javax.annotation.Nonnull;
53 import javax.annotation.Nullable;
54 import org.apache.commons.lang3.SerializationUtils;
55 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
56 import org.opendaylight.controller.cluster.datastore.config.Configuration;
57 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
58 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
59 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
60 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
62 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
63 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
64 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
65 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
66 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
67 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
68 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
69 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
70 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
71 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
72 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
73 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
74 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
75 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
76 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
77 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
78 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
79 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
80 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
81 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
82 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
83 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
84 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
85 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
86 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
87 import org.opendaylight.controller.cluster.raft.RaftState;
88 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
89 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
90 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
91 import org.opendaylight.controller.cluster.raft.messages.AddServer;
92 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
93 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
94 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
95 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
96 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
97 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
98 import org.slf4j.Logger;
99 import org.slf4j.LoggerFactory;
100 import scala.concurrent.Future;
101 import scala.concurrent.duration.Duration;
102 import scala.concurrent.duration.FiniteDuration;
103
104 /**
105  * The ShardManager has the following jobs,
106  * <ul>
107  * <li> Create all the local shard replicas that belong on this cluster member
108  * <li> Find the address of the local shard
109  * <li> Find the primary replica for any given shard
110  * <li> Monitor the cluster members and store their addresses
111  * </ul>
112  */
113 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
114
115     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
116
117     // Stores a mapping between a shard name and its corresponding information
118     // Shard names look like inventory, topology etc and are as specified in
119     // configuration
120     private final Map<String, ShardInformation> localShards = new HashMap<>();
121
122     // The type of a ShardManager reflects the type of the datastore itself
123     // A data store could be of type config/operational
124     private final String type;
125
126     private final ClusterWrapper cluster;
127
128     private final Configuration configuration;
129
130     private final String shardDispatcherPath;
131
132     private final ShardManagerInfo mBean;
133
134     private DatastoreContextFactory datastoreContextFactory;
135
136     private final CountDownLatch waitTillReadyCountdownLatch;
137
138     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
139
140     private final ShardPeerAddressResolver peerAddressResolver;
141
142     private SchemaContext schemaContext;
143
144     private DatastoreSnapshot restoreFromSnapshot;
145
146     private ShardManagerSnapshot currentSnapshot;
147
148     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
149
150     private final String persistenceId;
151
152     /**
153      */
154     protected ShardManager(AbstractBuilder<?> builder) {
155
156         this.cluster = builder.cluster;
157         this.configuration = builder.configuration;
158         this.datastoreContextFactory = builder.datastoreContextFactory;
159         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
160         this.shardDispatcherPath =
161                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
162         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
163         this.primaryShardInfoCache = builder.primaryShardInfoCache;
164         this.restoreFromSnapshot = builder.restoreFromSnapshot;
165
166         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
167         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
168
169         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
170
171         // Subscribe this actor to cluster member events
172         cluster.subscribeToMemberEvents(getSelf());
173
174         List<String> localShardActorNames = new ArrayList<>();
175         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
176                 "shard-manager-" + this.type,
177                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
178                 localShardActorNames);
179         mBean.setShardManager(this);
180     }
181
182     @Override
183     public void postStop() {
184         LOG.info("Stopping ShardManager");
185
186         mBean.unregisterMBean();
187     }
188
189     @Override
190     public void handleCommand(Object message) throws Exception {
191         if (message  instanceof FindPrimary) {
192             findPrimary((FindPrimary)message);
193         } else if(message instanceof FindLocalShard){
194             findLocalShard((FindLocalShard) message);
195         } else if (message instanceof UpdateSchemaContext) {
196             updateSchemaContext(message);
197         } else if(message instanceof ActorInitialized) {
198             onActorInitialized(message);
199         } else if (message instanceof ClusterEvent.MemberUp){
200             memberUp((ClusterEvent.MemberUp) message);
201         } else if (message instanceof ClusterEvent.MemberExited){
202             memberExited((ClusterEvent.MemberExited) message);
203         } else if(message instanceof ClusterEvent.MemberRemoved) {
204             memberRemoved((ClusterEvent.MemberRemoved) message);
205         } else if(message instanceof ClusterEvent.UnreachableMember) {
206             memberUnreachable((ClusterEvent.UnreachableMember)message);
207         } else if(message instanceof ClusterEvent.ReachableMember) {
208             memberReachable((ClusterEvent.ReachableMember) message);
209         } else if(message instanceof DatastoreContextFactory) {
210             onDatastoreContextFactory((DatastoreContextFactory)message);
211         } else if(message instanceof RoleChangeNotification) {
212             onRoleChangeNotification((RoleChangeNotification) message);
213         } else if(message instanceof FollowerInitialSyncUpStatus){
214             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
215         } else if(message instanceof ShardNotInitializedTimeout) {
216             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
217         } else if(message instanceof ShardLeaderStateChanged) {
218             onLeaderStateChanged((ShardLeaderStateChanged) message);
219         } else if(message instanceof SwitchShardBehavior){
220             onSwitchShardBehavior((SwitchShardBehavior) message);
221         } else if(message instanceof CreateShard) {
222             onCreateShard((CreateShard)message);
223         } else if(message instanceof AddShardReplica){
224             onAddShardReplica((AddShardReplica)message);
225         } else if(message instanceof ForwardedAddServerReply) {
226             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
227             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
228                     msg.removeShardOnFailure);
229         } else if(message instanceof ForwardedAddServerFailure) {
230             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
231             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
232         } else if(message instanceof PrimaryShardFoundForContext) {
233             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
234             onPrimaryShardFoundContext(primaryShardFoundContext);
235         } else if(message instanceof RemoveShardReplica){
236             onRemoveShardReplica((RemoveShardReplica)message);
237         } else if(message instanceof GetSnapshot) {
238             onGetSnapshot();
239         } else if(message instanceof ServerRemoved){
240             onShardReplicaRemoved((ServerRemoved) message);
241         } else if (message instanceof SaveSnapshotSuccess) {
242             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
243         } else if (message instanceof SaveSnapshotFailure) {
244             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
245                     persistenceId(), ((SaveSnapshotFailure) message).cause());
246         } else {
247             unknownMessage(message);
248         }
249     }
250
251     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
252         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
253             addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender());
254         }
255     }
256
257     private void onShardReplicaRemoved(ServerRemoved message) {
258         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
259         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
260         if(shardInformation == null) {
261             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
262             return;
263         } else if(shardInformation.getActor() != null) {
264             LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
265             shardInformation.getActor().tell(PoisonPill.getInstance(), self());
266         }
267         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
268         persistShardList();
269     }
270
271     private void onGetSnapshot() {
272         LOG.debug("{}: onGetSnapshot", persistenceId());
273
274         List<String> notInitialized = null;
275         for(ShardInformation shardInfo: localShards.values()) {
276             if(!shardInfo.isShardInitialized()) {
277                 if(notInitialized == null) {
278                     notInitialized = new ArrayList<>();
279                 }
280
281                 notInitialized.add(shardInfo.getShardName());
282             }
283         }
284
285         if(notInitialized != null) {
286             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
287                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
288             return;
289         }
290
291         byte[] shardManagerSnapshot = null;
292         if(currentSnapshot != null) {
293             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
294         }
295
296         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
297                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
298                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
299
300         for(ShardInformation shardInfo: localShards.values()) {
301             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
302         }
303     }
304
305     private void onCreateShard(CreateShard createShard) {
306         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
307
308         Object reply;
309         try {
310             String shardName = createShard.getModuleShardConfig().getShardName();
311             if(localShards.containsKey(shardName)) {
312                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
313                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
314             } else {
315                 doCreateShard(createShard);
316                 reply = new akka.actor.Status.Success(null);
317             }
318         } catch (Exception e) {
319             LOG.error("{}: onCreateShard failed", persistenceId(), e);
320             reply = new akka.actor.Status.Failure(e);
321         }
322
323         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
324             getSender().tell(reply, getSelf());
325         }
326     }
327
328     private void doCreateShard(CreateShard createShard) {
329         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
330         String shardName = moduleShardConfig.getShardName();
331
332         configuration.addModuleShardConfiguration(moduleShardConfig);
333
334         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
335         if(shardDatastoreContext == null) {
336             shardDatastoreContext = newShardDatastoreContext(shardName);
337         } else {
338             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
339                     peerAddressResolver).build();
340         }
341
342         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
343
344         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
345                 currentSnapshot.getShardList().contains(shardName);
346
347         Map<String, String> peerAddresses;
348         boolean isActiveMember;
349         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
350                 contains(cluster.getCurrentMemberName())) {
351             peerAddresses = getPeerAddresses(shardName);
352             isActiveMember = true;
353         } else {
354             // The local member is not in the static shard member configuration and the shard did not
355             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
356             // the shard with no peers and with elections disabled so it stays as follower. A
357             // subsequent AddServer request will be needed to make it an active member.
358             isActiveMember = false;
359             peerAddresses = Collections.emptyMap();
360             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
361                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
362         }
363
364         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
365                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
366                 isActiveMember);
367
368         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
369                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
370         info.setActiveMember(isActiveMember);
371         localShards.put(info.getShardName(), info);
372
373         mBean.addLocalShard(shardId.toString());
374
375         if(schemaContext != null) {
376             info.setActor(newShardActor(schemaContext, info));
377         }
378     }
379
380     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
381         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
382                 shardPeerAddressResolver(peerAddressResolver);
383     }
384
385     private DatastoreContext newShardDatastoreContext(String shardName) {
386         return newShardDatastoreContextBuilder(shardName).build();
387     }
388
389     private void checkReady(){
390         if (isReadyWithLeaderId()) {
391             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
392                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
393
394             waitTillReadyCountdownLatch.countDown();
395         }
396     }
397
398     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
399         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
400
401         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
402         if(shardInformation != null) {
403             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
404             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
405             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
406                 primaryShardInfoCache.remove(shardInformation.getShardName());
407             }
408
409             checkReady();
410         } else {
411             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
412         }
413     }
414
415     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
416         ShardInformation shardInfo = message.getShardInfo();
417
418         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
419                 shardInfo.getShardName());
420
421         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
422
423         if(!shardInfo.isShardInitialized()) {
424             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
425             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
426         } else {
427             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
428             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
429         }
430     }
431
432     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
433         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
434                 status.getName(), status.isInitialSyncDone());
435
436         ShardInformation shardInformation = findShardInformation(status.getName());
437
438         if(shardInformation != null) {
439             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
440
441             mBean.setSyncStatus(isInSync());
442         }
443
444     }
445
446     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
447         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
448                 roleChanged.getOldRole(), roleChanged.getNewRole());
449
450         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
451         if(shardInformation != null) {
452             shardInformation.setRole(roleChanged.getNewRole());
453             checkReady();
454             mBean.setSyncStatus(isInSync());
455         }
456     }
457
458
459     private ShardInformation findShardInformation(String memberId) {
460         for(ShardInformation info : localShards.values()){
461             if(info.getShardId().toString().equals(memberId)){
462                 return info;
463             }
464         }
465
466         return null;
467     }
468
469     private boolean isReadyWithLeaderId() {
470         boolean isReady = true;
471         for (ShardInformation info : localShards.values()) {
472             if(!info.isShardReadyWithLeaderId()){
473                 isReady = false;
474                 break;
475             }
476         }
477         return isReady;
478     }
479
480     private boolean isInSync(){
481         for (ShardInformation info : localShards.values()) {
482             if(!info.isInSync()){
483                 return false;
484             }
485         }
486         return true;
487     }
488
489     private void onActorInitialized(Object message) {
490         final ActorRef sender = getSender();
491
492         if (sender == null) {
493             return; //why is a non-actor sending this message? Just ignore.
494         }
495
496         String actorName = sender.path().name();
497         //find shard name from actor name; actor name is stringified shardId
498         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
499
500         if (shardId.getShardName() == null) {
501             return;
502         }
503
504         markShardAsInitialized(shardId.getShardName());
505     }
506
507     private void markShardAsInitialized(String shardName) {
508         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
509
510         ShardInformation shardInformation = localShards.get(shardName);
511         if (shardInformation != null) {
512             shardInformation.setActorInitialized();
513
514             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
515         }
516     }
517
518     @Override
519     protected void handleRecover(Object message) throws Exception {
520         if (message instanceof RecoveryCompleted) {
521             onRecoveryCompleted();
522         } else if (message instanceof SnapshotOffer) {
523             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
524         }
525     }
526
527     private void onRecoveryCompleted() {
528         LOG.info("Recovery complete : {}", persistenceId());
529
530         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
531         // journal on upgrade from Helium.
532         deleteMessages(lastSequenceNr());
533
534         if(currentSnapshot == null && restoreFromSnapshot != null &&
535                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
536             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
537                     restoreFromSnapshot.getShardManagerSnapshot()))) {
538                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
539
540                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
541
542                 applyShardManagerSnapshot(snapshot);
543             } catch(Exception e) {
544                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
545             }
546         }
547
548         createLocalShards();
549     }
550
551     private void findLocalShard(FindLocalShard message) {
552         final ShardInformation shardInformation = localShards.get(message.getShardName());
553
554         if(shardInformation == null){
555             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
556             return;
557         }
558
559         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
560             @Override
561             public Object get() {
562                 return new LocalShardFound(shardInformation.getActor());
563             }
564         });
565     }
566
567     private void sendResponse(ShardInformation shardInformation, boolean doWait,
568             boolean wantShardReady, final Supplier<Object> messageSupplier) {
569         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
570             if(doWait) {
571                 final ActorRef sender = getSender();
572                 final ActorRef self = self();
573
574                 Runnable replyRunnable = new Runnable() {
575                     @Override
576                     public void run() {
577                         sender.tell(messageSupplier.get(), self);
578                     }
579                 };
580
581                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
582                     new OnShardInitialized(replyRunnable);
583
584                 shardInformation.addOnShardInitialized(onShardInitialized);
585
586                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
587                 if(shardInformation.isShardInitialized()) {
588                     // If the shard is already initialized then we'll wait enough time for the shard to
589                     // elect a leader, ie 2 times the election timeout.
590                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
591                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
592                 }
593
594                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
595                         shardInformation.getShardName());
596
597                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
598                         timeout, getSelf(),
599                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
600                         getContext().dispatcher(), getSelf());
601
602                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
603
604             } else if (!shardInformation.isShardInitialized()) {
605                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
606                         shardInformation.getShardName());
607                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
608             } else {
609                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
610                         shardInformation.getShardName());
611                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
612             }
613
614             return;
615         }
616
617         getSender().tell(messageSupplier.get(), getSelf());
618     }
619
620     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
621         return new NoShardLeaderException(null, shardId.toString());
622     }
623
624     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
625         return new NotInitializedException(String.format(
626                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
627     }
628
629     private void memberRemoved(ClusterEvent.MemberRemoved message) {
630         String memberName = message.member().roles().head();
631
632         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
633                 message.member().address());
634
635         peerAddressResolver.removePeerAddress(memberName);
636
637         for(ShardInformation info : localShards.values()){
638             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
639         }
640     }
641
642     private void memberExited(ClusterEvent.MemberExited message) {
643         String memberName = message.member().roles().head();
644
645         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
646                 message.member().address());
647
648         peerAddressResolver.removePeerAddress(memberName);
649
650         for(ShardInformation info : localShards.values()){
651             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
652         }
653     }
654
655     private void memberUp(ClusterEvent.MemberUp message) {
656         String memberName = message.member().roles().head();
657
658         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
659                 message.member().address());
660
661         addPeerAddress(memberName, message.member().address());
662
663         checkReady();
664     }
665
666     private void addPeerAddress(String memberName, Address address) {
667         peerAddressResolver.addPeerAddress(memberName, address);
668
669         for(ShardInformation info : localShards.values()){
670             String shardName = info.getShardName();
671             String peerId = getShardIdentifier(memberName, shardName).toString();
672             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
673
674             info.peerUp(memberName, peerId, getSelf());
675         }
676     }
677
678     private void memberReachable(ClusterEvent.ReachableMember message) {
679         String memberName = message.member().roles().head();
680         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
681
682         addPeerAddress(memberName, message.member().address());
683
684         markMemberAvailable(memberName);
685     }
686
687     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
688         String memberName = message.member().roles().head();
689         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
690
691         markMemberUnavailable(memberName);
692     }
693
694     private void markMemberUnavailable(final String memberName) {
695         for(ShardInformation info : localShards.values()){
696             String leaderId = info.getLeaderId();
697             if(leaderId != null && leaderId.contains(memberName)) {
698                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
699                 info.setLeaderAvailable(false);
700
701                 primaryShardInfoCache.remove(info.getShardName());
702             }
703
704             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
705         }
706     }
707
708     private void markMemberAvailable(final String memberName) {
709         for(ShardInformation info : localShards.values()){
710             String leaderId = info.getLeaderId();
711             if(leaderId != null && leaderId.contains(memberName)) {
712                 LOG.debug("Marking Leader {} as available.", leaderId);
713                 info.setLeaderAvailable(true);
714             }
715
716             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
717         }
718     }
719
720     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
721         datastoreContextFactory = factory;
722         for (ShardInformation info : localShards.values()) {
723             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
724         }
725     }
726
727     private void onSwitchShardBehavior(SwitchShardBehavior message) {
728         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
729
730         ShardInformation shardInformation = localShards.get(identifier.getShardName());
731
732         if(shardInformation != null && shardInformation.getActor() != null) {
733             shardInformation.getActor().tell(
734                     new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
735         } else {
736             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
737                     message.getShardName(), message.getNewState());
738         }
739     }
740
741     /**
742      * Notifies all the local shards of a change in the schema context
743      *
744      * @param message
745      */
746     private void updateSchemaContext(final Object message) {
747         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
748
749         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
750
751         for (ShardInformation info : localShards.values()) {
752             if (info.getActor() == null) {
753                 LOG.debug("Creating Shard {}", info.getShardId());
754                 info.setActor(newShardActor(schemaContext, info));
755             } else {
756                 info.getActor().tell(message, getSelf());
757             }
758         }
759     }
760
761     @VisibleForTesting
762     protected ClusterWrapper getCluster() {
763         return cluster;
764     }
765
766     @VisibleForTesting
767     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
768         return getContext().actorOf(info.newProps(schemaContext)
769                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
770     }
771
772     private void findPrimary(FindPrimary message) {
773         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
774
775         final String shardName = message.getShardName();
776         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
777
778         // First see if the there is a local replica for the shard
779         final ShardInformation info = localShards.get(shardName);
780         if (info != null && info.isActiveMember()) {
781             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
782                 @Override
783                 public Object get() {
784                     String primaryPath = info.getSerializedLeaderActor();
785                     Object found = canReturnLocalShardState && info.isLeader() ?
786                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
787                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
788
789                             if(LOG.isDebugEnabled()) {
790                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
791                             }
792
793                             return found;
794                 }
795             });
796
797             return;
798         }
799
800         Collection<String> visitedAddresses;
801         if(message instanceof RemoteFindPrimary) {
802             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
803         } else {
804             visitedAddresses = new ArrayList<>();
805         }
806
807         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
808
809         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
810             if(visitedAddresses.contains(address)) {
811                 continue;
812             }
813
814             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
815                     shardName, address);
816
817             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
818                     message.isWaitUntilReady(), visitedAddresses), getContext());
819             return;
820         }
821
822         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
823
824         getSender().tell(new PrimaryNotFoundException(
825                 String.format("No primary shard found for %s.", shardName)), getSelf());
826     }
827
828     /**
829      * Construct the name of the shard actor given the name of the member on
830      * which the shard resides and the name of the shard
831      *
832      * @param memberName
833      * @param shardName
834      * @return
835      */
836     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
837         return peerAddressResolver.getShardIdentifier(memberName, shardName);
838     }
839
840     /**
841      * Create shards that are local to the member on which the ShardManager
842      * runs
843      *
844      */
845     private void createLocalShards() {
846         String memberName = this.cluster.getCurrentMemberName();
847         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
848
849         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
850         if(restoreFromSnapshot != null)
851         {
852             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
853                 shardSnapshots.put(snapshot.getName(), snapshot);
854             }
855         }
856
857         restoreFromSnapshot = null; // null out to GC
858
859         for(String shardName : memberShardNames){
860             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
861
862             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
863
864             Map<String, String> peerAddresses = getPeerAddresses(shardName);
865             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
866                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
867                         shardSnapshots.get(shardName)), peerAddressResolver));
868             mBean.addLocalShard(shardId.toString());
869         }
870     }
871
872     /**
873      * Given the name of the shard find the addresses of all it's peers
874      *
875      * @param shardName
876      */
877     private Map<String, String> getPeerAddresses(String shardName) {
878         Collection<String> members = configuration.getMembersFromShardName(shardName);
879         Map<String, String> peerAddresses = new HashMap<>();
880
881         String currentMemberName = this.cluster.getCurrentMemberName();
882
883         for(String memberName : members) {
884             if(!currentMemberName.equals(memberName)) {
885                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
886                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
887                 peerAddresses.put(shardId.toString(), address);
888             }
889         }
890         return peerAddresses;
891     }
892
893     @Override
894     public SupervisorStrategy supervisorStrategy() {
895
896         return new OneForOneStrategy(10, Duration.create("1 minute"),
897                 new Function<Throwable, SupervisorStrategy.Directive>() {
898             @Override
899             public SupervisorStrategy.Directive apply(Throwable t) {
900                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
901                 return SupervisorStrategy.resume();
902             }
903         }
904                 );
905
906     }
907
908     @Override
909     public String persistenceId() {
910         return persistenceId;
911     }
912
913     @VisibleForTesting
914     ShardManagerInfoMBean getMBean(){
915         return mBean;
916     }
917
918     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
919         if (shardReplicaOperationsInProgress.contains(shardName)) {
920             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
921             LOG.debug ("{}: {}", persistenceId(), msg);
922             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
923             return true;
924         }
925
926         return false;
927     }
928
929     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
930         final String shardName = shardReplicaMsg.getShardName();
931
932         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
933
934         // verify the shard with the specified name is present in the cluster configuration
935         if (!(this.configuration.isShardConfigured(shardName))) {
936             String msg = String.format("No module configuration exists for shard %s", shardName);
937             LOG.debug ("{}: {}", persistenceId(), msg);
938             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
939             return;
940         }
941
942         // Create the localShard
943         if (schemaContext == null) {
944             String msg = String.format(
945                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
946             LOG.debug ("{}: {}", persistenceId(), msg);
947             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
948             return;
949         }
950
951         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
952             @Override
953             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
954                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
955
956             }
957
958             @Override
959             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
960                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
961             }
962
963         });
964     }
965
966     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
967         String msg = String.format("Local shard %s already exists", shardName);
968         LOG.debug ("{}: {}", persistenceId(), msg);
969         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
970     }
971
972     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
973         if(isShardReplicaOperationInProgress(shardName, sender)) {
974             return;
975         }
976
977         shardReplicaOperationsInProgress.add(shardName);
978
979         final ShardInformation shardInfo;
980         final boolean removeShardOnFailure;
981         ShardInformation existingShardInfo = localShards.get(shardName);
982         if(existingShardInfo == null) {
983             removeShardOnFailure = true;
984             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
985
986             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
987                     DisableElectionsRaftPolicy.class.getName()).build();
988
989             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
990                     Shard.builder(), peerAddressResolver);
991             shardInfo.setActiveMember(false);
992             localShards.put(shardName, shardInfo);
993             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
994         } else {
995             removeShardOnFailure = false;
996             shardInfo = existingShardInfo;
997         }
998
999         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1000
1001         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1002         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1003                 response.getPrimaryPath(), shardInfo.getShardId());
1004
1005         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1006                 duration());
1007         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1008             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1009
1010         futureObj.onComplete(new OnComplete<Object>() {
1011             @Override
1012             public void onComplete(Throwable failure, Object addServerResponse) {
1013                 if (failure != null) {
1014                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1015                             response.getPrimaryPath(), shardName, failure);
1016
1017                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1018                             response.getPrimaryPath(), shardName);
1019                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1020                 } else {
1021                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1022                             response.getPrimaryPath(), removeShardOnFailure), sender);
1023                 }
1024             }
1025         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1026     }
1027
1028     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1029             boolean removeShardOnFailure) {
1030         shardReplicaOperationsInProgress.remove(shardName);
1031
1032         if(removeShardOnFailure) {
1033             ShardInformation shardInfo = localShards.remove(shardName);
1034             if (shardInfo.getActor() != null) {
1035                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1036             }
1037         }
1038
1039         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1040             new RuntimeException(message, failure)), getSelf());
1041     }
1042
1043     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1044             String leaderPath, boolean removeShardOnFailure) {
1045         String shardName = shardInfo.getShardName();
1046         shardReplicaOperationsInProgress.remove(shardName);
1047
1048         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1049
1050         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1051             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1052
1053             // Make the local shard voting capable
1054             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1055             shardInfo.setActiveMember(true);
1056             persistShardList();
1057
1058             mBean.addLocalShard(shardInfo.getShardId().toString());
1059             sender.tell(new akka.actor.Status.Success(null), getSelf());
1060         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1061             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1062         } else {
1063             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1064                     persistenceId(), shardName, replyMsg.getStatus());
1065
1066             Exception failure;
1067             switch (replyMsg.getStatus()) {
1068                 case TIMEOUT:
1069                     failure = new TimeoutException(String.format(
1070                             "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1071                             "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1072                             leaderPath, shardName));
1073                     break;
1074                 case NO_LEADER:
1075                     failure = createNoShardLeaderException(shardInfo.getShardId());
1076                     break;
1077                 default :
1078                     failure = new RuntimeException(String.format(
1079                             "AddServer request to leader %s for shard %s failed with status %s",
1080                             leaderPath, shardName, replyMsg.getStatus()));
1081             }
1082
1083             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1084         }
1085     }
1086
1087     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
1088         String shardName = shardReplicaMsg.getShardName();
1089
1090         // verify the local shard replica is available in the controller node
1091         if (!localShards.containsKey(shardName)) {
1092             String msg = String.format("Local shard %s does not", shardName);
1093             LOG.debug ("{}: {}", persistenceId(), msg);
1094             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1095             return;
1096         }
1097         // call RemoveShard for the shardName
1098         getSender().tell(new akka.actor.Status.Success(true), getSelf());
1099         return;
1100     }
1101
1102     private void persistShardList() {
1103         List<String> shardList = new ArrayList<>(localShards.keySet());
1104         for (ShardInformation shardInfo : localShards.values()) {
1105             if (!shardInfo.isActiveMember()) {
1106                 shardList.remove(shardInfo.getShardName());
1107             }
1108         }
1109         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1110         saveSnapshot(updateShardManagerSnapshot(shardList));
1111     }
1112
1113     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1114         currentSnapshot = new ShardManagerSnapshot(shardList);
1115         return currentSnapshot;
1116     }
1117
1118     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1119         currentSnapshot = snapshot;
1120
1121         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1122
1123         String currentMember = cluster.getCurrentMemberName();
1124         Set<String> configuredShardList =
1125             new HashSet<>(configuration.getMemberShardNames(currentMember));
1126         for (String shard : currentSnapshot.getShardList()) {
1127             if (!configuredShardList.contains(shard)) {
1128                 // add the current member as a replica for the shard
1129                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1130                 configuration.addMemberReplicaForShard(shard, currentMember);
1131             } else {
1132                 configuredShardList.remove(shard);
1133             }
1134         }
1135         for (String shard : configuredShardList) {
1136             // remove the member as a replica for the shard
1137             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1138             configuration.removeMemberReplicaForShard(shard, currentMember);
1139         }
1140     }
1141
1142     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1143         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1144             persistenceId());
1145         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
1146     }
1147
1148     private static class ForwardedAddServerReply {
1149         ShardInformation shardInfo;
1150         AddServerReply addServerReply;
1151         String leaderPath;
1152         boolean removeShardOnFailure;
1153
1154         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1155                 boolean removeShardOnFailure) {
1156             this.shardInfo = shardInfo;
1157             this.addServerReply = addServerReply;
1158             this.leaderPath = leaderPath;
1159             this.removeShardOnFailure = removeShardOnFailure;
1160         }
1161     }
1162
1163     private static class ForwardedAddServerFailure {
1164         String shardName;
1165         String failureMessage;
1166         Throwable failure;
1167         boolean removeShardOnFailure;
1168
1169         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1170                 boolean removeShardOnFailure) {
1171             this.shardName = shardName;
1172             this.failureMessage = failureMessage;
1173             this.failure = failure;
1174             this.removeShardOnFailure = removeShardOnFailure;
1175         }
1176     }
1177
1178     @VisibleForTesting
1179     protected static class ShardInformation {
1180         private final ShardIdentifier shardId;
1181         private final String shardName;
1182         private ActorRef actor;
1183         private ActorPath actorPath;
1184         private final Map<String, String> initialPeerAddresses;
1185         private Optional<DataTree> localShardDataTree;
1186         private boolean leaderAvailable = false;
1187
1188         // flag that determines if the actor is ready for business
1189         private boolean actorInitialized = false;
1190
1191         private boolean followerSyncStatus = false;
1192
1193         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1194         private String role ;
1195         private String leaderId;
1196         private short leaderVersion;
1197
1198         private DatastoreContext datastoreContext;
1199         private Shard.AbstractBuilder<?, ?> builder;
1200         private final ShardPeerAddressResolver addressResolver;
1201         private boolean isActiveMember = true;
1202
1203         private ShardInformation(String shardName, ShardIdentifier shardId,
1204                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1205                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1206             this.shardName = shardName;
1207             this.shardId = shardId;
1208             this.initialPeerAddresses = initialPeerAddresses;
1209             this.datastoreContext = datastoreContext;
1210             this.builder = builder;
1211             this.addressResolver = addressResolver;
1212         }
1213
1214         Props newProps(SchemaContext schemaContext) {
1215             Preconditions.checkNotNull(builder);
1216             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1217                     schemaContext(schemaContext).props();
1218             builder = null;
1219             return props;
1220         }
1221
1222         String getShardName() {
1223             return shardName;
1224         }
1225
1226         @Nullable
1227         ActorRef getActor(){
1228             return actor;
1229         }
1230
1231         ActorPath getActorPath() {
1232             return actorPath;
1233         }
1234
1235         void setActor(ActorRef actor) {
1236             this.actor = actor;
1237             this.actorPath = actor.path();
1238         }
1239
1240         ShardIdentifier getShardId() {
1241             return shardId;
1242         }
1243
1244         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1245             this.localShardDataTree = localShardDataTree;
1246         }
1247
1248         Optional<DataTree> getLocalShardDataTree() {
1249             return localShardDataTree;
1250         }
1251
1252         DatastoreContext getDatastoreContext() {
1253             return datastoreContext;
1254         }
1255
1256         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1257             this.datastoreContext = datastoreContext;
1258             if (actor != null) {
1259                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1260                 actor.tell(this.datastoreContext, sender);
1261             }
1262         }
1263
1264         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1265             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1266
1267             if(actor != null) {
1268                 if(LOG.isDebugEnabled()) {
1269                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1270                             peerId, peerAddress, actor.path());
1271                 }
1272
1273                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1274             }
1275
1276             notifyOnShardInitializedCallbacks();
1277         }
1278
1279         void peerDown(String memberName, String peerId, ActorRef sender) {
1280             if(actor != null) {
1281                 actor.tell(new PeerDown(memberName, peerId), sender);
1282             }
1283         }
1284
1285         void peerUp(String memberName, String peerId, ActorRef sender) {
1286             if(actor != null) {
1287                 actor.tell(new PeerUp(memberName, peerId), sender);
1288             }
1289         }
1290
1291         boolean isShardReady() {
1292             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1293         }
1294
1295         boolean isShardReadyWithLeaderId() {
1296             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1297                     (isLeader() || addressResolver.resolve(leaderId) != null);
1298         }
1299
1300         boolean isShardInitialized() {
1301             return getActor() != null && actorInitialized;
1302         }
1303
1304         boolean isLeader() {
1305             return Objects.equal(leaderId, shardId.toString());
1306         }
1307
1308         String getSerializedLeaderActor() {
1309             if(isLeader()) {
1310                 return Serialization.serializedActorPath(getActor());
1311             } else {
1312                 return addressResolver.resolve(leaderId);
1313             }
1314         }
1315
1316         void setActorInitialized() {
1317             LOG.debug("Shard {} is initialized", shardId);
1318
1319             this.actorInitialized = true;
1320
1321             notifyOnShardInitializedCallbacks();
1322         }
1323
1324         private void notifyOnShardInitializedCallbacks() {
1325             if(onShardInitializedSet.isEmpty()) {
1326                 return;
1327             }
1328
1329             boolean ready = isShardReadyWithLeaderId();
1330
1331             if(LOG.isDebugEnabled()) {
1332                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1333                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1334             }
1335
1336             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1337             while(iter.hasNext()) {
1338                 OnShardInitialized onShardInitialized = iter.next();
1339                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1340                     iter.remove();
1341                     onShardInitialized.getTimeoutSchedule().cancel();
1342                     onShardInitialized.getReplyRunnable().run();
1343                 }
1344             }
1345         }
1346
1347         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1348             onShardInitializedSet.add(onShardInitialized);
1349         }
1350
1351         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1352             onShardInitializedSet.remove(onShardInitialized);
1353         }
1354
1355         void setRole(String newRole) {
1356             this.role = newRole;
1357
1358             notifyOnShardInitializedCallbacks();
1359         }
1360
1361         void setFollowerSyncStatus(boolean syncStatus){
1362             this.followerSyncStatus = syncStatus;
1363         }
1364
1365         boolean isInSync(){
1366             if(RaftState.Follower.name().equals(this.role)){
1367                 return followerSyncStatus;
1368             } else if(RaftState.Leader.name().equals(this.role)){
1369                 return true;
1370             }
1371
1372             return false;
1373         }
1374
1375         boolean setLeaderId(String leaderId) {
1376             boolean changed = !Objects.equal(this.leaderId, leaderId);
1377             this.leaderId = leaderId;
1378             if(leaderId != null) {
1379                 this.leaderAvailable = true;
1380             }
1381             notifyOnShardInitializedCallbacks();
1382
1383             return changed;
1384         }
1385
1386         String getLeaderId() {
1387             return leaderId;
1388         }
1389
1390         void setLeaderAvailable(boolean leaderAvailable) {
1391             this.leaderAvailable = leaderAvailable;
1392         }
1393
1394         short getLeaderVersion() {
1395             return leaderVersion;
1396         }
1397
1398         void setLeaderVersion(short leaderVersion) {
1399             this.leaderVersion = leaderVersion;
1400         }
1401
1402         boolean isActiveMember() {
1403             return isActiveMember;
1404         }
1405
1406         void setActiveMember(boolean isActiveMember) {
1407             this.isActiveMember = isActiveMember;
1408         }
1409     }
1410
1411     private static class OnShardInitialized {
1412         private final Runnable replyRunnable;
1413         private Cancellable timeoutSchedule;
1414
1415         OnShardInitialized(Runnable replyRunnable) {
1416             this.replyRunnable = replyRunnable;
1417         }
1418
1419         Runnable getReplyRunnable() {
1420             return replyRunnable;
1421         }
1422
1423         Cancellable getTimeoutSchedule() {
1424             return timeoutSchedule;
1425         }
1426
1427         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1428             this.timeoutSchedule = timeoutSchedule;
1429         }
1430     }
1431
1432     private static class OnShardReady extends OnShardInitialized {
1433         OnShardReady(Runnable replyRunnable) {
1434             super(replyRunnable);
1435         }
1436     }
1437
1438     private static class ShardNotInitializedTimeout {
1439         private final ActorRef sender;
1440         private final ShardInformation shardInfo;
1441         private final OnShardInitialized onShardInitialized;
1442
1443         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1444             this.sender = sender;
1445             this.shardInfo = shardInfo;
1446             this.onShardInitialized = onShardInitialized;
1447         }
1448
1449         ActorRef getSender() {
1450             return sender;
1451         }
1452
1453         ShardInformation getShardInfo() {
1454             return shardInfo;
1455         }
1456
1457         OnShardInitialized getOnShardInitialized() {
1458             return onShardInitialized;
1459         }
1460     }
1461
1462     /**
1463      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1464      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1465      */
1466     @Deprecated
1467     static class SchemaContextModules implements Serializable {
1468         private static final long serialVersionUID = -8884620101025936590L;
1469
1470         private final Set<String> modules;
1471
1472         SchemaContextModules(Set<String> modules){
1473             this.modules = modules;
1474         }
1475
1476         public Set<String> getModules() {
1477             return modules;
1478         }
1479     }
1480
1481     public static Builder builder() {
1482         return new Builder();
1483     }
1484
1485     public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
1486         private ClusterWrapper cluster;
1487         private Configuration configuration;
1488         private DatastoreContextFactory datastoreContextFactory;
1489         private CountDownLatch waitTillReadyCountdownLatch;
1490         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1491         private DatastoreSnapshot restoreFromSnapshot;
1492         private volatile boolean sealed;
1493
1494         @SuppressWarnings("unchecked")
1495         private T self() {
1496             return (T) this;
1497         }
1498
1499         protected void checkSealed() {
1500             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1501         }
1502
1503         public T cluster(ClusterWrapper cluster) {
1504             checkSealed();
1505             this.cluster = cluster;
1506             return self();
1507         }
1508
1509         public T configuration(Configuration configuration) {
1510             checkSealed();
1511             this.configuration = configuration;
1512             return self();
1513         }
1514
1515         public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1516             checkSealed();
1517             this.datastoreContextFactory = datastoreContextFactory;
1518             return self();
1519         }
1520
1521         public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1522             checkSealed();
1523             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1524             return self();
1525         }
1526
1527         public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1528             checkSealed();
1529             this.primaryShardInfoCache = primaryShardInfoCache;
1530             return self();
1531         }
1532
1533         public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1534             checkSealed();
1535             this.restoreFromSnapshot = restoreFromSnapshot;
1536             return self();
1537         }
1538
1539         protected void verify() {
1540             sealed = true;
1541             Preconditions.checkNotNull(cluster, "cluster should not be null");
1542             Preconditions.checkNotNull(configuration, "configuration should not be null");
1543             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1544             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1545             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1546         }
1547
1548         public Props props() {
1549             verify();
1550             return Props.create(ShardManager.class, this);
1551         }
1552     }
1553
1554     public static class Builder extends AbstractBuilder<Builder> {
1555     }
1556
1557     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1558         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1559                 getShardInitializationTimeout().duration().$times(2));
1560
1561
1562         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1563         futureObj.onComplete(new OnComplete<Object>() {
1564             @Override
1565             public void onComplete(Throwable failure, Object response) {
1566                 if (failure != null) {
1567                     handler.onFailure(failure);
1568                 } else {
1569                     if(response instanceof RemotePrimaryShardFound) {
1570                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1571                     } else if(response instanceof LocalPrimaryShardFound) {
1572                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1573                     } else {
1574                         handler.onUnknownResponse(response);
1575                     }
1576                 }
1577             }
1578         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1579     }
1580
1581     /**
1582      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1583      * a remote or local find primary message is processed
1584      */
1585     private static interface FindPrimaryResponseHandler {
1586         /**
1587          * Invoked when a Failure message is received as a response
1588          *
1589          * @param failure
1590          */
1591         void onFailure(Throwable failure);
1592
1593         /**
1594          * Invoked when a RemotePrimaryShardFound response is received
1595          *
1596          * @param response
1597          */
1598         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1599
1600         /**
1601          * Invoked when a LocalPrimaryShardFound response is received
1602          * @param response
1603          */
1604         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1605
1606         /**
1607          * Invoked when an unknown response is received. This is another type of failure.
1608          *
1609          * @param response
1610          */
1611         void onUnknownResponse(Object response);
1612     }
1613
1614     /**
1615      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1616      * replica and sends a wrapped Failure response to some targetActor
1617      */
1618     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1619         private final ActorRef targetActor;
1620         private final String shardName;
1621         private final String persistenceId;
1622         private final ActorRef shardManagerActor;
1623
1624         /**
1625          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1626          * @param shardName The name of the shard for which the primary replica had to be found
1627          * @param persistenceId The persistenceId for the ShardManager
1628          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1629          */
1630         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1631             this.targetActor = Preconditions.checkNotNull(targetActor);
1632             this.shardName = Preconditions.checkNotNull(shardName);
1633             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1634             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1635         }
1636
1637         public ActorRef getTargetActor() {
1638             return targetActor;
1639         }
1640
1641         public String getShardName() {
1642             return shardName;
1643         }
1644
1645         public ActorRef getShardManagerActor() {
1646             return shardManagerActor;
1647         }
1648
1649         @Override
1650         public void onFailure(Throwable failure) {
1651             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1652             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1653                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1654         }
1655
1656         @Override
1657         public void onUnknownResponse(Object response) {
1658             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1659                     shardName, response);
1660             LOG.debug ("{}: {}", persistenceId, msg);
1661             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1662                     new RuntimeException(msg)), shardManagerActor);
1663         }
1664     }
1665
1666
1667     /**
1668      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1669      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1670      * as a successful response to find primary.
1671      */
1672     private static class PrimaryShardFoundForContext {
1673         private final String shardName;
1674         private final Object contextMessage;
1675         private final RemotePrimaryShardFound remotePrimaryShardFound;
1676         private final LocalPrimaryShardFound localPrimaryShardFound;
1677
1678         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) {
1679             this.shardName = Preconditions.checkNotNull(shardName);
1680             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1681             Preconditions.checkNotNull(primaryFoundMessage);
1682             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null;
1683             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null;
1684         }
1685
1686         @Nonnull
1687         public String getPrimaryPath(){
1688             if(remotePrimaryShardFound != null){
1689                 return remotePrimaryShardFound.getPrimaryPath();
1690             }
1691             return localPrimaryShardFound.getPrimaryPath();
1692         }
1693
1694         @Nonnull
1695         public Object getContextMessage() {
1696             return contextMessage;
1697         }
1698
1699         @Nullable
1700         public RemotePrimaryShardFound getRemotePrimaryShardFound(){
1701             return remotePrimaryShardFound;
1702         }
1703
1704         @Nullable
1705         public LocalPrimaryShardFound getLocalPrimaryShardFound(){
1706             return localPrimaryShardFound;
1707         }
1708
1709         boolean isPrimaryLocal(){
1710             return (remotePrimaryShardFound == null);
1711         }
1712
1713         @Nonnull
1714         public String getShardName() {
1715             return shardName;
1716         }
1717     }
1718 }
1719
1720
1721