Bug 4823: Notify findPrimary callbacks on ReachableMember event
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorPath;
13 import akka.actor.ActorRef;
14 import akka.actor.Address;
15 import akka.actor.Cancellable;
16 import akka.actor.OneForOneStrategy;
17 import akka.actor.PoisonPill;
18 import akka.actor.Props;
19 import akka.actor.Status;
20 import akka.actor.SupervisorStrategy;
21 import akka.cluster.ClusterEvent;
22 import akka.dispatch.OnComplete;
23 import akka.japi.Function;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotOffer;
28 import akka.persistence.SnapshotSelectionCriteria;
29 import akka.serialization.Serialization;
30 import akka.util.Timeout;
31 import com.google.common.annotations.VisibleForTesting;
32 import com.google.common.base.Objects;
33 import com.google.common.base.Optional;
34 import com.google.common.base.Preconditions;
35 import com.google.common.base.Strings;
36 import com.google.common.base.Supplier;
37 import com.google.common.collect.Sets;
38 import java.io.ByteArrayInputStream;
39 import java.io.ObjectInputStream;
40 import java.io.Serializable;
41 import java.util.ArrayList;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.HashMap;
45 import java.util.HashSet;
46 import java.util.Iterator;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.Set;
50 import java.util.concurrent.CountDownLatch;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import javax.annotation.Nonnull;
54 import javax.annotation.Nullable;
55 import org.apache.commons.lang3.SerializationUtils;
56 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
57 import org.opendaylight.controller.cluster.datastore.config.Configuration;
58 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
59 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
60 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
63 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
64 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
65 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
66 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
67 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
68 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
69 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
70 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
71 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
72 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
73 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
75 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
76 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
77 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
78 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
79 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
80 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
81 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
82 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
83 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
84 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
85 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
86 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
87 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
88 import org.opendaylight.controller.cluster.raft.RaftState;
89 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
90 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
91 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
92 import org.opendaylight.controller.cluster.raft.messages.AddServer;
93 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
94 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
95 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
96 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
97 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
98 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
99 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
100 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
101 import org.slf4j.Logger;
102 import org.slf4j.LoggerFactory;
103 import scala.concurrent.Future;
104 import scala.concurrent.duration.Duration;
105 import scala.concurrent.duration.FiniteDuration;
106
107 /**
108  * The ShardManager has the following jobs,
109  * <ul>
110  * <li> Create all the local shard replicas that belong on this cluster member
111  * <li> Find the address of the local shard
112  * <li> Find the primary replica for any given shard
113  * <li> Monitor the cluster members and store their addresses
 * </ul>
115  */
116 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
117
118     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
119
120     // Stores a mapping between a shard name and it's corresponding information
121     // Shard names look like inventory, topology etc and are as specified in
122     // configuration
123     private final Map<String, ShardInformation> localShards = new HashMap<>();
124
125     // The type of a ShardManager reflects the type of the datastore itself
126     // A data store could be of type config/operational
127     private final String type;
128
129     private final ClusterWrapper cluster;
130
131     private final Configuration configuration;
132
133     private final String shardDispatcherPath;
134
135     private final ShardManagerInfo mBean;
136
137     private DatastoreContextFactory datastoreContextFactory;
138
139     private final CountDownLatch waitTillReadyCountdownLatch;
140
141     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
142
143     private final ShardPeerAddressResolver peerAddressResolver;
144
145     private SchemaContext schemaContext;
146
147     private DatastoreSnapshot restoreFromSnapshot;
148
149     private ShardManagerSnapshot currentSnapshot;
150
151     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
152
153     private final String persistenceId;
154
155     /**
156      */
157     protected ShardManager(AbstractBuilder<?> builder) {
158
159         this.cluster = builder.cluster;
160         this.configuration = builder.configuration;
161         this.datastoreContextFactory = builder.datastoreContextFactory;
162         this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
163         this.shardDispatcherPath =
164                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
165         this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
166         this.primaryShardInfoCache = builder.primaryShardInfoCache;
167         this.restoreFromSnapshot = builder.restoreFromSnapshot;
168
169         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
170         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
171
172         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
173
174         // Subscribe this actor to cluster member events
175         cluster.subscribeToMemberEvents(getSelf());
176
177         List<String> localShardActorNames = new ArrayList<>();
178         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
179                 "shard-manager-" + this.type,
180                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
181                 localShardActorNames);
182         mBean.setShardManager(this);
183     }
184
185     @Override
186     public void postStop() {
187         LOG.info("Stopping ShardManager");
188
189         mBean.unregisterMBean();
190     }
191
192     @Override
193     public void handleCommand(Object message) throws Exception {
194         if (message  instanceof FindPrimary) {
195             findPrimary((FindPrimary)message);
196         } else if(message instanceof FindLocalShard){
197             findLocalShard((FindLocalShard) message);
198         } else if (message instanceof UpdateSchemaContext) {
199             updateSchemaContext(message);
200         } else if(message instanceof ActorInitialized) {
201             onActorInitialized(message);
202         } else if (message instanceof ClusterEvent.MemberUp){
203             memberUp((ClusterEvent.MemberUp) message);
204         } else if (message instanceof ClusterEvent.MemberExited){
205             memberExited((ClusterEvent.MemberExited) message);
206         } else if(message instanceof ClusterEvent.MemberRemoved) {
207             memberRemoved((ClusterEvent.MemberRemoved) message);
208         } else if(message instanceof ClusterEvent.UnreachableMember) {
209             memberUnreachable((ClusterEvent.UnreachableMember)message);
210         } else if(message instanceof ClusterEvent.ReachableMember) {
211             memberReachable((ClusterEvent.ReachableMember) message);
212         } else if(message instanceof DatastoreContextFactory) {
213             onDatastoreContextFactory((DatastoreContextFactory)message);
214         } else if(message instanceof RoleChangeNotification) {
215             onRoleChangeNotification((RoleChangeNotification) message);
216         } else if(message instanceof FollowerInitialSyncUpStatus){
217             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
218         } else if(message instanceof ShardNotInitializedTimeout) {
219             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
220         } else if(message instanceof ShardLeaderStateChanged) {
221             onLeaderStateChanged((ShardLeaderStateChanged) message);
222         } else if(message instanceof SwitchShardBehavior){
223             onSwitchShardBehavior((SwitchShardBehavior) message);
224         } else if(message instanceof CreateShard) {
225             onCreateShard((CreateShard)message);
226         } else if(message instanceof AddShardReplica){
227             onAddShardReplica((AddShardReplica)message);
228         } else if(message instanceof ForwardedAddServerReply) {
229             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
230             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
231                     msg.removeShardOnFailure);
232         } else if(message instanceof ForwardedAddServerFailure) {
233             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
234             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
235         } else if(message instanceof PrimaryShardFoundForContext) {
236             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
237             onPrimaryShardFoundContext(primaryShardFoundContext);
238         } else if(message instanceof RemoveShardReplica) {
239             onRemoveShardReplica((RemoveShardReplica) message);
240         } else if(message instanceof WrappedShardResponse){
241             onWrappedShardResponse((WrappedShardResponse) message);
242         } else if(message instanceof GetSnapshot) {
243             onGetSnapshot();
244         } else if(message instanceof ServerRemoved){
245             onShardReplicaRemoved((ServerRemoved) message);
246         } else if (message instanceof SaveSnapshotSuccess) {
247             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
248         } else if (message instanceof SaveSnapshotFailure) {
249             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
250                     persistenceId(), ((SaveSnapshotFailure) message).cause());
251         } else {
252             unknownMessage(message);
253         }
254     }
255
256     private void onWrappedShardResponse(WrappedShardResponse message) {
257         if (message.getResponse() instanceof RemoveServerReply) {
258             onRemoveServerReply(getSender(), message.getShardName(), (RemoveServerReply) message.getResponse());
259         }
260     }
261
262     private void onRemoveServerReply(ActorRef originalSender, String shardName, RemoveServerReply response) {
263         shardReplicaOperationsInProgress.remove(shardName);
264         originalSender.tell(new Status.Success(null), self());
265     }
266
267     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
268         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
269             addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
270                     getSender());
271         } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
272             removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
273                     primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
274         }
275     }
276
277     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
278             final ActorRef sender) {
279         if(isShardReplicaOperationInProgress(shardName, sender)) {
280             return;
281         }
282
283         shardReplicaOperationsInProgress.add(shardName);
284
285         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
286
287         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
288
289         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
290         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
291                 primaryPath, shardId);
292
293         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
294                 duration());
295         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
296                 new RemoveServer(shardId.toString()), removeServerTimeout);
297
298         futureObj.onComplete(new OnComplete<Object>() {
299             @Override
300             public void onComplete(Throwable failure, Object response) {
301                 if (failure != null) {
302                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
303                             primaryPath, shardName);
304
305                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
306
307                     // FAILURE
308                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
309                 } else {
310                     // SUCCESS
311                     self().tell(new WrappedShardResponse(shardName, response), sender);
312                 }
313             }
314         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
315     }
316
317     private void onShardReplicaRemoved(ServerRemoved message) {
318         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
319         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
320         if(shardInformation == null) {
321             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
322             return;
323         } else if(shardInformation.getActor() != null) {
324             LOG.debug("{} : Sending PoisonPill to Shard actor {}", persistenceId(), shardInformation.getActor());
325             shardInformation.getActor().tell(PoisonPill.getInstance(), self());
326         }
327         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
328         persistShardList();
329     }
330
331     private void onGetSnapshot() {
332         LOG.debug("{}: onGetSnapshot", persistenceId());
333
334         List<String> notInitialized = null;
335         for(ShardInformation shardInfo: localShards.values()) {
336             if(!shardInfo.isShardInitialized()) {
337                 if(notInitialized == null) {
338                     notInitialized = new ArrayList<>();
339                 }
340
341                 notInitialized.add(shardInfo.getShardName());
342             }
343         }
344
345         if(notInitialized != null) {
346             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
347                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
348             return;
349         }
350
351         byte[] shardManagerSnapshot = null;
352         if(currentSnapshot != null) {
353             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
354         }
355
356         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
357                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
358                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
359
360         for(ShardInformation shardInfo: localShards.values()) {
361             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
362         }
363     }
364
365     private void onCreateShard(CreateShard createShard) {
366         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
367
368         Object reply;
369         try {
370             String shardName = createShard.getModuleShardConfig().getShardName();
371             if(localShards.containsKey(shardName)) {
372                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
373                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
374             } else {
375                 doCreateShard(createShard);
376                 reply = new akka.actor.Status.Success(null);
377             }
378         } catch (Exception e) {
379             LOG.error("{}: onCreateShard failed", persistenceId(), e);
380             reply = new akka.actor.Status.Failure(e);
381         }
382
383         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
384             getSender().tell(reply, getSelf());
385         }
386     }
387
388     private void doCreateShard(CreateShard createShard) {
389         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
390         String shardName = moduleShardConfig.getShardName();
391
392         configuration.addModuleShardConfiguration(moduleShardConfig);
393
394         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
395         if(shardDatastoreContext == null) {
396             shardDatastoreContext = newShardDatastoreContext(shardName);
397         } else {
398             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
399                     peerAddressResolver).build();
400         }
401
402         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
403
404         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
405                 currentSnapshot.getShardList().contains(shardName);
406
407         Map<String, String> peerAddresses;
408         boolean isActiveMember;
409         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
410                 contains(cluster.getCurrentMemberName())) {
411             peerAddresses = getPeerAddresses(shardName);
412             isActiveMember = true;
413         } else {
414             // The local member is not in the static shard member configuration and the shard did not
415             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
416             // the shard with no peers and with elections disabled so it stays as follower. A
417             // subsequent AddServer request will be needed to make it an active member.
418             isActiveMember = false;
419             peerAddresses = Collections.emptyMap();
420             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
421                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
422         }
423
424         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
425                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
426                 isActiveMember);
427
428         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
429                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
430         info.setActiveMember(isActiveMember);
431         localShards.put(info.getShardName(), info);
432
433         mBean.addLocalShard(shardId.toString());
434
435         if(schemaContext != null) {
436             info.setActor(newShardActor(schemaContext, info));
437         }
438     }
439
440     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
441         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
442                 shardPeerAddressResolver(peerAddressResolver);
443     }
444
445     private DatastoreContext newShardDatastoreContext(String shardName) {
446         return newShardDatastoreContextBuilder(shardName).build();
447     }
448
449     private void checkReady(){
450         if (isReadyWithLeaderId()) {
451             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
452                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
453
454             waitTillReadyCountdownLatch.countDown();
455         }
456     }
457
458     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
459         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
460
461         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
462         if(shardInformation != null) {
463             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
464             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
465             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
466                 primaryShardInfoCache.remove(shardInformation.getShardName());
467             }
468
469             checkReady();
470         } else {
471             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
472         }
473     }
474
475     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
476         ShardInformation shardInfo = message.getShardInfo();
477
478         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
479                 shardInfo.getShardName());
480
481         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
482
483         if(!shardInfo.isShardInitialized()) {
484             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
485             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
486         } else {
487             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
488             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
489         }
490     }
491
492     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
493         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
494                 status.getName(), status.isInitialSyncDone());
495
496         ShardInformation shardInformation = findShardInformation(status.getName());
497
498         if(shardInformation != null) {
499             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
500
501             mBean.setSyncStatus(isInSync());
502         }
503
504     }
505
506     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
507         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
508                 roleChanged.getOldRole(), roleChanged.getNewRole());
509
510         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
511         if(shardInformation != null) {
512             shardInformation.setRole(roleChanged.getNewRole());
513             checkReady();
514             mBean.setSyncStatus(isInSync());
515         }
516     }
517
518
519     private ShardInformation findShardInformation(String memberId) {
520         for(ShardInformation info : localShards.values()){
521             if(info.getShardId().toString().equals(memberId)){
522                 return info;
523             }
524         }
525
526         return null;
527     }
528
529     private boolean isReadyWithLeaderId() {
530         boolean isReady = true;
531         for (ShardInformation info : localShards.values()) {
532             if(!info.isShardReadyWithLeaderId()){
533                 isReady = false;
534                 break;
535             }
536         }
537         return isReady;
538     }
539
540     private boolean isInSync(){
541         for (ShardInformation info : localShards.values()) {
542             if(!info.isInSync()){
543                 return false;
544             }
545         }
546         return true;
547     }
548
549     private void onActorInitialized(Object message) {
550         final ActorRef sender = getSender();
551
552         if (sender == null) {
553             return; //why is a non-actor sending this message? Just ignore.
554         }
555
556         String actorName = sender.path().name();
557         //find shard name from actor name; actor name is stringified shardId
558         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
559
560         if (shardId.getShardName() == null) {
561             return;
562         }
563
564         markShardAsInitialized(shardId.getShardName());
565     }
566
567     private void markShardAsInitialized(String shardName) {
568         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
569
570         ShardInformation shardInformation = localShards.get(shardName);
571         if (shardInformation != null) {
572             shardInformation.setActorInitialized();
573
574             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
575         }
576     }
577
578     @Override
579     protected void handleRecover(Object message) throws Exception {
580         if (message instanceof RecoveryCompleted) {
581             onRecoveryCompleted();
582         } else if (message instanceof SnapshotOffer) {
583             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
584         }
585     }
586
587     private void onRecoveryCompleted() {
588         LOG.info("Recovery complete : {}", persistenceId());
589
590         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
591         // journal on upgrade from Helium.
592         deleteMessages(lastSequenceNr());
593
594         if(currentSnapshot == null && restoreFromSnapshot != null &&
595                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
596             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
597                     restoreFromSnapshot.getShardManagerSnapshot()))) {
598                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
599
600                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
601
602                 applyShardManagerSnapshot(snapshot);
603             } catch(Exception e) {
604                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
605             }
606         }
607
608         createLocalShards();
609     }
610
611     private void findLocalShard(FindLocalShard message) {
612         final ShardInformation shardInformation = localShards.get(message.getShardName());
613
614         if(shardInformation == null){
615             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
616             return;
617         }
618
619         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
620             @Override
621             public Object get() {
622                 return new LocalShardFound(shardInformation.getActor());
623             }
624         });
625     }
626
627     private void sendResponse(ShardInformation shardInformation, boolean doWait,
628             boolean wantShardReady, final Supplier<Object> messageSupplier) {
629         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
630             if(doWait) {
631                 final ActorRef sender = getSender();
632                 final ActorRef self = self();
633
634                 Runnable replyRunnable = new Runnable() {
635                     @Override
636                     public void run() {
637                         sender.tell(messageSupplier.get(), self);
638                     }
639                 };
640
641                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
642                     new OnShardInitialized(replyRunnable);
643
644                 shardInformation.addOnShardInitialized(onShardInitialized);
645
646                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
647                 if(shardInformation.isShardInitialized()) {
648                     // If the shard is already initialized then we'll wait enough time for the shard to
649                     // elect a leader, ie 2 times the election timeout.
650                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
651                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
652                 }
653
654                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
655                         shardInformation.getShardName());
656
657                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
658                         timeout, getSelf(),
659                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
660                         getContext().dispatcher(), getSelf());
661
662                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
663
664             } else if (!shardInformation.isShardInitialized()) {
665                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
666                         shardInformation.getShardName());
667                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
668             } else {
669                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
670                         shardInformation.getShardName());
671                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
672             }
673
674             return;
675         }
676
677         getSender().tell(messageSupplier.get(), getSelf());
678     }
679
680     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
681         return new NoShardLeaderException(null, shardId.toString());
682     }
683
684     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
685         return new NotInitializedException(String.format(
686                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
687     }
688
689     private void memberRemoved(ClusterEvent.MemberRemoved message) {
690         String memberName = message.member().roles().head();
691
692         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
693                 message.member().address());
694
695         peerAddressResolver.removePeerAddress(memberName);
696
697         for(ShardInformation info : localShards.values()){
698             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
699         }
700     }
701
702     private void memberExited(ClusterEvent.MemberExited message) {
703         String memberName = message.member().roles().head();
704
705         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
706                 message.member().address());
707
708         peerAddressResolver.removePeerAddress(memberName);
709
710         for(ShardInformation info : localShards.values()){
711             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
712         }
713     }
714
715     private void memberUp(ClusterEvent.MemberUp message) {
716         String memberName = message.member().roles().head();
717
718         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
719                 message.member().address());
720
721         addPeerAddress(memberName, message.member().address());
722
723         checkReady();
724     }
725
726     private void addPeerAddress(String memberName, Address address) {
727         peerAddressResolver.addPeerAddress(memberName, address);
728
729         for(ShardInformation info : localShards.values()){
730             String shardName = info.getShardName();
731             String peerId = getShardIdentifier(memberName, shardName).toString();
732             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
733
734             info.peerUp(memberName, peerId, getSelf());
735         }
736     }
737
738     private void memberReachable(ClusterEvent.ReachableMember message) {
739         String memberName = message.member().roles().head();
740         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
741
742         addPeerAddress(memberName, message.member().address());
743
744         markMemberAvailable(memberName);
745     }
746
747     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
748         String memberName = message.member().roles().head();
749         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
750
751         markMemberUnavailable(memberName);
752     }
753
754     private void markMemberUnavailable(final String memberName) {
755         for(ShardInformation info : localShards.values()){
756             String leaderId = info.getLeaderId();
757             if(leaderId != null && leaderId.contains(memberName)) {
758                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
759                 info.setLeaderAvailable(false);
760
761                 primaryShardInfoCache.remove(info.getShardName());
762             }
763
764             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
765         }
766     }
767
768     private void markMemberAvailable(final String memberName) {
769         for(ShardInformation info : localShards.values()){
770             String leaderId = info.getLeaderId();
771             if(leaderId != null && leaderId.contains(memberName)) {
772                 LOG.debug("Marking Leader {} as available.", leaderId);
773                 info.setLeaderAvailable(true);
774             }
775
776             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
777         }
778     }
779
780     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
781         datastoreContextFactory = factory;
782         for (ShardInformation info : localShards.values()) {
783             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
784         }
785     }
786
787     private void onSwitchShardBehavior(SwitchShardBehavior message) {
788         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
789
790         ShardInformation shardInformation = localShards.get(identifier.getShardName());
791
792         if(shardInformation != null && shardInformation.getActor() != null) {
793             shardInformation.getActor().tell(
794                     new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
795         } else {
796             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
797                     message.getShardName(), message.getNewState());
798         }
799     }
800
801     /**
802      * Notifies all the local shards of a change in the schema context
803      *
804      * @param message
805      */
806     private void updateSchemaContext(final Object message) {
807         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
808
809         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
810
811         for (ShardInformation info : localShards.values()) {
812             if (info.getActor() == null) {
813                 LOG.debug("Creating Shard {}", info.getShardId());
814                 info.setActor(newShardActor(schemaContext, info));
815             } else {
816                 info.getActor().tell(message, getSelf());
817             }
818         }
819     }
820
821     @VisibleForTesting
822     protected ClusterWrapper getCluster() {
823         return cluster;
824     }
825
826     @VisibleForTesting
827     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
828         return getContext().actorOf(info.newProps(schemaContext)
829                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
830     }
831
832     private void findPrimary(FindPrimary message) {
833         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
834
835         final String shardName = message.getShardName();
836         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
837
838         // First see if the there is a local replica for the shard
839         final ShardInformation info = localShards.get(shardName);
840         if (info != null && info.isActiveMember()) {
841             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
842                 @Override
843                 public Object get() {
844                     String primaryPath = info.getSerializedLeaderActor();
845                     Object found = canReturnLocalShardState && info.isLeader() ?
846                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
847                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
848
849                             if(LOG.isDebugEnabled()) {
850                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
851                             }
852
853                             return found;
854                 }
855             });
856
857             return;
858         }
859
860         Collection<String> visitedAddresses;
861         if(message instanceof RemoteFindPrimary) {
862             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
863         } else {
864             visitedAddresses = new ArrayList<>();
865         }
866
867         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
868
869         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
870             if(visitedAddresses.contains(address)) {
871                 continue;
872             }
873
874             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
875                     shardName, address);
876
877             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
878                     message.isWaitUntilReady(), visitedAddresses), getContext());
879             return;
880         }
881
882         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
883
884         getSender().tell(new PrimaryNotFoundException(
885                 String.format("No primary shard found for %s.", shardName)), getSelf());
886     }
887
888     /**
889      * Construct the name of the shard actor given the name of the member on
890      * which the shard resides and the name of the shard
891      *
892      * @param memberName
893      * @param shardName
894      * @return
895      */
896     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
897         return peerAddressResolver.getShardIdentifier(memberName, shardName);
898     }
899
900     /**
901      * Create shards that are local to the member on which the ShardManager
902      * runs
903      *
904      */
905     private void createLocalShards() {
906         String memberName = this.cluster.getCurrentMemberName();
907         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
908
909         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
910         if(restoreFromSnapshot != null)
911         {
912             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
913                 shardSnapshots.put(snapshot.getName(), snapshot);
914             }
915         }
916
917         restoreFromSnapshot = null; // null out to GC
918
919         for(String shardName : memberShardNames){
920             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
921
922             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
923
924             Map<String, String> peerAddresses = getPeerAddresses(shardName);
925             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
926                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
927                         shardSnapshots.get(shardName)), peerAddressResolver));
928             mBean.addLocalShard(shardId.toString());
929         }
930     }
931
932     /**
933      * Given the name of the shard find the addresses of all it's peers
934      *
935      * @param shardName
936      */
937     private Map<String, String> getPeerAddresses(String shardName) {
938         Collection<String> members = configuration.getMembersFromShardName(shardName);
939         Map<String, String> peerAddresses = new HashMap<>();
940
941         String currentMemberName = this.cluster.getCurrentMemberName();
942
943         for(String memberName : members) {
944             if(!currentMemberName.equals(memberName)) {
945                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
946                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
947                 peerAddresses.put(shardId.toString(), address);
948             }
949         }
950         return peerAddresses;
951     }
952
953     @Override
954     public SupervisorStrategy supervisorStrategy() {
955
956         return new OneForOneStrategy(10, Duration.create("1 minute"),
957                 new Function<Throwable, SupervisorStrategy.Directive>() {
958             @Override
959             public SupervisorStrategy.Directive apply(Throwable t) {
960                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
961                 return SupervisorStrategy.resume();
962             }
963         }
964                 );
965
966     }
967
968     @Override
969     public String persistenceId() {
970         return persistenceId;
971     }
972
973     @VisibleForTesting
974     ShardManagerInfoMBean getMBean(){
975         return mBean;
976     }
977
978     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
979         if (shardReplicaOperationsInProgress.contains(shardName)) {
980             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
981             LOG.debug ("{}: {}", persistenceId(), msg);
982             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
983             return true;
984         }
985
986         return false;
987     }
988
989     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
990         final String shardName = shardReplicaMsg.getShardName();
991
992         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
993
994         // verify the shard with the specified name is present in the cluster configuration
995         if (!(this.configuration.isShardConfigured(shardName))) {
996             String msg = String.format("No module configuration exists for shard %s", shardName);
997             LOG.debug ("{}: {}", persistenceId(), msg);
998             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
999             return;
1000         }
1001
1002         // Create the localShard
1003         if (schemaContext == null) {
1004             String msg = String.format(
1005                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1006             LOG.debug ("{}: {}", persistenceId(), msg);
1007             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1008             return;
1009         }
1010
1011         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1012             @Override
1013             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1014                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1015             }
1016
1017             @Override
1018             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1019                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1020             }
1021
1022         });
1023     }
1024
1025     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1026         String msg = String.format("Local shard %s already exists", shardName);
1027         LOG.debug ("{}: {}", persistenceId(), msg);
1028         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
1029     }
1030
1031     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1032         if(isShardReplicaOperationInProgress(shardName, sender)) {
1033             return;
1034         }
1035
1036         shardReplicaOperationsInProgress.add(shardName);
1037
1038         final ShardInformation shardInfo;
1039         final boolean removeShardOnFailure;
1040         ShardInformation existingShardInfo = localShards.get(shardName);
1041         if(existingShardInfo == null) {
1042             removeShardOnFailure = true;
1043             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1044
1045             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
1046                     DisableElectionsRaftPolicy.class.getName()).build();
1047
1048             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1049                     Shard.builder(), peerAddressResolver);
1050             shardInfo.setActiveMember(false);
1051             localShards.put(shardName, shardInfo);
1052             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1053         } else {
1054             removeShardOnFailure = false;
1055             shardInfo = existingShardInfo;
1056         }
1057
1058         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1059
1060         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1061         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1062                 response.getPrimaryPath(), shardInfo.getShardId());
1063
1064         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1065                 duration());
1066         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1067             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1068
1069         futureObj.onComplete(new OnComplete<Object>() {
1070             @Override
1071             public void onComplete(Throwable failure, Object addServerResponse) {
1072                 if (failure != null) {
1073                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1074                             response.getPrimaryPath(), shardName, failure);
1075
1076                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1077                             response.getPrimaryPath(), shardName);
1078                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1079                 } else {
1080                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1081                             response.getPrimaryPath(), removeShardOnFailure), sender);
1082                 }
1083             }
1084         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1085     }
1086
1087     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1088             boolean removeShardOnFailure) {
1089         shardReplicaOperationsInProgress.remove(shardName);
1090
1091         if(removeShardOnFailure) {
1092             ShardInformation shardInfo = localShards.remove(shardName);
1093             if (shardInfo.getActor() != null) {
1094                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1095             }
1096         }
1097
1098         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1099             new RuntimeException(message, failure)), getSelf());
1100     }
1101
1102     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1103             String leaderPath, boolean removeShardOnFailure) {
1104         String shardName = shardInfo.getShardName();
1105         shardReplicaOperationsInProgress.remove(shardName);
1106
1107         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1108
1109         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1110             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1111
1112             // Make the local shard voting capable
1113             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1114             shardInfo.setActiveMember(true);
1115             persistShardList();
1116
1117             mBean.addLocalShard(shardInfo.getShardId().toString());
1118             sender.tell(new akka.actor.Status.Success(null), getSelf());
1119         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1120             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1121         } else {
1122             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1123                     persistenceId(), shardName, replyMsg.getStatus());
1124
1125             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1126
1127             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1128         }
1129     }
1130
1131     private Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1132                                                String leaderPath, ShardIdentifier shardId) {
1133         Exception failure;
1134         switch (serverChangeStatus) {
1135             case TIMEOUT:
1136                 failure = new TimeoutException(String.format(
1137                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1138                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1139                         leaderPath, shardId.getShardName()));
1140                 break;
1141             case NO_LEADER:
1142                 failure = createNoShardLeaderException(shardId);
1143                 break;
1144             case NOT_SUPPORTED:
1145                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1146                         serverChange.getSimpleName(), shardId.getShardName()));
1147                 break;
1148             default :
1149                 failure = new RuntimeException(String.format(
1150                         "%s request to leader %s for shard %s failed with status %s",
1151                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1152         }
1153         return failure;
1154     }
1155
1156     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1157         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1158
1159         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1160                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1161             @Override
1162             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1163                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1164             }
1165
1166             @Override
1167             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1168                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1169             }
1170         });
1171     }
1172
1173     private void persistShardList() {
1174         List<String> shardList = new ArrayList<>(localShards.keySet());
1175         for (ShardInformation shardInfo : localShards.values()) {
1176             if (!shardInfo.isActiveMember()) {
1177                 shardList.remove(shardInfo.getShardName());
1178             }
1179         }
1180         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1181         saveSnapshot(updateShardManagerSnapshot(shardList));
1182     }
1183
1184     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1185         currentSnapshot = new ShardManagerSnapshot(shardList);
1186         return currentSnapshot;
1187     }
1188
1189     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1190         currentSnapshot = snapshot;
1191
1192         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1193
1194         String currentMember = cluster.getCurrentMemberName();
1195         Set<String> configuredShardList =
1196             new HashSet<>(configuration.getMemberShardNames(currentMember));
1197         for (String shard : currentSnapshot.getShardList()) {
1198             if (!configuredShardList.contains(shard)) {
1199                 // add the current member as a replica for the shard
1200                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1201                 configuration.addMemberReplicaForShard(shard, currentMember);
1202             } else {
1203                 configuredShardList.remove(shard);
1204             }
1205         }
1206         for (String shard : configuredShardList) {
1207             // remove the member as a replica for the shard
1208             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1209             configuration.removeMemberReplicaForShard(shard, currentMember);
1210         }
1211     }
1212
1213     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1214         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1215             persistenceId());
1216         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
1217     }
1218
1219     private static class ForwardedAddServerReply {
1220         ShardInformation shardInfo;
1221         AddServerReply addServerReply;
1222         String leaderPath;
1223         boolean removeShardOnFailure;
1224
1225         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1226                 boolean removeShardOnFailure) {
1227             this.shardInfo = shardInfo;
1228             this.addServerReply = addServerReply;
1229             this.leaderPath = leaderPath;
1230             this.removeShardOnFailure = removeShardOnFailure;
1231         }
1232     }
1233
1234     private static class ForwardedAddServerFailure {
1235         String shardName;
1236         String failureMessage;
1237         Throwable failure;
1238         boolean removeShardOnFailure;
1239
1240         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1241                 boolean removeShardOnFailure) {
1242             this.shardName = shardName;
1243             this.failureMessage = failureMessage;
1244             this.failure = failure;
1245             this.removeShardOnFailure = removeShardOnFailure;
1246         }
1247     }
1248
1249     @VisibleForTesting
1250     protected static class ShardInformation {
1251         private final ShardIdentifier shardId;
1252         private final String shardName;
1253         private ActorRef actor;
1254         private ActorPath actorPath;
1255         private final Map<String, String> initialPeerAddresses;
1256         private Optional<DataTree> localShardDataTree;
1257         private boolean leaderAvailable = false;
1258
1259         // flag that determines if the actor is ready for business
1260         private boolean actorInitialized = false;
1261
1262         private boolean followerSyncStatus = false;
1263
1264         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1265         private String role ;
1266         private String leaderId;
1267         private short leaderVersion;
1268
1269         private DatastoreContext datastoreContext;
1270         private Shard.AbstractBuilder<?, ?> builder;
1271         private final ShardPeerAddressResolver addressResolver;
1272         private boolean isActiveMember = true;
1273
1274         private ShardInformation(String shardName, ShardIdentifier shardId,
1275                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1276                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1277             this.shardName = shardName;
1278             this.shardId = shardId;
1279             this.initialPeerAddresses = initialPeerAddresses;
1280             this.datastoreContext = datastoreContext;
1281             this.builder = builder;
1282             this.addressResolver = addressResolver;
1283         }
1284
1285         Props newProps(SchemaContext schemaContext) {
1286             Preconditions.checkNotNull(builder);
1287             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1288                     schemaContext(schemaContext).props();
1289             builder = null;
1290             return props;
1291         }
1292
1293         String getShardName() {
1294             return shardName;
1295         }
1296
1297         @Nullable
1298         ActorRef getActor(){
1299             return actor;
1300         }
1301
1302         ActorPath getActorPath() {
1303             return actorPath;
1304         }
1305
1306         void setActor(ActorRef actor) {
1307             this.actor = actor;
1308             this.actorPath = actor.path();
1309         }
1310
1311         ShardIdentifier getShardId() {
1312             return shardId;
1313         }
1314
1315         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1316             this.localShardDataTree = localShardDataTree;
1317         }
1318
1319         Optional<DataTree> getLocalShardDataTree() {
1320             return localShardDataTree;
1321         }
1322
1323         DatastoreContext getDatastoreContext() {
1324             return datastoreContext;
1325         }
1326
1327         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1328             this.datastoreContext = datastoreContext;
1329             if (actor != null) {
1330                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1331                 actor.tell(this.datastoreContext, sender);
1332             }
1333         }
1334
1335         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1336             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1337
1338             if(actor != null) {
1339                 if(LOG.isDebugEnabled()) {
1340                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1341                             peerId, peerAddress, actor.path());
1342                 }
1343
1344                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1345             }
1346
1347             notifyOnShardInitializedCallbacks();
1348         }
1349
1350         void peerDown(String memberName, String peerId, ActorRef sender) {
1351             if(actor != null) {
1352                 actor.tell(new PeerDown(memberName, peerId), sender);
1353             }
1354         }
1355
1356         void peerUp(String memberName, String peerId, ActorRef sender) {
1357             if(actor != null) {
1358                 actor.tell(new PeerUp(memberName, peerId), sender);
1359             }
1360         }
1361
1362         boolean isShardReady() {
1363             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1364         }
1365
1366         boolean isShardReadyWithLeaderId() {
1367             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1368                     (isLeader() || addressResolver.resolve(leaderId) != null);
1369         }
1370
1371         boolean isShardInitialized() {
1372             return getActor() != null && actorInitialized;
1373         }
1374
1375         boolean isLeader() {
1376             return Objects.equal(leaderId, shardId.toString());
1377         }
1378
1379         String getSerializedLeaderActor() {
1380             if(isLeader()) {
1381                 return Serialization.serializedActorPath(getActor());
1382             } else {
1383                 return addressResolver.resolve(leaderId);
1384             }
1385         }
1386
1387         void setActorInitialized() {
1388             LOG.debug("Shard {} is initialized", shardId);
1389
1390             this.actorInitialized = true;
1391
1392             notifyOnShardInitializedCallbacks();
1393         }
1394
1395         private void notifyOnShardInitializedCallbacks() {
1396             if(onShardInitializedSet.isEmpty()) {
1397                 return;
1398             }
1399
1400             boolean ready = isShardReadyWithLeaderId();
1401
1402             if(LOG.isDebugEnabled()) {
1403                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1404                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1405             }
1406
1407             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1408             while(iter.hasNext()) {
1409                 OnShardInitialized onShardInitialized = iter.next();
1410                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1411                     iter.remove();
1412                     onShardInitialized.getTimeoutSchedule().cancel();
1413                     onShardInitialized.getReplyRunnable().run();
1414                 }
1415             }
1416         }
1417
1418         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1419             onShardInitializedSet.add(onShardInitialized);
1420         }
1421
1422         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1423             onShardInitializedSet.remove(onShardInitialized);
1424         }
1425
1426         void setRole(String newRole) {
1427             this.role = newRole;
1428
1429             notifyOnShardInitializedCallbacks();
1430         }
1431
1432         void setFollowerSyncStatus(boolean syncStatus){
1433             this.followerSyncStatus = syncStatus;
1434         }
1435
1436         boolean isInSync(){
1437             if(RaftState.Follower.name().equals(this.role)){
1438                 return followerSyncStatus;
1439             } else if(RaftState.Leader.name().equals(this.role)){
1440                 return true;
1441             }
1442
1443             return false;
1444         }
1445
1446         boolean setLeaderId(String leaderId) {
1447             boolean changed = !Objects.equal(this.leaderId, leaderId);
1448             this.leaderId = leaderId;
1449             if(leaderId != null) {
1450                 this.leaderAvailable = true;
1451             }
1452             notifyOnShardInitializedCallbacks();
1453
1454             return changed;
1455         }
1456
1457         String getLeaderId() {
1458             return leaderId;
1459         }
1460
1461         void setLeaderAvailable(boolean leaderAvailable) {
1462             this.leaderAvailable = leaderAvailable;
1463
1464             if(leaderAvailable) {
1465                 notifyOnShardInitializedCallbacks();
1466             }
1467         }
1468
1469         short getLeaderVersion() {
1470             return leaderVersion;
1471         }
1472
1473         void setLeaderVersion(short leaderVersion) {
1474             this.leaderVersion = leaderVersion;
1475         }
1476
1477         boolean isActiveMember() {
1478             return isActiveMember;
1479         }
1480
1481         void setActiveMember(boolean isActiveMember) {
1482             this.isActiveMember = isActiveMember;
1483         }
1484     }
1485
1486     private static class OnShardInitialized {
1487         private final Runnable replyRunnable;
1488         private Cancellable timeoutSchedule;
1489
1490         OnShardInitialized(Runnable replyRunnable) {
1491             this.replyRunnable = replyRunnable;
1492         }
1493
1494         Runnable getReplyRunnable() {
1495             return replyRunnable;
1496         }
1497
1498         Cancellable getTimeoutSchedule() {
1499             return timeoutSchedule;
1500         }
1501
1502         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1503             this.timeoutSchedule = timeoutSchedule;
1504         }
1505     }
1506
1507     private static class OnShardReady extends OnShardInitialized {
1508         OnShardReady(Runnable replyRunnable) {
1509             super(replyRunnable);
1510         }
1511     }
1512
1513     private static class ShardNotInitializedTimeout {
1514         private final ActorRef sender;
1515         private final ShardInformation shardInfo;
1516         private final OnShardInitialized onShardInitialized;
1517
1518         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1519             this.sender = sender;
1520             this.shardInfo = shardInfo;
1521             this.onShardInitialized = onShardInitialized;
1522         }
1523
1524         ActorRef getSender() {
1525             return sender;
1526         }
1527
1528         ShardInformation getShardInfo() {
1529             return shardInfo;
1530         }
1531
1532         OnShardInitialized getOnShardInitialized() {
1533             return onShardInitialized;
1534         }
1535     }
1536
1537     /**
1538      * We no longer persist SchemaContextModules but keep this class around for now for backwards
1539      * compatibility so we don't get de-serialization failures on upgrade from Helium.
1540      */
1541     @Deprecated
1542     static class SchemaContextModules implements Serializable {
1543         private static final long serialVersionUID = -8884620101025936590L;
1544
1545         private final Set<String> modules;
1546
1547         SchemaContextModules(Set<String> modules){
1548             this.modules = modules;
1549         }
1550
1551         public Set<String> getModules() {
1552             return modules;
1553         }
1554     }
1555
1556     public static Builder builder() {
1557         return new Builder();
1558     }
1559
1560     public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
1561         private ClusterWrapper cluster;
1562         private Configuration configuration;
1563         private DatastoreContextFactory datastoreContextFactory;
1564         private CountDownLatch waitTillReadyCountdownLatch;
1565         private PrimaryShardInfoFutureCache primaryShardInfoCache;
1566         private DatastoreSnapshot restoreFromSnapshot;
1567
1568         private volatile boolean sealed;
1569
1570         @SuppressWarnings("unchecked")
1571         private T self() {
1572             return (T) this;
1573         }
1574
1575         protected void checkSealed() {
1576             Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
1577         }
1578
1579         public T cluster(ClusterWrapper cluster) {
1580             checkSealed();
1581             this.cluster = cluster;
1582             return self();
1583         }
1584
1585         public T configuration(Configuration configuration) {
1586             checkSealed();
1587             this.configuration = configuration;
1588             return self();
1589         }
1590
1591         public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
1592             checkSealed();
1593             this.datastoreContextFactory = datastoreContextFactory;
1594             return self();
1595         }
1596
1597         public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
1598             checkSealed();
1599             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
1600             return self();
1601         }
1602
1603         public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
1604             checkSealed();
1605             this.primaryShardInfoCache = primaryShardInfoCache;
1606             return self();
1607         }
1608
1609         public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
1610             checkSealed();
1611             this.restoreFromSnapshot = restoreFromSnapshot;
1612             return self();
1613         }
1614
1615         protected void verify() {
1616             sealed = true;
1617             Preconditions.checkNotNull(cluster, "cluster should not be null");
1618             Preconditions.checkNotNull(configuration, "configuration should not be null");
1619             Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
1620             Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
1621             Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
1622         }
1623
1624         public Props props() {
1625             verify();
1626             return Props.create(ShardManager.class, this);
1627         }
1628     }
1629
1630     public static class Builder extends AbstractBuilder<Builder> {
1631     }
1632
1633     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1634         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1635                 getShardInitializationTimeout().duration().$times(2));
1636
1637
1638         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1639         futureObj.onComplete(new OnComplete<Object>() {
1640             @Override
1641             public void onComplete(Throwable failure, Object response) {
1642                 if (failure != null) {
1643                     handler.onFailure(failure);
1644                 } else {
1645                     if(response instanceof RemotePrimaryShardFound) {
1646                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1647                     } else if(response instanceof LocalPrimaryShardFound) {
1648                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1649                     } else {
1650                         handler.onUnknownResponse(response);
1651                     }
1652                 }
1653             }
1654         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1655     }
1656
1657     /**
1658      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1659      * a remote or local find primary message is processed
1660      */
1661     private static interface FindPrimaryResponseHandler {
1662         /**
1663          * Invoked when a Failure message is received as a response
1664          *
1665          * @param failure
1666          */
1667         void onFailure(Throwable failure);
1668
1669         /**
1670          * Invoked when a RemotePrimaryShardFound response is received
1671          *
1672          * @param response
1673          */
1674         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1675
1676         /**
1677          * Invoked when a LocalPrimaryShardFound response is received
1678          * @param response
1679          */
1680         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1681
1682         /**
1683          * Invoked when an unknown response is received. This is another type of failure.
1684          *
1685          * @param response
1686          */
1687         void onUnknownResponse(Object response);
1688     }
1689
1690     /**
1691      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1692      * replica and sends a wrapped Failure response to some targetActor
1693      */
1694     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1695         private final ActorRef targetActor;
1696         private final String shardName;
1697         private final String persistenceId;
1698         private final ActorRef shardManagerActor;
1699
1700         /**
1701          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1702          * @param shardName The name of the shard for which the primary replica had to be found
1703          * @param persistenceId The persistenceId for the ShardManager
1704          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1705          */
1706         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1707             this.targetActor = Preconditions.checkNotNull(targetActor);
1708             this.shardName = Preconditions.checkNotNull(shardName);
1709             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1710             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1711         }
1712
1713         public ActorRef getTargetActor() {
1714             return targetActor;
1715         }
1716
1717         public String getShardName() {
1718             return shardName;
1719         }
1720
1721         @Override
1722         public void onFailure(Throwable failure) {
1723             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1724             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1725                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1726         }
1727
1728         @Override
1729         public void onUnknownResponse(Object response) {
1730             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1731                     shardName, response);
1732             LOG.debug ("{}: {}", persistenceId, msg);
1733             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1734                     new RuntimeException(msg)), shardManagerActor);
1735         }
1736     }
1737
1738
1739     /**
1740      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1741      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1742      * as a successful response to find primary.
1743      */
1744     private static class PrimaryShardFoundForContext {
1745         private final String shardName;
1746         private final Object contextMessage;
1747         private final RemotePrimaryShardFound remotePrimaryShardFound;
1748         private final LocalPrimaryShardFound localPrimaryShardFound;
1749
1750         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
1751                 @Nonnull Object primaryFoundMessage) {
1752             this.shardName = Preconditions.checkNotNull(shardName);
1753             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1754             Preconditions.checkNotNull(primaryFoundMessage);
1755             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
1756                     (RemotePrimaryShardFound) primaryFoundMessage : null;
1757             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
1758                     (LocalPrimaryShardFound) primaryFoundMessage : null;
1759         }
1760
1761         @Nonnull
1762         String getPrimaryPath(){
1763             if(remotePrimaryShardFound != null) {
1764                 return remotePrimaryShardFound.getPrimaryPath();
1765             }
1766             return localPrimaryShardFound.getPrimaryPath();
1767         }
1768
1769         @Nonnull
1770         Object getContextMessage() {
1771             return contextMessage;
1772         }
1773
1774         @Nullable
1775         RemotePrimaryShardFound getRemotePrimaryShardFound() {
1776             return remotePrimaryShardFound;
1777         }
1778
1779         @Nonnull
1780         String getShardName() {
1781             return shardName;
1782         }
1783     }
1784
1785     /**
1786      * The WrappedShardResponse class wraps a response from a Shard.
1787      */
1788     private static class WrappedShardResponse {
1789         private final String shardName;
1790         private final Object response;
1791
1792         private WrappedShardResponse(String shardName, Object response) {
1793             this.shardName = shardName;
1794             this.response = response;
1795         }
1796
1797         String getShardName() {
1798             return shardName;
1799         }
1800
1801         Object getResponse() {
1802             return response;
1803         }
1804     }
1805 }
1806
1807
1808