Split out AbstractBuilder and rename it
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.Futures;
22 import akka.dispatch.OnComplete;
23 import akka.japi.Function;
24 import akka.pattern.Patterns;
25 import akka.persistence.RecoveryCompleted;
26 import akka.persistence.SaveSnapshotFailure;
27 import akka.persistence.SaveSnapshotSuccess;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.serialization.Serialization;
31 import akka.util.Timeout;
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.base.Objects;
34 import com.google.common.base.Optional;
35 import com.google.common.base.Preconditions;
36 import com.google.common.base.Strings;
37 import com.google.common.base.Supplier;
38 import com.google.common.collect.Sets;
39 import java.io.ByteArrayInputStream;
40 import java.io.ObjectInputStream;
41 import java.util.ArrayList;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.HashMap;
45 import java.util.HashSet;
46 import java.util.Iterator;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.Set;
50 import java.util.concurrent.CountDownLatch;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import javax.annotation.Nonnull;
54 import javax.annotation.Nullable;
55 import org.apache.commons.lang3.SerializationUtils;
56 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
57 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
58 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
59 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
60 import org.opendaylight.controller.cluster.datastore.Shard;
61 import org.opendaylight.controller.cluster.datastore.config.Configuration;
62 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
63 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
65 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
66 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
67 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
68 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
69 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
70 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
71 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
72 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
73 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
74 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
77 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
78 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
79 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
80 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
81 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
82 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
83 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
84 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
85 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
86 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
87 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
88 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
89 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
90 import org.opendaylight.controller.cluster.raft.RaftState;
91 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
92 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
93 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
94 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
95 import org.opendaylight.controller.cluster.raft.messages.AddServer;
96 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
97 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
98 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
99 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
100 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
101 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
102 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
103 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
104 import org.slf4j.Logger;
105 import org.slf4j.LoggerFactory;
106 import scala.concurrent.ExecutionContext;
107 import scala.concurrent.Future;
108 import scala.concurrent.duration.Duration;
109 import scala.concurrent.duration.FiniteDuration;
110
111 /**
112  * The ShardManager has the following jobs,
113  * <ul>
114  * <li> Create all the local shard replicas that belong on this cluster member
115  * <li> Find the address of the local shard
116  * <li> Find the primary replica for any given shard
117  * <li> Monitor the cluster members and store their addresses
118  * <ul>
119  */
120 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
121
122     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
123
124     // Stores a mapping between a shard name and it's corresponding information
125     // Shard names look like inventory, topology etc and are as specified in
126     // configuration
127     private final Map<String, ShardInformation> localShards = new HashMap<>();
128
129     // The type of a ShardManager reflects the type of the datastore itself
130     // A data store could be of type config/operational
131     private final String type;
132
133     private final ClusterWrapper cluster;
134
135     private final Configuration configuration;
136
137     private final String shardDispatcherPath;
138
139     private final ShardManagerInfo mBean;
140
141     private DatastoreContextFactory datastoreContextFactory;
142
143     private final CountDownLatch waitTillReadyCountdownLatch;
144
145     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
146
147     private final ShardPeerAddressResolver peerAddressResolver;
148
149     private SchemaContext schemaContext;
150
151     private DatastoreSnapshot restoreFromSnapshot;
152
153     private ShardManagerSnapshot currentSnapshot;
154
155     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
156
157     private final String persistenceId;
158
159     ShardManager(AbstractShardManagerCreator<?> builder) {
160         this.cluster = builder.getCluster();
161         this.configuration = builder.getConfiguration();
162         this.datastoreContextFactory = builder.getDdatastoreContextFactory();
163         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
164         this.shardDispatcherPath =
165                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
166         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
167         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
168         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
169
170         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
171         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
172
173         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
174
175         // Subscribe this actor to cluster member events
176         cluster.subscribeToMemberEvents(getSelf());
177
178         List<String> localShardActorNames = new ArrayList<>();
179         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
180                 "shard-manager-" + this.type,
181                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
182                 localShardActorNames);
183         mBean.setShardManager(this);
184     }
185
186     @Override
187     public void postStop() {
188         LOG.info("Stopping ShardManager {}", persistenceId());
189
190         mBean.unregisterMBean();
191     }
192
193     @Override
194     public void handleCommand(Object message) throws Exception {
195         if (message  instanceof FindPrimary) {
196             findPrimary((FindPrimary)message);
197         } else if(message instanceof FindLocalShard){
198             findLocalShard((FindLocalShard) message);
199         } else if (message instanceof UpdateSchemaContext) {
200             updateSchemaContext(message);
201         } else if(message instanceof ActorInitialized) {
202             onActorInitialized(message);
203         } else if (message instanceof ClusterEvent.MemberUp){
204             memberUp((ClusterEvent.MemberUp) message);
205         } else if (message instanceof ClusterEvent.MemberExited){
206             memberExited((ClusterEvent.MemberExited) message);
207         } else if(message instanceof ClusterEvent.MemberRemoved) {
208             memberRemoved((ClusterEvent.MemberRemoved) message);
209         } else if(message instanceof ClusterEvent.UnreachableMember) {
210             memberUnreachable((ClusterEvent.UnreachableMember)message);
211         } else if(message instanceof ClusterEvent.ReachableMember) {
212             memberReachable((ClusterEvent.ReachableMember) message);
213         } else if(message instanceof DatastoreContextFactory) {
214             onDatastoreContextFactory((DatastoreContextFactory)message);
215         } else if(message instanceof RoleChangeNotification) {
216             onRoleChangeNotification((RoleChangeNotification) message);
217         } else if(message instanceof FollowerInitialSyncUpStatus){
218             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
219         } else if(message instanceof ShardNotInitializedTimeout) {
220             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
221         } else if(message instanceof ShardLeaderStateChanged) {
222             onLeaderStateChanged((ShardLeaderStateChanged) message);
223         } else if(message instanceof SwitchShardBehavior){
224             onSwitchShardBehavior((SwitchShardBehavior) message);
225         } else if(message instanceof CreateShard) {
226             onCreateShard((CreateShard)message);
227         } else if(message instanceof AddShardReplica){
228             onAddShardReplica((AddShardReplica)message);
229         } else if(message instanceof ForwardedAddServerReply) {
230             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
231             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
232                     msg.removeShardOnFailure);
233         } else if(message instanceof ForwardedAddServerFailure) {
234             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
235             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
236         } else if(message instanceof PrimaryShardFoundForContext) {
237             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
238             onPrimaryShardFoundContext(primaryShardFoundContext);
239         } else if(message instanceof RemoveShardReplica) {
240             onRemoveShardReplica((RemoveShardReplica) message);
241         } else if(message instanceof WrappedShardResponse){
242             onWrappedShardResponse((WrappedShardResponse) message);
243         } else if(message instanceof GetSnapshot) {
244             onGetSnapshot();
245         } else if(message instanceof ServerRemoved){
246             onShardReplicaRemoved((ServerRemoved) message);
247         } else if(message instanceof SaveSnapshotSuccess) {
248             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
249         } else if(message instanceof SaveSnapshotFailure) {
250             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
251                     persistenceId(), ((SaveSnapshotFailure) message).cause());
252         } else if(message instanceof Shutdown) {
253             onShutDown();
254         } else {
255             unknownMessage(message);
256         }
257     }
258
259     private void onShutDown() {
260         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
261         for (ShardInformation info : localShards.values()) {
262             if (info.getActor() != null) {
263                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
264
265                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
266                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
267             }
268         }
269
270         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
271
272         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
273         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
274
275         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
276             @Override
277             public void onComplete(Throwable failure, Iterable<Boolean> results) {
278                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
279
280                 self().tell(PoisonPill.getInstance(), self());
281
282                 if(failure != null) {
283                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
284                 } else {
285                     int nfailed = 0;
286                     for(Boolean r: results) {
287                         if(!r) {
288                             nfailed++;
289                         }
290                     }
291
292                     if(nfailed > 0) {
293                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
294                     }
295                 }
296             }
297         }, dispatcher);
298     }
299
300     private void onWrappedShardResponse(WrappedShardResponse message) {
301         if (message.getResponse() instanceof RemoveServerReply) {
302             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
303                     message.getLeaderPath());
304         }
305     }
306
307     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
308             String leaderPath) {
309         shardReplicaOperationsInProgress.remove(shardId);
310
311         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
312
313         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
314             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
315                     shardId.getShardName());
316             originalSender.tell(new akka.actor.Status.Success(null), getSelf());
317         } else {
318             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
319                     persistenceId(), shardId, replyMsg.getStatus());
320
321             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
322                     leaderPath, shardId);
323             originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
324         }
325     }
326
327     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
328         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
329             addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
330                     getSender());
331         } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
332             removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
333                     primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
334         }
335     }
336
337     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
338             final ActorRef sender) {
339         if(isShardReplicaOperationInProgress(shardName, sender)) {
340             return;
341         }
342
343         shardReplicaOperationsInProgress.add(shardName);
344
345         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
346
347         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
348
349         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
350         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
351                 primaryPath, shardId);
352
353         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
354                 duration());
355         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
356                 new RemoveServer(shardId.toString()), removeServerTimeout);
357
358         futureObj.onComplete(new OnComplete<Object>() {
359             @Override
360             public void onComplete(Throwable failure, Object response) {
361                 if (failure != null) {
362                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
363                             primaryPath, shardName);
364
365                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
366
367                     // FAILURE
368                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
369                 } else {
370                     // SUCCESS
371                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
372                 }
373             }
374         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
375     }
376
377     private void onShardReplicaRemoved(ServerRemoved message) {
378         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
379         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
380         if(shardInformation == null) {
381             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
382             return;
383         } else if(shardInformation.getActor() != null) {
384             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
385             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
386         }
387         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
388         persistShardList();
389     }
390
391     private void onGetSnapshot() {
392         LOG.debug("{}: onGetSnapshot", persistenceId());
393
394         List<String> notInitialized = null;
395         for(ShardInformation shardInfo: localShards.values()) {
396             if(!shardInfo.isShardInitialized()) {
397                 if(notInitialized == null) {
398                     notInitialized = new ArrayList<>();
399                 }
400
401                 notInitialized.add(shardInfo.getShardName());
402             }
403         }
404
405         if(notInitialized != null) {
406             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
407                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
408             return;
409         }
410
411         byte[] shardManagerSnapshot = null;
412         if(currentSnapshot != null) {
413             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
414         }
415
416         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
417                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
418                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
419
420         for(ShardInformation shardInfo: localShards.values()) {
421             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
422         }
423     }
424
425     private void onCreateShard(CreateShard createShard) {
426         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
427
428         Object reply;
429         try {
430             String shardName = createShard.getModuleShardConfig().getShardName();
431             if(localShards.containsKey(shardName)) {
432                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
433                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
434             } else {
435                 doCreateShard(createShard);
436                 reply = new akka.actor.Status.Success(null);
437             }
438         } catch (Exception e) {
439             LOG.error("{}: onCreateShard failed", persistenceId(), e);
440             reply = new akka.actor.Status.Failure(e);
441         }
442
443         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
444             getSender().tell(reply, getSelf());
445         }
446     }
447
448     private void doCreateShard(CreateShard createShard) {
449         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
450         String shardName = moduleShardConfig.getShardName();
451
452         configuration.addModuleShardConfiguration(moduleShardConfig);
453
454         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
455         if(shardDatastoreContext == null) {
456             shardDatastoreContext = newShardDatastoreContext(shardName);
457         } else {
458             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
459                     peerAddressResolver).build();
460         }
461
462         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
463
464         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
465                 currentSnapshot.getShardList().contains(shardName);
466
467         Map<String, String> peerAddresses;
468         boolean isActiveMember;
469         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
470                 contains(cluster.getCurrentMemberName())) {
471             peerAddresses = getPeerAddresses(shardName);
472             isActiveMember = true;
473         } else {
474             // The local member is not in the static shard member configuration and the shard did not
475             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
476             // the shard with no peers and with elections disabled so it stays as follower. A
477             // subsequent AddServer request will be needed to make it an active member.
478             isActiveMember = false;
479             peerAddresses = Collections.emptyMap();
480             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
481                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
482         }
483
484         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
485                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
486                 isActiveMember);
487
488         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
489                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
490         info.setActiveMember(isActiveMember);
491         localShards.put(info.getShardName(), info);
492
493         mBean.addLocalShard(shardId.toString());
494
495         if(schemaContext != null) {
496             info.setActor(newShardActor(schemaContext, info));
497         }
498     }
499
500     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
501         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
502                 shardPeerAddressResolver(peerAddressResolver);
503     }
504
505     private DatastoreContext newShardDatastoreContext(String shardName) {
506         return newShardDatastoreContextBuilder(shardName).build();
507     }
508
509     private void checkReady(){
510         if (isReadyWithLeaderId()) {
511             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
512                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
513
514             waitTillReadyCountdownLatch.countDown();
515         }
516     }
517
518     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
519         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
520
521         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
522         if(shardInformation != null) {
523             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
524             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
525             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
526                 primaryShardInfoCache.remove(shardInformation.getShardName());
527             }
528
529             checkReady();
530         } else {
531             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
532         }
533     }
534
535     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
536         ShardInformation shardInfo = message.getShardInfo();
537
538         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
539                 shardInfo.getShardName());
540
541         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
542
543         if(!shardInfo.isShardInitialized()) {
544             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
545             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
546         } else {
547             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
548             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
549         }
550     }
551
552     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
553         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
554                 status.getName(), status.isInitialSyncDone());
555
556         ShardInformation shardInformation = findShardInformation(status.getName());
557
558         if(shardInformation != null) {
559             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
560
561             mBean.setSyncStatus(isInSync());
562         }
563
564     }
565
566     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
567         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
568                 roleChanged.getOldRole(), roleChanged.getNewRole());
569
570         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
571         if(shardInformation != null) {
572             shardInformation.setRole(roleChanged.getNewRole());
573             checkReady();
574             mBean.setSyncStatus(isInSync());
575         }
576     }
577
578
579     private ShardInformation findShardInformation(String memberId) {
580         for(ShardInformation info : localShards.values()){
581             if(info.getShardId().toString().equals(memberId)){
582                 return info;
583             }
584         }
585
586         return null;
587     }
588
589     private boolean isReadyWithLeaderId() {
590         boolean isReady = true;
591         for (ShardInformation info : localShards.values()) {
592             if(!info.isShardReadyWithLeaderId()){
593                 isReady = false;
594                 break;
595             }
596         }
597         return isReady;
598     }
599
600     private boolean isInSync(){
601         for (ShardInformation info : localShards.values()) {
602             if(!info.isInSync()){
603                 return false;
604             }
605         }
606         return true;
607     }
608
609     private void onActorInitialized(Object message) {
610         final ActorRef sender = getSender();
611
612         if (sender == null) {
613             return; //why is a non-actor sending this message? Just ignore.
614         }
615
616         String actorName = sender.path().name();
617         //find shard name from actor name; actor name is stringified shardId
618         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
619
620         if (shardId.getShardName() == null) {
621             return;
622         }
623
624         markShardAsInitialized(shardId.getShardName());
625     }
626
627     private void markShardAsInitialized(String shardName) {
628         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
629
630         ShardInformation shardInformation = localShards.get(shardName);
631         if (shardInformation != null) {
632             shardInformation.setActorInitialized();
633
634             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
635         }
636     }
637
638     @Override
639     protected void handleRecover(Object message) throws Exception {
640         if (message instanceof RecoveryCompleted) {
641             onRecoveryCompleted();
642         } else if (message instanceof SnapshotOffer) {
643             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
644         }
645     }
646
647     private void onRecoveryCompleted() {
648         LOG.info("Recovery complete : {}", persistenceId());
649
650         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
651         // journal on upgrade from Helium.
652         deleteMessages(lastSequenceNr());
653
654         if(currentSnapshot == null && restoreFromSnapshot != null &&
655                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
656             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
657                     restoreFromSnapshot.getShardManagerSnapshot()))) {
658                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
659
660                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
661
662                 applyShardManagerSnapshot(snapshot);
663             } catch(Exception e) {
664                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
665             }
666         }
667
668         createLocalShards();
669     }
670
671     private void findLocalShard(FindLocalShard message) {
672         final ShardInformation shardInformation = localShards.get(message.getShardName());
673
674         if(shardInformation == null){
675             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
676             return;
677         }
678
679         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
680             @Override
681             public Object get() {
682                 return new LocalShardFound(shardInformation.getActor());
683             }
684         });
685     }
686
687     private void sendResponse(ShardInformation shardInformation, boolean doWait,
688             boolean wantShardReady, final Supplier<Object> messageSupplier) {
689         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
690             if(doWait) {
691                 final ActorRef sender = getSender();
692                 final ActorRef self = self();
693
694                 Runnable replyRunnable = new Runnable() {
695                     @Override
696                     public void run() {
697                         sender.tell(messageSupplier.get(), self);
698                     }
699                 };
700
701                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
702                     new OnShardInitialized(replyRunnable);
703
704                 shardInformation.addOnShardInitialized(onShardInitialized);
705
706                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
707                 if(shardInformation.isShardInitialized()) {
708                     // If the shard is already initialized then we'll wait enough time for the shard to
709                     // elect a leader, ie 2 times the election timeout.
710                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
711                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
712                 }
713
714                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
715                         shardInformation.getShardName());
716
717                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
718                         timeout, getSelf(),
719                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
720                         getContext().dispatcher(), getSelf());
721
722                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
723
724             } else if (!shardInformation.isShardInitialized()) {
725                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
726                         shardInformation.getShardName());
727                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
728             } else {
729                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
730                         shardInformation.getShardName());
731                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
732             }
733
734             return;
735         }
736
737         getSender().tell(messageSupplier.get(), getSelf());
738     }
739
740     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
741         return new NoShardLeaderException(null, shardId.toString());
742     }
743
744     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
745         return new NotInitializedException(String.format(
746                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
747     }
748
749     private void memberRemoved(ClusterEvent.MemberRemoved message) {
750         String memberName = message.member().roles().iterator().next();
751
752         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
753                 message.member().address());
754
755         peerAddressResolver.removePeerAddress(memberName);
756
757         for(ShardInformation info : localShards.values()){
758             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
759         }
760     }
761
762     private void memberExited(ClusterEvent.MemberExited message) {
763         String memberName = message.member().roles().iterator().next();
764
765         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
766                 message.member().address());
767
768         peerAddressResolver.removePeerAddress(memberName);
769
770         for(ShardInformation info : localShards.values()){
771             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
772         }
773     }
774
775     private void memberUp(ClusterEvent.MemberUp message) {
776         String memberName = message.member().roles().iterator().next();
777
778         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
779                 message.member().address());
780
781         addPeerAddress(memberName, message.member().address());
782
783         checkReady();
784     }
785
786     private void addPeerAddress(String memberName, Address address) {
787         peerAddressResolver.addPeerAddress(memberName, address);
788
789         for(ShardInformation info : localShards.values()){
790             String shardName = info.getShardName();
791             String peerId = getShardIdentifier(memberName, shardName).toString();
792             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
793
794             info.peerUp(memberName, peerId, getSelf());
795         }
796     }
797
798     private void memberReachable(ClusterEvent.ReachableMember message) {
799         String memberName = message.member().roles().iterator().next();
800         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
801
802         addPeerAddress(memberName, message.member().address());
803
804         markMemberAvailable(memberName);
805     }
806
807     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
808         String memberName = message.member().roles().iterator().next();
809         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
810
811         markMemberUnavailable(memberName);
812     }
813
814     private void markMemberUnavailable(final String memberName) {
815         for(ShardInformation info : localShards.values()){
816             String leaderId = info.getLeaderId();
817             if(leaderId != null && leaderId.contains(memberName)) {
818                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
819                 info.setLeaderAvailable(false);
820
821                 primaryShardInfoCache.remove(info.getShardName());
822             }
823
824             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
825         }
826     }
827
828     private void markMemberAvailable(final String memberName) {
829         for(ShardInformation info : localShards.values()){
830             String leaderId = info.getLeaderId();
831             if(leaderId != null && leaderId.contains(memberName)) {
832                 LOG.debug("Marking Leader {} as available.", leaderId);
833                 info.setLeaderAvailable(true);
834             }
835
836             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
837         }
838     }
839
840     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
841         datastoreContextFactory = factory;
842         for (ShardInformation info : localShards.values()) {
843             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
844         }
845     }
846
847     private void onSwitchShardBehavior(SwitchShardBehavior message) {
848         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
849
850         ShardInformation shardInformation = localShards.get(identifier.getShardName());
851
852         if(shardInformation != null && shardInformation.getActor() != null) {
853             shardInformation.getActor().tell(
854                     new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
855         } else {
856             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
857                     message.getShardName(), message.getNewState());
858         }
859     }
860
861     /**
862      * Notifies all the local shards of a change in the schema context
863      *
864      * @param message
865      */
866     private void updateSchemaContext(final Object message) {
867         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
868
869         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
870
871         for (ShardInformation info : localShards.values()) {
872             if (info.getActor() == null) {
873                 LOG.debug("Creating Shard {}", info.getShardId());
874                 info.setActor(newShardActor(schemaContext, info));
875             } else {
876                 info.getActor().tell(message, getSelf());
877             }
878         }
879     }
880
881     @VisibleForTesting
882     protected ClusterWrapper getCluster() {
883         return cluster;
884     }
885
886     @VisibleForTesting
887     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
888         return getContext().actorOf(info.newProps(schemaContext)
889                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
890     }
891
892     private void findPrimary(FindPrimary message) {
893         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
894
895         final String shardName = message.getShardName();
896         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
897
898         // First see if the there is a local replica for the shard
899         final ShardInformation info = localShards.get(shardName);
900         if (info != null && info.isActiveMember()) {
901             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
902                 @Override
903                 public Object get() {
904                     String primaryPath = info.getSerializedLeaderActor();
905                     Object found = canReturnLocalShardState && info.isLeader() ?
906                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
907                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
908
909                             if(LOG.isDebugEnabled()) {
910                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
911                             }
912
913                             return found;
914                 }
915             });
916
917             return;
918         }
919
920         final Collection<String> visitedAddresses;
921         if(message instanceof RemoteFindPrimary) {
922             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
923         } else {
924             visitedAddresses = new ArrayList<>(1);
925         }
926
927         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
928
929         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
930             if(visitedAddresses.contains(address)) {
931                 continue;
932             }
933
934             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
935                     persistenceId(), shardName, address, visitedAddresses);
936
937             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
938                     message.isWaitUntilReady(), visitedAddresses), getContext());
939             return;
940         }
941
942         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
943
944         getSender().tell(new PrimaryNotFoundException(
945                 String.format("No primary shard found for %s.", shardName)), getSelf());
946     }
947
948     /**
949      * Construct the name of the shard actor given the name of the member on
950      * which the shard resides and the name of the shard
951      *
952      * @param memberName
953      * @param shardName
954      * @return
955      */
956     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
957         return peerAddressResolver.getShardIdentifier(memberName, shardName);
958     }
959
960     /**
961      * Create shards that are local to the member on which the ShardManager
962      * runs
963      *
964      */
965     private void createLocalShards() {
966         String memberName = this.cluster.getCurrentMemberName();
967         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
968
969         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
970         if(restoreFromSnapshot != null)
971         {
972             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
973                 shardSnapshots.put(snapshot.getName(), snapshot);
974             }
975         }
976
977         restoreFromSnapshot = null; // null out to GC
978
979         for(String shardName : memberShardNames){
980             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
981
982             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
983
984             Map<String, String> peerAddresses = getPeerAddresses(shardName);
985             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
986                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
987                         shardSnapshots.get(shardName)), peerAddressResolver));
988             mBean.addLocalShard(shardId.toString());
989         }
990     }
991
992     /**
993      * Given the name of the shard find the addresses of all it's peers
994      *
995      * @param shardName
996      */
997     private Map<String, String> getPeerAddresses(String shardName) {
998         Collection<String> members = configuration.getMembersFromShardName(shardName);
999         Map<String, String> peerAddresses = new HashMap<>();
1000
1001         String currentMemberName = this.cluster.getCurrentMemberName();
1002
1003         for(String memberName : members) {
1004             if(!currentMemberName.equals(memberName)) {
1005                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1006                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1007                 peerAddresses.put(shardId.toString(), address);
1008             }
1009         }
1010         return peerAddresses;
1011     }
1012
1013     @Override
1014     public SupervisorStrategy supervisorStrategy() {
1015
1016         return new OneForOneStrategy(10, Duration.create("1 minute"),
1017                 new Function<Throwable, SupervisorStrategy.Directive>() {
1018             @Override
1019             public SupervisorStrategy.Directive apply(Throwable t) {
1020                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1021                 return SupervisorStrategy.resume();
1022             }
1023         }
1024                 );
1025
1026     }
1027
1028     @Override
1029     public String persistenceId() {
1030         return persistenceId;
1031     }
1032
1033     @VisibleForTesting
1034     ShardManagerInfoMBean getMBean(){
1035         return mBean;
1036     }
1037
1038     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1039         if (shardReplicaOperationsInProgress.contains(shardName)) {
1040             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1041             LOG.debug ("{}: {}", persistenceId(), msg);
1042             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1043             return true;
1044         }
1045
1046         return false;
1047     }
1048
1049     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1050         final String shardName = shardReplicaMsg.getShardName();
1051
1052         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1053
1054         // verify the shard with the specified name is present in the cluster configuration
1055         if (!(this.configuration.isShardConfigured(shardName))) {
1056             String msg = String.format("No module configuration exists for shard %s", shardName);
1057             LOG.debug ("{}: {}", persistenceId(), msg);
1058             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1059             return;
1060         }
1061
1062         // Create the localShard
1063         if (schemaContext == null) {
1064             String msg = String.format(
1065                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1066             LOG.debug ("{}: {}", persistenceId(), msg);
1067             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1068             return;
1069         }
1070
1071         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1072             @Override
1073             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1074                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1075             }
1076
1077             @Override
1078             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1079                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1080             }
1081
1082         });
1083     }
1084
1085     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1086         String msg = String.format("Local shard %s already exists", shardName);
1087         LOG.debug ("{}: {}", persistenceId(), msg);
1088         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
1089     }
1090
1091     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1092         if(isShardReplicaOperationInProgress(shardName, sender)) {
1093             return;
1094         }
1095
1096         shardReplicaOperationsInProgress.add(shardName);
1097
1098         final ShardInformation shardInfo;
1099         final boolean removeShardOnFailure;
1100         ShardInformation existingShardInfo = localShards.get(shardName);
1101         if(existingShardInfo == null) {
1102             removeShardOnFailure = true;
1103             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1104
1105             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
1106                     DisableElectionsRaftPolicy.class.getName()).build();
1107
1108             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1109                     Shard.builder(), peerAddressResolver);
1110             shardInfo.setActiveMember(false);
1111             localShards.put(shardName, shardInfo);
1112             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1113         } else {
1114             removeShardOnFailure = false;
1115             shardInfo = existingShardInfo;
1116         }
1117
1118         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1119
1120         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1121         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1122                 response.getPrimaryPath(), shardInfo.getShardId());
1123
1124         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1125                 duration());
1126         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1127             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1128
1129         futureObj.onComplete(new OnComplete<Object>() {
1130             @Override
1131             public void onComplete(Throwable failure, Object addServerResponse) {
1132                 if (failure != null) {
1133                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1134                             response.getPrimaryPath(), shardName, failure);
1135
1136                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1137                             response.getPrimaryPath(), shardName);
1138                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1139                 } else {
1140                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1141                             response.getPrimaryPath(), removeShardOnFailure), sender);
1142                 }
1143             }
1144         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1145     }
1146
1147     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1148             boolean removeShardOnFailure) {
1149         shardReplicaOperationsInProgress.remove(shardName);
1150
1151         if(removeShardOnFailure) {
1152             ShardInformation shardInfo = localShards.remove(shardName);
1153             if (shardInfo.getActor() != null) {
1154                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1155             }
1156         }
1157
1158         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1159             new RuntimeException(message, failure)), getSelf());
1160     }
1161
1162     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1163             String leaderPath, boolean removeShardOnFailure) {
1164         String shardName = shardInfo.getShardName();
1165         shardReplicaOperationsInProgress.remove(shardName);
1166
1167         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1168
1169         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1170             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1171
1172             // Make the local shard voting capable
1173             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1174             shardInfo.setActiveMember(true);
1175             persistShardList();
1176
1177             mBean.addLocalShard(shardInfo.getShardId().toString());
1178             sender.tell(new akka.actor.Status.Success(null), getSelf());
1179         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1180             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1181         } else {
1182             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1183                     persistenceId(), shardName, replyMsg.getStatus());
1184
1185             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1186
1187             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1188         }
1189     }
1190
1191     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1192                                                String leaderPath, ShardIdentifier shardId) {
1193         Exception failure;
1194         switch (serverChangeStatus) {
1195             case TIMEOUT:
1196                 failure = new TimeoutException(String.format(
1197                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1198                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1199                         leaderPath, shardId.getShardName()));
1200                 break;
1201             case NO_LEADER:
1202                 failure = createNoShardLeaderException(shardId);
1203                 break;
1204             case NOT_SUPPORTED:
1205                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1206                         serverChange.getSimpleName(), shardId.getShardName()));
1207                 break;
1208             default :
1209                 failure = new RuntimeException(String.format(
1210                         "%s request to leader %s for shard %s failed with status %s",
1211                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1212         }
1213         return failure;
1214     }
1215
1216     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1217         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1218
1219         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1220                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1221             @Override
1222             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1223                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1224             }
1225
1226             @Override
1227             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1228                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1229             }
1230         });
1231     }
1232
1233     private void persistShardList() {
1234         List<String> shardList = new ArrayList<>(localShards.keySet());
1235         for (ShardInformation shardInfo : localShards.values()) {
1236             if (!shardInfo.isActiveMember()) {
1237                 shardList.remove(shardInfo.getShardName());
1238             }
1239         }
1240         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1241         saveSnapshot(updateShardManagerSnapshot(shardList));
1242     }
1243
1244     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1245         currentSnapshot = new ShardManagerSnapshot(shardList);
1246         return currentSnapshot;
1247     }
1248
1249     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1250         currentSnapshot = snapshot;
1251
1252         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1253
1254         String currentMember = cluster.getCurrentMemberName();
1255         Set<String> configuredShardList =
1256             new HashSet<>(configuration.getMemberShardNames(currentMember));
1257         for (String shard : currentSnapshot.getShardList()) {
1258             if (!configuredShardList.contains(shard)) {
1259                 // add the current member as a replica for the shard
1260                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1261                 configuration.addMemberReplicaForShard(shard, currentMember);
1262             } else {
1263                 configuredShardList.remove(shard);
1264             }
1265         }
1266         for (String shard : configuredShardList) {
1267             // remove the member as a replica for the shard
1268             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1269             configuration.removeMemberReplicaForShard(shard, currentMember);
1270         }
1271     }
1272
1273     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1274         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1275             persistenceId());
1276         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1277             0, 0));
1278     }
1279
1280     private static class ForwardedAddServerReply {
1281         ShardInformation shardInfo;
1282         AddServerReply addServerReply;
1283         String leaderPath;
1284         boolean removeShardOnFailure;
1285
1286         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1287                 boolean removeShardOnFailure) {
1288             this.shardInfo = shardInfo;
1289             this.addServerReply = addServerReply;
1290             this.leaderPath = leaderPath;
1291             this.removeShardOnFailure = removeShardOnFailure;
1292         }
1293     }
1294
1295     private static class ForwardedAddServerFailure {
1296         String shardName;
1297         String failureMessage;
1298         Throwable failure;
1299         boolean removeShardOnFailure;
1300
1301         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1302                 boolean removeShardOnFailure) {
1303             this.shardName = shardName;
1304             this.failureMessage = failureMessage;
1305             this.failure = failure;
1306             this.removeShardOnFailure = removeShardOnFailure;
1307         }
1308     }
1309
1310     @VisibleForTesting
1311     protected static class ShardInformation {
1312         private final ShardIdentifier shardId;
1313         private final String shardName;
1314         private ActorRef actor;
1315         private final Map<String, String> initialPeerAddresses;
1316         private Optional<DataTree> localShardDataTree;
1317         private boolean leaderAvailable = false;
1318
1319         // flag that determines if the actor is ready for business
1320         private boolean actorInitialized = false;
1321
1322         private boolean followerSyncStatus = false;
1323
1324         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1325         private String role ;
1326         private String leaderId;
1327         private short leaderVersion;
1328
1329         private DatastoreContext datastoreContext;
1330         private Shard.AbstractBuilder<?, ?> builder;
1331         private final ShardPeerAddressResolver addressResolver;
1332         private boolean isActiveMember = true;
1333
1334         private ShardInformation(String shardName, ShardIdentifier shardId,
1335                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1336                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1337             this.shardName = shardName;
1338             this.shardId = shardId;
1339             this.initialPeerAddresses = initialPeerAddresses;
1340             this.datastoreContext = datastoreContext;
1341             this.builder = builder;
1342             this.addressResolver = addressResolver;
1343         }
1344
1345         Props newProps(SchemaContext schemaContext) {
1346             Preconditions.checkNotNull(builder);
1347             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1348                     schemaContext(schemaContext).props();
1349             builder = null;
1350             return props;
1351         }
1352
1353         String getShardName() {
1354             return shardName;
1355         }
1356
1357         @Nullable
1358         ActorRef getActor(){
1359             return actor;
1360         }
1361
1362         void setActor(ActorRef actor) {
1363             this.actor = actor;
1364         }
1365
1366         ShardIdentifier getShardId() {
1367             return shardId;
1368         }
1369
1370         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1371             this.localShardDataTree = localShardDataTree;
1372         }
1373
1374         Optional<DataTree> getLocalShardDataTree() {
1375             return localShardDataTree;
1376         }
1377
1378         DatastoreContext getDatastoreContext() {
1379             return datastoreContext;
1380         }
1381
1382         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1383             this.datastoreContext = datastoreContext;
1384             if (actor != null) {
1385                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1386                 actor.tell(this.datastoreContext, sender);
1387             }
1388         }
1389
1390         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1391             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1392
1393             if(actor != null) {
1394                 if(LOG.isDebugEnabled()) {
1395                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1396                             peerId, peerAddress, actor.path());
1397                 }
1398
1399                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1400             }
1401
1402             notifyOnShardInitializedCallbacks();
1403         }
1404
1405         void peerDown(String memberName, String peerId, ActorRef sender) {
1406             if(actor != null) {
1407                 actor.tell(new PeerDown(memberName, peerId), sender);
1408             }
1409         }
1410
1411         void peerUp(String memberName, String peerId, ActorRef sender) {
1412             if(actor != null) {
1413                 actor.tell(new PeerUp(memberName, peerId), sender);
1414             }
1415         }
1416
1417         boolean isShardReady() {
1418             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1419         }
1420
1421         boolean isShardReadyWithLeaderId() {
1422             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1423                     (isLeader() || addressResolver.resolve(leaderId) != null);
1424         }
1425
1426         boolean isShardInitialized() {
1427             return getActor() != null && actorInitialized;
1428         }
1429
1430         boolean isLeader() {
1431             return Objects.equal(leaderId, shardId.toString());
1432         }
1433
1434         String getSerializedLeaderActor() {
1435             if(isLeader()) {
1436                 return Serialization.serializedActorPath(getActor());
1437             } else {
1438                 return addressResolver.resolve(leaderId);
1439             }
1440         }
1441
1442         void setActorInitialized() {
1443             LOG.debug("Shard {} is initialized", shardId);
1444
1445             this.actorInitialized = true;
1446
1447             notifyOnShardInitializedCallbacks();
1448         }
1449
1450         private void notifyOnShardInitializedCallbacks() {
1451             if(onShardInitializedSet.isEmpty()) {
1452                 return;
1453             }
1454
1455             boolean ready = isShardReadyWithLeaderId();
1456
1457             if(LOG.isDebugEnabled()) {
1458                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1459                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1460             }
1461
1462             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1463             while(iter.hasNext()) {
1464                 OnShardInitialized onShardInitialized = iter.next();
1465                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1466                     iter.remove();
1467                     onShardInitialized.getTimeoutSchedule().cancel();
1468                     onShardInitialized.getReplyRunnable().run();
1469                 }
1470             }
1471         }
1472
1473         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1474             onShardInitializedSet.add(onShardInitialized);
1475         }
1476
1477         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1478             onShardInitializedSet.remove(onShardInitialized);
1479         }
1480
1481         void setRole(String newRole) {
1482             this.role = newRole;
1483
1484             notifyOnShardInitializedCallbacks();
1485         }
1486
1487         void setFollowerSyncStatus(boolean syncStatus){
1488             this.followerSyncStatus = syncStatus;
1489         }
1490
1491         boolean isInSync(){
1492             if(RaftState.Follower.name().equals(this.role)){
1493                 return followerSyncStatus;
1494             } else if(RaftState.Leader.name().equals(this.role)){
1495                 return true;
1496             }
1497
1498             return false;
1499         }
1500
1501         boolean setLeaderId(String leaderId) {
1502             boolean changed = !Objects.equal(this.leaderId, leaderId);
1503             this.leaderId = leaderId;
1504             if(leaderId != null) {
1505                 this.leaderAvailable = true;
1506             }
1507             notifyOnShardInitializedCallbacks();
1508
1509             return changed;
1510         }
1511
1512         String getLeaderId() {
1513             return leaderId;
1514         }
1515
1516         void setLeaderAvailable(boolean leaderAvailable) {
1517             this.leaderAvailable = leaderAvailable;
1518
1519             if(leaderAvailable) {
1520                 notifyOnShardInitializedCallbacks();
1521             }
1522         }
1523
1524         short getLeaderVersion() {
1525             return leaderVersion;
1526         }
1527
1528         void setLeaderVersion(short leaderVersion) {
1529             this.leaderVersion = leaderVersion;
1530         }
1531
1532         boolean isActiveMember() {
1533             return isActiveMember;
1534         }
1535
1536         void setActiveMember(boolean isActiveMember) {
1537             this.isActiveMember = isActiveMember;
1538         }
1539     }
1540
1541     private static class OnShardInitialized {
1542         private final Runnable replyRunnable;
1543         private Cancellable timeoutSchedule;
1544
1545         OnShardInitialized(Runnable replyRunnable) {
1546             this.replyRunnable = replyRunnable;
1547         }
1548
1549         Runnable getReplyRunnable() {
1550             return replyRunnable;
1551         }
1552
1553         Cancellable getTimeoutSchedule() {
1554             return timeoutSchedule;
1555         }
1556
1557         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1558             this.timeoutSchedule = timeoutSchedule;
1559         }
1560     }
1561
1562     private static class OnShardReady extends OnShardInitialized {
1563         OnShardReady(Runnable replyRunnable) {
1564             super(replyRunnable);
1565         }
1566     }
1567
1568     private static class ShardNotInitializedTimeout {
1569         private final ActorRef sender;
1570         private final ShardInformation shardInfo;
1571         private final OnShardInitialized onShardInitialized;
1572
1573         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1574             this.sender = sender;
1575             this.shardInfo = shardInfo;
1576             this.onShardInitialized = onShardInitialized;
1577         }
1578
1579         ActorRef getSender() {
1580             return sender;
1581         }
1582
1583         ShardInformation getShardInfo() {
1584             return shardInfo;
1585         }
1586
1587         OnShardInitialized getOnShardInitialized() {
1588             return onShardInitialized;
1589         }
1590     }
1591
1592     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1593         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1594                 getShardInitializationTimeout().duration().$times(2));
1595
1596
1597         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1598         futureObj.onComplete(new OnComplete<Object>() {
1599             @Override
1600             public void onComplete(Throwable failure, Object response) {
1601                 if (failure != null) {
1602                     handler.onFailure(failure);
1603                 } else {
1604                     if(response instanceof RemotePrimaryShardFound) {
1605                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1606                     } else if(response instanceof LocalPrimaryShardFound) {
1607                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1608                     } else {
1609                         handler.onUnknownResponse(response);
1610                     }
1611                 }
1612             }
1613         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1614     }
1615
1616     /**
1617      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1618      * a remote or local find primary message is processed
1619      */
1620     private static interface FindPrimaryResponseHandler {
1621         /**
1622          * Invoked when a Failure message is received as a response
1623          *
1624          * @param failure
1625          */
1626         void onFailure(Throwable failure);
1627
1628         /**
1629          * Invoked when a RemotePrimaryShardFound response is received
1630          *
1631          * @param response
1632          */
1633         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1634
1635         /**
1636          * Invoked when a LocalPrimaryShardFound response is received
1637          * @param response
1638          */
1639         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1640
1641         /**
1642          * Invoked when an unknown response is received. This is another type of failure.
1643          *
1644          * @param response
1645          */
1646         void onUnknownResponse(Object response);
1647     }
1648
1649     /**
1650      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1651      * replica and sends a wrapped Failure response to some targetActor
1652      */
1653     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1654         private final ActorRef targetActor;
1655         private final String shardName;
1656         private final String persistenceId;
1657         private final ActorRef shardManagerActor;
1658
1659         /**
1660          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1661          * @param shardName The name of the shard for which the primary replica had to be found
1662          * @param persistenceId The persistenceId for the ShardManager
1663          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1664          */
1665         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1666             this.targetActor = Preconditions.checkNotNull(targetActor);
1667             this.shardName = Preconditions.checkNotNull(shardName);
1668             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1669             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1670         }
1671
1672         public ActorRef getTargetActor() {
1673             return targetActor;
1674         }
1675
1676         public String getShardName() {
1677             return shardName;
1678         }
1679
1680         @Override
1681         public void onFailure(Throwable failure) {
1682             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1683             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1684                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1685         }
1686
1687         @Override
1688         public void onUnknownResponse(Object response) {
1689             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1690                     shardName, response);
1691             LOG.debug ("{}: {}", persistenceId, msg);
1692             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1693                     new RuntimeException(msg)), shardManagerActor);
1694         }
1695     }
1696
1697
1698     /**
1699      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1700      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1701      * as a successful response to find primary.
1702      */
1703     private static class PrimaryShardFoundForContext {
1704         private final String shardName;
1705         private final Object contextMessage;
1706         private final RemotePrimaryShardFound remotePrimaryShardFound;
1707         private final LocalPrimaryShardFound localPrimaryShardFound;
1708
1709         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
1710                 @Nonnull Object primaryFoundMessage) {
1711             this.shardName = Preconditions.checkNotNull(shardName);
1712             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1713             Preconditions.checkNotNull(primaryFoundMessage);
1714             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
1715                     (RemotePrimaryShardFound) primaryFoundMessage : null;
1716             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
1717                     (LocalPrimaryShardFound) primaryFoundMessage : null;
1718         }
1719
1720         @Nonnull
1721         String getPrimaryPath(){
1722             if(remotePrimaryShardFound != null) {
1723                 return remotePrimaryShardFound.getPrimaryPath();
1724             }
1725             return localPrimaryShardFound.getPrimaryPath();
1726         }
1727
1728         @Nonnull
1729         Object getContextMessage() {
1730             return contextMessage;
1731         }
1732
1733         @Nullable
1734         RemotePrimaryShardFound getRemotePrimaryShardFound() {
1735             return remotePrimaryShardFound;
1736         }
1737
1738         @Nonnull
1739         String getShardName() {
1740             return shardName;
1741         }
1742     }
1743
1744     /**
1745      * The WrappedShardResponse class wraps a response from a Shard.
1746      */
1747     private static class WrappedShardResponse {
1748         private final ShardIdentifier shardId;
1749         private final Object response;
1750         private final String leaderPath;
1751
1752         private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1753             this.shardId = shardId;
1754             this.response = response;
1755             this.leaderPath = leaderPath;
1756         }
1757
1758         ShardIdentifier getShardId() {
1759             return shardId;
1760         }
1761
1762         Object getResponse() {
1763             return response;
1764         }
1765
1766         String getLeaderPath() {
1767             return leaderPath;
1768         }
1769     }
1770 }
1771
1772
1773