Bug 2187: EOS shard recovery after AddShardReplica
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.OnComplete;
22 import akka.japi.Function;
23 import akka.persistence.RecoveryCompleted;
24 import akka.persistence.SaveSnapshotFailure;
25 import akka.persistence.SaveSnapshotSuccess;
26 import akka.persistence.SnapshotOffer;
27 import akka.serialization.Serialization;
28 import akka.util.Timeout;
29 import com.google.common.annotations.VisibleForTesting;
30 import com.google.common.base.Objects;
31 import com.google.common.base.Optional;
32 import com.google.common.base.Preconditions;
33 import com.google.common.base.Strings;
34 import com.google.common.base.Supplier;
35 import com.google.common.collect.Sets;
36 import java.io.Serializable;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.Collections;
40 import java.util.HashMap;
41 import java.util.HashSet;
42 import java.util.Iterator;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Set;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import javax.annotation.Nonnull;
50 import javax.annotation.Nullable;
51 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
52 import org.opendaylight.controller.cluster.datastore.config.Configuration;
53 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
54 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
55 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
56 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
57 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
58 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
59 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
60 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
61 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
62 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
63 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
64 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
65 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
66 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
67 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
68 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
69 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
70 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
71 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
72 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
73 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
74 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
76 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
77 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
78 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
79 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
80 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
81 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
82 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
83 import org.opendaylight.controller.cluster.raft.RaftState;
84 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
85 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
86 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
87 import org.opendaylight.controller.cluster.raft.messages.AddServer;
88 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
89 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
90 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
91 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
92 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
93 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96 import scala.concurrent.Future;
97 import scala.concurrent.duration.Duration;
98 import scala.concurrent.duration.FiniteDuration;
99
100 /**
101  * The ShardManager has the following jobs,
102  * <ul>
103  * <li> Create all the local shard replicas that belong on this cluster member
104  * <li> Find the address of the local shard
105  * <li> Find the primary replica for any given shard
106  * <li> Monitor the cluster members and store their addresses
107  * </ul>
108  */
109 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
110
111     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
112
113     // Stores a mapping between a shard name and it's corresponding information
114     // Shard names look like inventory, topology etc and are as specified in
115     // configuration
116     private final Map<String, ShardInformation> localShards = new HashMap<>();
117
118     // The type of a ShardManager reflects the type of the datastore itself
119     // A data store could be of type config/operational
120     private final String type;
121
122     private final ClusterWrapper cluster;
123
124     private final Configuration configuration;
125
126     private final String shardDispatcherPath;
127
128     private final ShardManagerInfo mBean;
129
130     private DatastoreContextFactory datastoreContextFactory;
131
132     private final CountDownLatch waitTillReadyCountdownLatch;
133
134     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
135
136     private final ShardPeerAddressResolver peerAddressResolver;
137
138     private SchemaContext schemaContext;
139
140     private DatastoreSnapshot restoreFromSnapshot;
141
142     private ShardManagerSnapshot recoveredSnapshot;
143
144     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
145
146     private final String persistenceId;
147
148     /**
149      */
150     protected ShardManager(AbstractBuilder<?> builder) {
151
152         this.cluster = builder.cluster;
153         this.configuration = builder.configuration;
154         this.datastoreContextFactory = builder.datastoreContextFactory;
155         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
156         this.shardDispatcherPath =
157                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
158         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
159         this.primaryShardInfoCache = builder.primaryShardInfoCache;
160         this.restoreFromSnapshot = builder.restoreFromSnapshot;
161
162         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
163         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
164
165         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
166
167         // Subscribe this actor to cluster member events
168         cluster.subscribeToMemberEvents(getSelf());
169
170         List<String> localShardActorNames = new ArrayList<>();
171         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
172                 "shard-manager-" + this.type,
173                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
174                 localShardActorNames);
175         mBean.setShardManager(this);
176     }
177
178     @Override
179     public void postStop() {
180         LOG.info("Stopping ShardManager");
181
182         mBean.unregisterMBean();
183     }
184
185     @Override
186     public void handleCommand(Object message) throws Exception {
187         if (message  instanceof FindPrimary) {
188             findPrimary((FindPrimary)message);
189         } else if(message instanceof FindLocalShard){
190             findLocalShard((FindLocalShard) message);
191         } else if (message instanceof UpdateSchemaContext) {
192             updateSchemaContext(message);
193         } else if(message instanceof ActorInitialized) {
194             onActorInitialized(message);
195         } else if (message instanceof ClusterEvent.MemberUp){
196             memberUp((ClusterEvent.MemberUp) message);
197         } else if (message instanceof ClusterEvent.MemberExited){
198             memberExited((ClusterEvent.MemberExited) message);
199         } else if(message instanceof ClusterEvent.MemberRemoved) {
200             memberRemoved((ClusterEvent.MemberRemoved) message);
201         } else if(message instanceof ClusterEvent.UnreachableMember) {
202             memberUnreachable((ClusterEvent.UnreachableMember)message);
203         } else if(message instanceof ClusterEvent.ReachableMember) {
204             memberReachable((ClusterEvent.ReachableMember) message);
205         } else if(message instanceof DatastoreContextFactory) {
206             onDatastoreContextFactory((DatastoreContextFactory)message);
207         } else if(message instanceof RoleChangeNotification) {
208             onRoleChangeNotification((RoleChangeNotification) message);
209         } else if(message instanceof FollowerInitialSyncUpStatus){
210             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
211         } else if(message instanceof ShardNotInitializedTimeout) {
212             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
213         } else if(message instanceof ShardLeaderStateChanged) {
214             onLeaderStateChanged((ShardLeaderStateChanged) message);
215         } else if(message instanceof SwitchShardBehavior){
216             onSwitchShardBehavior((SwitchShardBehavior) message);
217         } else if(message instanceof CreateShard) {
218             onCreateShard((CreateShard)message);
219         } else if(message instanceof AddShardReplica){
220             onAddShardReplica((AddShardReplica)message);
221         } else if(message instanceof ForwardedAddServerReply) {
222             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
223             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
224                     msg.removeShardOnFailure);
225         } else if(message instanceof ForwardedAddServerFailure) {
226             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
227             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
228         } else if(message instanceof PrimaryShardFoundForContext) {
229             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
230             onPrimaryShardFoundContext(primaryShardFoundContext);
231         } else if(message instanceof RemoveShardReplica){
232             onRemoveShardReplica((RemoveShardReplica)message);
233         } else if(message instanceof GetSnapshot) {
234             onGetSnapshot();
235         } else if(message instanceof ServerRemoved){
236             onShardReplicaRemoved((ServerRemoved) message);
237         } else if (message instanceof SaveSnapshotSuccess) {
238             LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
239         } else if (message instanceof SaveSnapshotFailure) {
240             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
241                     persistenceId(), ((SaveSnapshotFailure) message).cause());
242         } else {
243             unknownMessage(message);
244         }
245     }
246
247     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
248         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
249             addShard(primaryShardFoundContext.shardName, primaryShardFoundContext.getRemotePrimaryShardFound(), getSender());
250         }
251     }
252
253     private void onShardReplicaRemoved(ServerRemoved message) {
254         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
255         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
256         if(shardInformation == null) {
257             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
258             return;
259         } else if(shardInformation.getActor() != null) {
260             LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
261             shardInformation.getActor().tell(PoisonPill.getInstance(), self());
262         }
263         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
264         persistShardList();
265     }
266
267     private void onGetSnapshot() {
268         LOG.debug("{}: onGetSnapshot", persistenceId());
269
270         List<String> notInitialized = null;
271         for(ShardInformation shardInfo: localShards.values()) {
272             if(!shardInfo.isShardInitialized()) {
273                 if(notInitialized == null) {
274                     notInitialized = new ArrayList<>();
275                 }
276
277                 notInitialized.add(shardInfo.getShardName());
278             }
279         }
280
281         if(notInitialized != null) {
282             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
283                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
284             return;
285         }
286
287         byte[] shardManagerSnapshot = null;
288         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
289                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
290                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
291
292         for(ShardInformation shardInfo: localShards.values()) {
293             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
294         }
295     }
296
297     private void onCreateShard(CreateShard createShard) {
298         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
299
300         Object reply;
301         try {
302             String shardName = createShard.getModuleShardConfig().getShardName();
303             if(localShards.containsKey(shardName)) {
304                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
305                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
306             } else {
307                 doCreateShard(createShard);
308                 reply = new akka.actor.Status.Success(null);
309             }
310         } catch (Exception e) {
311             LOG.error("{}: onCreateShard failed", persistenceId(), e);
312             reply = new akka.actor.Status.Failure(e);
313         }
314
315         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
316             getSender().tell(reply, getSelf());
317         }
318     }
319
320     private void doCreateShard(CreateShard createShard) {
321         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
322         String shardName = moduleShardConfig.getShardName();
323
324         configuration.addModuleShardConfiguration(moduleShardConfig);
325
326         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
327         if(shardDatastoreContext == null) {
328             shardDatastoreContext = newShardDatastoreContext(shardName);
329         } else {
330             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
331                     peerAddressResolver).build();
332         }
333
334         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
335
336         boolean shardWasInRecoveredSnapshot = recoveredSnapshot != null &&
337                 recoveredSnapshot.getShardList().contains(shardName);
338
339         Map<String, String> peerAddresses;
340         boolean isActiveMember;
341         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
342                 contains(cluster.getCurrentMemberName())) {
343             peerAddresses = getPeerAddresses(shardName);
344             isActiveMember = true;
345         } else {
346             // The local member is not in the static shard member configuration and the shard did not
347             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
348             // the shard with no peers and with elections disabled so it stays as follower. A
349             // subsequent AddServer request will be needed to make it an active member.
350             isActiveMember = false;
351             peerAddresses = Collections.emptyMap();
352             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
353                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
354         }
355
356         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
357                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
358                 isActiveMember);
359
360         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
361                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
362         info.setActiveMember(isActiveMember);
363         localShards.put(info.getShardName(), info);
364
365         mBean.addLocalShard(shardId.toString());
366
367         if(schemaContext != null) {
368             info.setActor(newShardActor(schemaContext, info));
369         }
370     }
371
372     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
373         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
374                 shardPeerAddressResolver(peerAddressResolver);
375     }
376
377     private DatastoreContext newShardDatastoreContext(String shardName) {
378         return newShardDatastoreContextBuilder(shardName).build();
379     }
380
381     private void checkReady(){
382         if (isReadyWithLeaderId()) {
383             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
384                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
385
386             waitTillReadyCountdownLatch.countDown();
387         }
388     }
389
390     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
391         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
392
393         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
394         if(shardInformation != null) {
395             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
396             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
397             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
398                 primaryShardInfoCache.remove(shardInformation.getShardName());
399             }
400
401             checkReady();
402         } else {
403             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
404         }
405     }
406
407     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
408         ShardInformation shardInfo = message.getShardInfo();
409
410         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
411                 shardInfo.getShardName());
412
413         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
414
415         if(!shardInfo.isShardInitialized()) {
416             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
417             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
418         } else {
419             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
420             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
421         }
422     }
423
424     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
425         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
426                 status.getName(), status.isInitialSyncDone());
427
428         ShardInformation shardInformation = findShardInformation(status.getName());
429
430         if(shardInformation != null) {
431             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
432
433             mBean.setSyncStatus(isInSync());
434         }
435
436     }
437
438     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
439         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
440                 roleChanged.getOldRole(), roleChanged.getNewRole());
441
442         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
443         if(shardInformation != null) {
444             shardInformation.setRole(roleChanged.getNewRole());
445             checkReady();
446             mBean.setSyncStatus(isInSync());
447         }
448     }
449
450
451     private ShardInformation findShardInformation(String memberId) {
452         for(ShardInformation info : localShards.values()){
453             if(info.getShardId().toString().equals(memberId)){
454                 return info;
455             }
456         }
457
458         return null;
459     }
460
461     private boolean isReadyWithLeaderId() {
462         boolean isReady = true;
463         for (ShardInformation info : localShards.values()) {
464             if(!info.isShardReadyWithLeaderId()){
465                 isReady = false;
466                 break;
467             }
468         }
469         return isReady;
470     }
471
472     private boolean isInSync(){
473         for (ShardInformation info : localShards.values()) {
474             if(!info.isInSync()){
475                 return false;
476             }
477         }
478         return true;
479     }
480
481     private void onActorInitialized(Object message) {
482         final ActorRef sender = getSender();
483
484         if (sender == null) {
485             return; //why is a non-actor sending this message? Just ignore.
486         }
487
488         String actorName = sender.path().name();
489         //find shard name from actor name; actor name is stringified shardId
490         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
491
492         if (shardId.getShardName() == null) {
493             return;
494         }
495
496         markShardAsInitialized(shardId.getShardName());
497     }
498
499     private void markShardAsInitialized(String shardName) {
500         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
501
502         ShardInformation shardInformation = localShards.get(shardName);
503         if (shardInformation != null) {
504             shardInformation.setActorInitialized();
505
506             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
507         }
508     }
509
510     @Override
511     protected void handleRecover(Object message) throws Exception {
512         if (message instanceof RecoveryCompleted) {
513             LOG.info("Recovery complete : {}", persistenceId());
514
515             // We no longer persist SchemaContext modules so delete all the prior messages from the akka
516             // journal on upgrade from Helium.
517             deleteMessages(lastSequenceNr());
518             createLocalShards();
519         } else if (message instanceof SnapshotOffer) {
520             onSnapshotOffer((SnapshotOffer) message);
521         }
522     }
523
524     private void findLocalShard(FindLocalShard message) {
525         final ShardInformation shardInformation = localShards.get(message.getShardName());
526
527         if(shardInformation == null){
528             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
529             return;
530         }
531
532         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
533             @Override
534             public Object get() {
535                 return new LocalShardFound(shardInformation.getActor());
536             }
537         });
538     }
539
540     private void sendResponse(ShardInformation shardInformation, boolean doWait,
541             boolean wantShardReady, final Supplier<Object> messageSupplier) {
542         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
543             if(doWait) {
544                 final ActorRef sender = getSender();
545                 final ActorRef self = self();
546
547                 Runnable replyRunnable = new Runnable() {
548                     @Override
549                     public void run() {
550                         sender.tell(messageSupplier.get(), self);
551                     }
552                 };
553
554                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
555                     new OnShardInitialized(replyRunnable);
556
557                 shardInformation.addOnShardInitialized(onShardInitialized);
558
559                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
560                 if(shardInformation.isShardInitialized()) {
561                     // If the shard is already initialized then we'll wait enough time for the shard to
562                     // elect a leader, ie 2 times the election timeout.
563                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
564                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
565                 }
566
567                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
568                         shardInformation.getShardName());
569
570                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
571                         timeout, getSelf(),
572                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
573                         getContext().dispatcher(), getSelf());
574
575                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
576
577             } else if (!shardInformation.isShardInitialized()) {
578                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
579                         shardInformation.getShardName());
580                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
581             } else {
582                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
583                         shardInformation.getShardName());
584                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
585             }
586
587             return;
588         }
589
590         getSender().tell(messageSupplier.get(), getSelf());
591     }
592
593     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
594         return new NoShardLeaderException(null, shardId.toString());
595     }
596
597     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
598         return new NotInitializedException(String.format(
599                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
600     }
601
602     private void memberRemoved(ClusterEvent.MemberRemoved message) {
603         String memberName = message.member().roles().head();
604
605         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
606                 message.member().address());
607
608         peerAddressResolver.removePeerAddress(memberName);
609
610         for(ShardInformation info : localShards.values()){
611             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
612         }
613     }
614
615     private void memberExited(ClusterEvent.MemberExited message) {
616         String memberName = message.member().roles().head();
617
618         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
619                 message.member().address());
620
621         peerAddressResolver.removePeerAddress(memberName);
622
623         for(ShardInformation info : localShards.values()){
624             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
625         }
626     }
627
628     private void memberUp(ClusterEvent.MemberUp message) {
629         String memberName = message.member().roles().head();
630
631         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
632                 message.member().address());
633
634         addPeerAddress(memberName, message.member().address());
635
636         checkReady();
637     }
638
639     private void addPeerAddress(String memberName, Address address) {
640         peerAddressResolver.addPeerAddress(memberName, address);
641
642         for(ShardInformation info : localShards.values()){
643             String shardName = info.getShardName();
644             String peerId = getShardIdentifier(memberName, shardName).toString();
645             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
646
647             info.peerUp(memberName, peerId, getSelf());
648         }
649     }
650
651     private void memberReachable(ClusterEvent.ReachableMember message) {
652         String memberName = message.member().roles().head();
653         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
654
655         addPeerAddress(memberName, message.member().address());
656
657         markMemberAvailable(memberName);
658     }
659
660     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
661         String memberName = message.member().roles().head();
662         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
663
664         markMemberUnavailable(memberName);
665     }
666
667     private void markMemberUnavailable(final String memberName) {
668         for(ShardInformation info : localShards.values()){
669             String leaderId = info.getLeaderId();
670             if(leaderId != null && leaderId.contains(memberName)) {
671                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
672                 info.setLeaderAvailable(false);
673
674                 primaryShardInfoCache.remove(info.getShardName());
675             }
676
677             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
678         }
679     }
680
681     private void markMemberAvailable(final String memberName) {
682         for(ShardInformation info : localShards.values()){
683             String leaderId = info.getLeaderId();
684             if(leaderId != null && leaderId.contains(memberName)) {
685                 LOG.debug("Marking Leader {} as available.", leaderId);
686                 info.setLeaderAvailable(true);
687             }
688
689             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
690         }
691     }
692
693     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
694         datastoreContextFactory = factory;
695         for (ShardInformation info : localShards.values()) {
696             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
697         }
698     }
699
700     private void onSwitchShardBehavior(SwitchShardBehavior message) {
701         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
702
703         ShardInformation shardInformation = localShards.get(identifier.getShardName());
704
705         if(shardInformation != null && shardInformation.getActor() != null) {
706             shardInformation.getActor().tell(
707                     new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
708         } else {
709             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
710                     message.getShardName(), message.getNewState());
711         }
712     }
713
714     /**
715      * Notifies all the local shards of a change in the schema context
716      *
717      * @param message
718      */
719     private void updateSchemaContext(final Object message) {
720         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
721
722         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
723
724         for (ShardInformation info : localShards.values()) {
725             if (info.getActor() == null) {
726                 LOG.debug("Creating Shard {}", info.getShardId());
727                 info.setActor(newShardActor(schemaContext, info));
728             } else {
729                 info.getActor().tell(message, getSelf());
730             }
731         }
732     }
733
734     @VisibleForTesting
735     protected ClusterWrapper getCluster() {
736         return cluster;
737     }
738
739     @VisibleForTesting
740     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
741         return getContext().actorOf(info.newProps(schemaContext)
742                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
743     }
744
745     private void findPrimary(FindPrimary message) {
746         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
747
748         final String shardName = message.getShardName();
749         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
750
751         // First see if the there is a local replica for the shard
752         final ShardInformation info = localShards.get(shardName);
753         if (info != null && info.isActiveMember()) {
754             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
755                 @Override
756                 public Object get() {
757                     String primaryPath = info.getSerializedLeaderActor();
758                     Object found = canReturnLocalShardState && info.isLeader() ?
759                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
760                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
761
762                             if(LOG.isDebugEnabled()) {
763                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
764                             }
765
766                             return found;
767                 }
768             });
769
770             return;
771         }
772
773         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
774             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
775                     shardName, address);
776
777             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
778                     message.isWaitUntilReady()), getContext());
779             return;
780         }
781
782         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
783
784         getSender().tell(new PrimaryNotFoundException(
785                 String.format("No primary shard found for %s.", shardName)), getSelf());
786     }
787
788     /**
789      * Construct the name of the shard actor given the name of the member on
790      * which the shard resides and the name of the shard
791      *
792      * @param memberName
793      * @param shardName
794      * @return
795      */
796     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
797         return peerAddressResolver.getShardIdentifier(memberName, shardName);
798     }
799
800     /**
801      * Create shards that are local to the member on which the ShardManager
802      * runs
803      *
804      */
805     private void createLocalShards() {
806         String memberName = this.cluster.getCurrentMemberName();
807         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
808
809         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
810         if(restoreFromSnapshot != null)
811         {
812             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
813                 shardSnapshots.put(snapshot.getName(), snapshot);
814             }
815         }
816
817         restoreFromSnapshot = null; // null out to GC
818
819         for(String shardName : memberShardNames){
820             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
821
822             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
823
824             Map<String, String> peerAddresses = getPeerAddresses(shardName);
825             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
826                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
827                         shardSnapshots.get(shardName)), peerAddressResolver));
828             mBean.addLocalShard(shardId.toString());
829         }
830     }
831
832     /**
833      * Given the name of the shard find the addresses of all it's peers
834      *
835      * @param shardName
836      */
837     private Map<String, String> getPeerAddresses(String shardName) {
838         Collection<String> members = configuration.getMembersFromShardName(shardName);
839         Map<String, String> peerAddresses = new HashMap<>();
840
841         String currentMemberName = this.cluster.getCurrentMemberName();
842
843         for(String memberName : members) {
844             if(!currentMemberName.equals(memberName)) {
845                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
846                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
847                 peerAddresses.put(shardId.toString(), address);
848             }
849         }
850         return peerAddresses;
851     }
852
853     @Override
854     public SupervisorStrategy supervisorStrategy() {
855
856         return new OneForOneStrategy(10, Duration.create("1 minute"),
857                 new Function<Throwable, SupervisorStrategy.Directive>() {
858             @Override
859             public SupervisorStrategy.Directive apply(Throwable t) {
860                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
861                 return SupervisorStrategy.resume();
862             }
863         }
864                 );
865
866     }
867
868     @Override
869     public String persistenceId() {
870         return persistenceId;
871     }
872
873     @VisibleForTesting
874     ShardManagerInfoMBean getMBean(){
875         return mBean;
876     }
877
878     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
879         if (shardReplicaOperationsInProgress.contains(shardName)) {
880             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
881             LOG.debug ("{}: {}", persistenceId(), msg);
882             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
883             return true;
884         }
885
886         return false;
887     }
888
889     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
890         final String shardName = shardReplicaMsg.getShardName();
891
892         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
893
894         // verify the shard with the specified name is present in the cluster configuration
895         if (!(this.configuration.isShardConfigured(shardName))) {
896             String msg = String.format("No module configuration exists for shard %s", shardName);
897             LOG.debug ("{}: {}", persistenceId(), msg);
898             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
899             return;
900         }
901
902         // Create the localShard
903         if (schemaContext == null) {
904             String msg = String.format(
905                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
906             LOG.debug ("{}: {}", persistenceId(), msg);
907             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
908             return;
909         }
910
911         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
912             @Override
913             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
914                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
915
916             }
917
918             @Override
919             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
920                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
921             }
922
923         });
924     }
925
926     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
927         String msg = String.format("Local shard %s already exists", shardName);
928         LOG.debug ("{}: {}", persistenceId(), msg);
929         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
930     }
931
932     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
933         if(isShardReplicaOperationInProgress(shardName, sender)) {
934             return;
935         }
936
937         shardReplicaOperationsInProgress.add(shardName);
938
939         final ShardInformation shardInfo;
940         final boolean removeShardOnFailure;
941         ShardInformation existingShardInfo = localShards.get(shardName);
942         if(existingShardInfo == null) {
943             removeShardOnFailure = true;
944             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
945
946             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
947                     DisableElectionsRaftPolicy.class.getName()).build();
948
949             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
950                     Shard.builder(), peerAddressResolver);
951             shardInfo.setActiveMember(false);
952             localShards.put(shardName, shardInfo);
953             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
954         } else {
955             removeShardOnFailure = false;
956             shardInfo = existingShardInfo;
957         }
958
959         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
960
961         //inform ShardLeader to add this shard as a replica by sending an AddServer message
962         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
963                 response.getPrimaryPath(), shardInfo.getShardId());
964
965         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
966                 duration());
967         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
968             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
969
970         futureObj.onComplete(new OnComplete<Object>() {
971             @Override
972             public void onComplete(Throwable failure, Object addServerResponse) {
973                 if (failure != null) {
974                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
975                             response.getPrimaryPath(), shardName, failure);
976
977                     String msg = String.format("AddServer request to leader %s for shard %s failed",
978                             response.getPrimaryPath(), shardName);
979                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
980                 } else {
981                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
982                             response.getPrimaryPath(), removeShardOnFailure), sender);
983                 }
984             }
985         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
986     }
987
988     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
989             boolean removeShardOnFailure) {
990         shardReplicaOperationsInProgress.remove(shardName);
991
992         if(removeShardOnFailure) {
993             ShardInformation shardInfo = localShards.remove(shardName);
994             if (shardInfo.getActor() != null) {
995                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
996             }
997         }
998
999         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1000             new RuntimeException(message, failure)), getSelf());
1001     }
1002
1003     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1004             String leaderPath, boolean removeShardOnFailure) {
1005         String shardName = shardInfo.getShardName();
1006         shardReplicaOperationsInProgress.remove(shardName);
1007
1008         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1009
1010         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1011             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1012
1013             // Make the local shard voting capable
1014             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1015             shardInfo.setActiveMember(true);
1016             persistShardList();
1017
1018             mBean.addLocalShard(shardInfo.getShardId().toString());
1019             sender.tell(new akka.actor.Status.Success(null), getSelf());
1020         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1021             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1022         } else {
1023             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1024                     persistenceId(), shardName, replyMsg.getStatus());
1025
1026             Exception failure;
1027             switch (replyMsg.getStatus()) {
1028                 case TIMEOUT:
1029                     failure = new TimeoutException(String.format(
1030                             "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1031                             "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1032                             leaderPath, shardName));
1033                     break;
1034                 case NO_LEADER:
1035                     failure = createNoShardLeaderException(shardInfo.getShardId());
1036                     break;
1037                 default :
1038                     failure = new RuntimeException(String.format(
1039                             "AddServer request to leader %s for shard %s failed with status %s",
1040                             leaderPath, shardName, replyMsg.getStatus()));
1041             }
1042
1043             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1044         }
1045     }
1046
1047     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
1048         String shardName = shardReplicaMsg.getShardName();
1049
1050         // verify the local shard replica is available in the controller node
1051         if (!localShards.containsKey(shardName)) {
1052             String msg = String.format("Local shard %s does not", shardName);
1053             LOG.debug ("{}: {}", persistenceId(), msg);
1054             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1055             return;
1056         }
1057         // call RemoveShard for the shardName
1058         getSender().tell(new akka.actor.Status.Success(true), getSelf());
1059         return;
1060     }
1061
1062     private void persistShardList() {
1063         List<String> shardList = new ArrayList<>(localShards.keySet());
1064         for (ShardInformation shardInfo : localShards.values()) {
1065             if (!shardInfo.isActiveMember()) {
1066                 shardList.remove(shardInfo.getShardName());
1067             }
1068         }
1069         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1070         saveSnapshot(new ShardManagerSnapshot(shardList));
1071     }
1072
1073     private void onSnapshotOffer(SnapshotOffer offer) {
1074         recoveredSnapshot = (ShardManagerSnapshot)offer.snapshot();
1075
1076         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), recoveredSnapshot);
1077
1078         String currentMember = cluster.getCurrentMemberName();
1079         Set<String> configuredShardList =
1080             new HashSet<>(configuration.getMemberShardNames(currentMember));
1081         for (String shard : recoveredSnapshot.getShardList()) {
1082             if (!configuredShardList.contains(shard)) {
1083                 // add the current member as a replica for the shard
1084                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1085                 configuration.addMemberReplicaForShard(shard, currentMember);
1086             } else {
1087                 configuredShardList.remove(shard);
1088             }
1089         }
1090         for (String shard : configuredShardList) {
1091             // remove the member as a replica for the shard
1092             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1093             configuration.removeMemberReplicaForShard(shard, currentMember);
1094         }
1095     }
1096
1097     private static class ForwardedAddServerReply {
1098         ShardInformation shardInfo;
1099         AddServerReply addServerReply;
1100         String leaderPath;
1101         boolean removeShardOnFailure;
1102
1103         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1104                 boolean removeShardOnFailure) {
1105             this.shardInfo = shardInfo;
1106             this.addServerReply = addServerReply;
1107             this.leaderPath = leaderPath;
1108             this.removeShardOnFailure = removeShardOnFailure;
1109         }
1110     }
1111
1112     private static class ForwardedAddServerFailure {
1113         String shardName;
1114         String failureMessage;
1115         Throwable failure;
1116         boolean removeShardOnFailure;
1117
1118         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1119                 boolean removeShardOnFailure) {
1120             this.shardName = shardName;
1121             this.failureMessage = failureMessage;
1122             this.failure = failure;
1123             this.removeShardOnFailure = removeShardOnFailure;
1124         }
1125     }
1126
1127     @VisibleForTesting
1128     protected static class ShardInformation {
1129         private final ShardIdentifier shardId;
1130         private final String shardName;
1131         private ActorRef actor;
1132         private ActorPath actorPath;
1133         private final Map<String, String> initialPeerAddresses;
1134         private Optional<DataTree> localShardDataTree;
1135         private boolean leaderAvailable = false;
1136
1137         // flag that determines if the actor is ready for business
1138         private boolean actorInitialized = false;
1139
1140         private boolean followerSyncStatus = false;
1141
1142         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1143         private String role ;
1144         private String leaderId;
1145         private short leaderVersion;
1146
1147         private DatastoreContext datastoreContext;
1148         private Shard.AbstractBuilder<?, ?> builder;
1149         private final ShardPeerAddressResolver addressResolver;
1150         private boolean isActiveMember = true;
1151
1152         private ShardInformation(String shardName, ShardIdentifier shardId,
1153                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1154                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1155             this.shardName = shardName;
1156             this.shardId = shardId;
1157             this.initialPeerAddresses = initialPeerAddresses;
1158             this.datastoreContext = datastoreContext;
1159             this.builder = builder;
1160             this.addressResolver = addressResolver;
1161         }
1162
1163         Props newProps(SchemaContext schemaContext) {
1164             Preconditions.checkNotNull(builder);
1165             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1166                     schemaContext(schemaContext).props();
1167             builder = null;
1168             return props;
1169         }
1170
1171         String getShardName() {
1172             return shardName;
1173         }
1174
1175         @Nullable
1176         ActorRef getActor(){
1177             return actor;
1178         }
1179
1180         ActorPath getActorPath() {
1181             return actorPath;
1182         }
1183
1184         void setActor(ActorRef actor) {
1185             this.actor = actor;
1186             this.actorPath = actor.path();
1187         }
1188
1189         ShardIdentifier getShardId() {
1190             return shardId;
1191         }
1192
1193         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1194             this.localShardDataTree = localShardDataTree;
1195         }
1196
1197         Optional<DataTree> getLocalShardDataTree() {
1198             return localShardDataTree;
1199         }
1200
1201         DatastoreContext getDatastoreContext() {
1202             return datastoreContext;
1203         }
1204
1205         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1206             this.datastoreContext = datastoreContext;
1207             if (actor != null) {
1208                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1209                 actor.tell(this.datastoreContext, sender);
1210             }
1211         }
1212
1213         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1214             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1215
1216             if(actor != null) {
1217                 if(LOG.isDebugEnabled()) {
1218                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1219                             peerId, peerAddress, actor.path());
1220                 }
1221
1222                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1223             }
1224
1225             notifyOnShardInitializedCallbacks();
1226         }
1227
1228         void peerDown(String memberName, String peerId, ActorRef sender) {
1229             if(actor != null) {
1230                 actor.tell(new PeerDown(memberName, peerId), sender);
1231             }
1232         }
1233
1234         void peerUp(String memberName, String peerId, ActorRef sender) {
1235             if(actor != null) {
1236                 actor.tell(new PeerUp(memberName, peerId), sender);
1237             }
1238         }
1239
1240         boolean isShardReady() {
1241             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1242         }
1243
1244         boolean isShardReadyWithLeaderId() {
1245             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1246                     (isLeader() || addressResolver.resolve(leaderId) != null);
1247         }
1248
1249         boolean isShardInitialized() {
1250             return getActor() != null && actorInitialized;
1251         }
1252
1253         boolean isLeader() {
1254             return Objects.equal(leaderId, shardId.toString());
1255         }
1256
1257         String getSerializedLeaderActor() {
1258             if(isLeader()) {
1259                 return Serialization.serializedActorPath(getActor());
1260             } else {
1261                 return addressResolver.resolve(leaderId);
1262             }
1263         }
1264
1265         void setActorInitialized() {
1266             LOG.debug("Shard {} is initialized", shardId);
1267
1268             this.actorInitialized = true;
1269
1270             notifyOnShardInitializedCallbacks();
1271         }
1272
1273         private void notifyOnShardInitializedCallbacks() {
1274             if(onShardInitializedSet.isEmpty()) {
1275                 return;
1276             }
1277
1278             boolean ready = isShardReadyWithLeaderId();
1279
1280             if(LOG.isDebugEnabled()) {
1281                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1282                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1283             }
1284
1285             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1286             while(iter.hasNext()) {
1287                 OnShardInitialized onShardInitialized = iter.next();
1288                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1289                     iter.remove();
1290                     onShardInitialized.getTimeoutSchedule().cancel();
1291                     onShardInitialized.getReplyRunnable().run();
1292                 }
1293             }
1294         }
1295
1296         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1297             onShardInitializedSet.add(onShardInitialized);
1298         }
1299
1300         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1301             onShardInitializedSet.remove(onShardInitialized);
1302         }
1303
1304         void setRole(String newRole) {
1305             this.role = newRole;
1306
1307             notifyOnShardInitializedCallbacks();
1308         }
1309
1310         void setFollowerSyncStatus(boolean syncStatus){
1311             this.followerSyncStatus = syncStatus;
1312         }
1313
1314         boolean isInSync(){
1315             if(RaftState.Follower.name().equals(this.role)){
1316                 return followerSyncStatus;
1317             } else if(RaftState.Leader.name().equals(this.role)){
1318                 return true;
1319             }
1320
1321             return false;
1322         }
1323
1324         boolean setLeaderId(String leaderId) {
1325             boolean changed = !Objects.equal(this.leaderId, leaderId);
1326             this.leaderId = leaderId;
1327             if(leaderId != null) {
1328                 this.leaderAvailable = true;
1329             }
1330             notifyOnShardInitializedCallbacks();
1331
1332             return changed;
1333         }
1334
1335         String getLeaderId() {
1336             return leaderId;
1337         }
1338
1339         void setLeaderAvailable(boolean leaderAvailable) {
1340             this.leaderAvailable = leaderAvailable;
1341         }
1342
1343         short getLeaderVersion() {
1344             return leaderVersion;
1345         }
1346
1347         void setLeaderVersion(short leaderVersion) {
1348             this.leaderVersion = leaderVersion;
1349         }
1350
1351         boolean isActiveMember() {
1352             return isActiveMember;
1353         }
1354
1355         void setActiveMember(boolean isActiveMember) {
1356             this.isActiveMember = isActiveMember;
1357         }
1358     }
1359
1360     private static class OnShardInitialized {
1361         private final Runnable replyRunnable;
1362         private Cancellable timeoutSchedule;
1363
1364         OnShardInitialized(Runnable replyRunnable) {
1365             this.replyRunnable = replyRunnable;
1366         }
1367
1368         Runnable getReplyRunnable() {
1369             return replyRunnable;
1370         }
1371
1372         Cancellable getTimeoutSchedule() {
1373             return timeoutSchedule;
1374         }
1375
1376         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1377             this.timeoutSchedule = timeoutSchedule;
1378         }
1379     }
1380
1381     private static class OnShardReady extends OnShardInitialized {
1382         OnShardReady(Runnable replyRunnable) {
1383             super(replyRunnable);
1384         }
1385     }
1386
1387     private static class ShardNotInitializedTimeout {
1388         private final ActorRef sender;
1389         private final ShardInformation shardInfo;
1390         private final OnShardInitialized onShardInitialized;
1391
1392         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1393             this.sender = sender;
1394             this.shardInfo = shardInfo;
1395             this.onShardInitialized = onShardInitialized;
1396         }
1397
1398         ActorRef getSender() {
1399             return sender;
1400         }
1401
1402         ShardInformation getShardInfo() {
1403             return shardInfo;
1404         }
1405
1406         OnShardInitialized getOnShardInitialized() {
1407             return onShardInitialized;
1408         }
1409     }
1410
1411     /**
1412      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1413      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1414      */
1415     @Deprecated
1416     static class SchemaContextModules implements Serializable {
1417         private static final long serialVersionUID = -8884620101025936590L;
1418
1419         private final Set<String> modules;
1420
1421         SchemaContextModules(Set<String> modules){
1422             this.modules = modules;
1423         }
1424
1425         public Set<String> getModules() {
1426             return modules;
1427         }
1428     }
1429
1430     public static Builder builder() {
1431         return new Builder();
1432     }
1433
1434     public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
1435         private ClusterWrapper cluster;
1436         private Configuration configuration;
1437         private DatastoreContextFactory datastoreContextFactory;
1438         private CountDownLatch waitTillReadyCountdownLatch;
1439         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1440         private DatastoreSnapshot restoreFromSnapshot;
1441         private volatile boolean sealed;
1442
1443         @SuppressWarnings("unchecked")
1444         private T self() {
1445             return (T) this;
1446         }
1447
1448         protected void checkSealed() {
1449             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1450         }
1451
1452         public T cluster(ClusterWrapper cluster) {
1453             checkSealed();
1454             this.cluster = cluster;
1455             return self();
1456         }
1457
1458         public T configuration(Configuration configuration) {
1459             checkSealed();
1460             this.configuration = configuration;
1461             return self();
1462         }
1463
1464         public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1465             checkSealed();
1466             this.datastoreContextFactory = datastoreContextFactory;
1467             return self();
1468         }
1469
1470         public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1471             checkSealed();
1472             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1473             return self();
1474         }
1475
1476         public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1477             checkSealed();
1478             this.primaryShardInfoCache = primaryShardInfoCache;
1479             return self();
1480         }
1481
1482         public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1483             checkSealed();
1484             this.restoreFromSnapshot = restoreFromSnapshot;
1485             return self();
1486         }
1487
1488         protected void verify() {
1489             sealed = true;
1490             Preconditions.checkNotNull(cluster, "cluster should not be null");
1491             Preconditions.checkNotNull(configuration, "configuration should not be null");
1492             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1493             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1494             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1495         }
1496
1497         public Props props() {
1498             verify();
1499             return Props.create(ShardManager.class, this);
1500         }
1501     }
1502
1503     public static class Builder extends AbstractBuilder<Builder> {
1504     }
1505
1506     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1507         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1508                 getShardInitializationTimeout().duration().$times(2));
1509
1510
1511         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1512         futureObj.onComplete(new OnComplete<Object>() {
1513             @Override
1514             public void onComplete(Throwable failure, Object response) {
1515                 if (failure != null) {
1516                     handler.onFailure(failure);
1517                 } else {
1518                     if(response instanceof RemotePrimaryShardFound) {
1519                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1520                     } else if(response instanceof LocalPrimaryShardFound) {
1521                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1522                     } else {
1523                         handler.onUnknownResponse(response);
1524                     }
1525                 }
1526             }
1527         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1528     }
1529
1530     /**
1531      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1532      * a remote or local find primary message is processed
1533      */
1534     private static interface FindPrimaryResponseHandler {
1535         /**
1536          * Invoked when a Failure message is received as a response
1537          *
1538          * @param failure
1539          */
1540         void onFailure(Throwable failure);
1541
1542         /**
1543          * Invoked when a RemotePrimaryShardFound response is received
1544          *
1545          * @param response
1546          */
1547         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1548
1549         /**
1550          * Invoked when a LocalPrimaryShardFound response is received
1551          * @param response
1552          */
1553         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1554
1555         /**
1556          * Invoked when an unknown response is received. This is another type of failure.
1557          *
1558          * @param response
1559          */
1560         void onUnknownResponse(Object response);
1561     }
1562
1563     /**
1564      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1565      * replica and sends a wrapped Failure response to some targetActor
1566      */
1567     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1568         private final ActorRef targetActor;
1569         private final String shardName;
1570         private final String persistenceId;
1571         private final ActorRef shardManagerActor;
1572
1573         /**
1574          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1575          * @param shardName The name of the shard for which the primary replica had to be found
1576          * @param persistenceId The persistenceId for the ShardManager
1577          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1578          */
1579         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1580             this.targetActor = Preconditions.checkNotNull(targetActor);
1581             this.shardName = Preconditions.checkNotNull(shardName);
1582             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1583             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1584         }
1585
1586         public ActorRef getTargetActor() {
1587             return targetActor;
1588         }
1589
1590         public String getShardName() {
1591             return shardName;
1592         }
1593
1594         public ActorRef getShardManagerActor() {
1595             return shardManagerActor;
1596         }
1597
1598         @Override
1599         public void onFailure(Throwable failure) {
1600             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1601             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1602                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1603         }
1604
1605         @Override
1606         public void onUnknownResponse(Object response) {
1607             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1608                     shardName, response);
1609             LOG.debug ("{}: {}", persistenceId, msg);
1610             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1611                     new RuntimeException(msg)), shardManagerActor);
1612         }
1613     }
1614
1615
1616     /**
1617      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1618      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1619      * as a successful response to find primary.
1620      */
1621     private static class PrimaryShardFoundForContext {
1622         private final String shardName;
1623         private final Object contextMessage;
1624         private final RemotePrimaryShardFound remotePrimaryShardFound;
1625         private final LocalPrimaryShardFound localPrimaryShardFound;
1626
1627         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage, @Nonnull Object primaryFoundMessage) {
1628             this.shardName = Preconditions.checkNotNull(shardName);
1629             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1630             Preconditions.checkNotNull(primaryFoundMessage);
1631             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ? (RemotePrimaryShardFound) primaryFoundMessage : null;
1632             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ? (LocalPrimaryShardFound) primaryFoundMessage : null;
1633         }
1634
1635         @Nonnull
1636         public String getPrimaryPath(){
1637             if(remotePrimaryShardFound != null){
1638                 return remotePrimaryShardFound.getPrimaryPath();
1639             }
1640             return localPrimaryShardFound.getPrimaryPath();
1641         }
1642
1643         @Nonnull
1644         public Object getContextMessage() {
1645             return contextMessage;
1646         }
1647
1648         @Nullable
1649         public RemotePrimaryShardFound getRemotePrimaryShardFound(){
1650             return remotePrimaryShardFound;
1651         }
1652
1653         @Nullable
1654         public LocalPrimaryShardFound getLocalPrimaryShardFound(){
1655             return localPrimaryShardFound;
1656         }
1657
1658         boolean isPrimaryLocal(){
1659             return (remotePrimaryShardFound == null);
1660         }
1661
1662         @Nonnull
1663         public String getShardName() {
1664             return shardName;
1665         }
1666     }
1667 }
1668
1669
1670