e314e1c894242d802bb242d034de86a403de52b0
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.actor.SupervisorStrategy;
20 import akka.cluster.ClusterEvent;
21 import akka.dispatch.Futures;
22 import akka.dispatch.OnComplete;
23 import akka.japi.Function;
24 import akka.pattern.Patterns;
25 import akka.persistence.RecoveryCompleted;
26 import akka.persistence.SaveSnapshotFailure;
27 import akka.persistence.SaveSnapshotSuccess;
28 import akka.persistence.SnapshotOffer;
29 import akka.persistence.SnapshotSelectionCriteria;
30 import akka.serialization.Serialization;
31 import akka.util.Timeout;
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.base.Objects;
34 import com.google.common.base.Optional;
35 import com.google.common.base.Preconditions;
36 import com.google.common.base.Strings;
37 import com.google.common.base.Supplier;
38 import com.google.common.collect.Sets;
39 import java.io.ByteArrayInputStream;
40 import java.io.ObjectInputStream;
41 import java.util.ArrayList;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.HashMap;
45 import java.util.HashSet;
46 import java.util.Iterator;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.Set;
50 import java.util.concurrent.CountDownLatch;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import javax.annotation.Nonnull;
54 import javax.annotation.Nullable;
55 import org.apache.commons.lang3.SerializationUtils;
56 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
57 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
58 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
59 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
60 import org.opendaylight.controller.cluster.datastore.Shard;
61 import org.opendaylight.controller.cluster.datastore.config.Configuration;
62 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
63 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
64 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
65 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
66 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
67 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
68 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
69 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
70 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
71 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
72 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
73 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
76 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
77 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
78 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
79 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
80 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
81 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
82 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
83 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
84 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
85 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
86 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
87 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
88 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
89 import org.opendaylight.controller.cluster.raft.RaftState;
90 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
91 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
92 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
93 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
94 import org.opendaylight.controller.cluster.raft.messages.AddServer;
95 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
96 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
97 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
98 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
99 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
100 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
101 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
102 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
103 import org.slf4j.Logger;
104 import org.slf4j.LoggerFactory;
105 import scala.concurrent.ExecutionContext;
106 import scala.concurrent.Future;
107 import scala.concurrent.duration.Duration;
108 import scala.concurrent.duration.FiniteDuration;
109
110 /**
111  * The ShardManager has the following jobs,
112  * <ul>
113  * <li> Create all the local shard replicas that belong on this cluster member
114  * <li> Find the address of the local shard
115  * <li> Find the primary replica for any given shard
116  * <li> Monitor the cluster members and store their addresses
117  * </ul>
118  */
119 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
120
121     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
122
123     // Stores a mapping between a shard name and its corresponding information
124     // Shard names look like inventory, topology etc and are as specified in
125     // configuration
126     private final Map<String, ShardInformation> localShards = new HashMap<>();
127
128     // The type of a ShardManager reflects the type of the datastore itself
129     // A data store could be of type config/operational
130     private final String type;
131
132     private final ClusterWrapper cluster;
133
134     private final Configuration configuration;
135
136     private final String shardDispatcherPath;
137
138     private final ShardManagerInfo mBean;
139
140     private DatastoreContextFactory datastoreContextFactory;
141
142     private final CountDownLatch waitTillReadyCountdownLatch;
143
144     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
145
146     private final ShardPeerAddressResolver peerAddressResolver;
147
148     private SchemaContext schemaContext;
149
150     private DatastoreSnapshot restoreFromSnapshot;
151
152     private ShardManagerSnapshot currentSnapshot;
153
154     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
155
156     private final String persistenceId;
157
158     ShardManager(AbstractShardManagerCreator<?> builder) {
159         this.cluster = builder.getCluster();
160         this.configuration = builder.getConfiguration();
161         this.datastoreContextFactory = builder.getDdatastoreContextFactory();
162         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
163         this.shardDispatcherPath =
164                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
165         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
166         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
167         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
168
169         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
170         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
171
172         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
173
174         // Subscribe this actor to cluster member events
175         cluster.subscribeToMemberEvents(getSelf());
176
177         List<String> localShardActorNames = new ArrayList<>();
178         mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
179                 "shard-manager-" + this.type,
180                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
181                 localShardActorNames);
182         mBean.setShardManager(this);
183     }
184
185     @Override
186     public void postStop() {
187         LOG.info("Stopping ShardManager {}", persistenceId());
188
189         mBean.unregisterMBean();
190     }
191
192     @Override
193     public void handleCommand(Object message) throws Exception {
194         if (message  instanceof FindPrimary) {
195             findPrimary((FindPrimary)message);
196         } else if(message instanceof FindLocalShard){
197             findLocalShard((FindLocalShard) message);
198         } else if (message instanceof UpdateSchemaContext) {
199             updateSchemaContext(message);
200         } else if(message instanceof ActorInitialized) {
201             onActorInitialized(message);
202         } else if (message instanceof ClusterEvent.MemberUp){
203             memberUp((ClusterEvent.MemberUp) message);
204         } else if (message instanceof ClusterEvent.MemberExited){
205             memberExited((ClusterEvent.MemberExited) message);
206         } else if(message instanceof ClusterEvent.MemberRemoved) {
207             memberRemoved((ClusterEvent.MemberRemoved) message);
208         } else if(message instanceof ClusterEvent.UnreachableMember) {
209             memberUnreachable((ClusterEvent.UnreachableMember)message);
210         } else if(message instanceof ClusterEvent.ReachableMember) {
211             memberReachable((ClusterEvent.ReachableMember) message);
212         } else if(message instanceof DatastoreContextFactory) {
213             onDatastoreContextFactory((DatastoreContextFactory)message);
214         } else if(message instanceof RoleChangeNotification) {
215             onRoleChangeNotification((RoleChangeNotification) message);
216         } else if(message instanceof FollowerInitialSyncUpStatus){
217             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
218         } else if(message instanceof ShardNotInitializedTimeout) {
219             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
220         } else if(message instanceof ShardLeaderStateChanged) {
221             onLeaderStateChanged((ShardLeaderStateChanged) message);
222         } else if(message instanceof SwitchShardBehavior){
223             onSwitchShardBehavior((SwitchShardBehavior) message);
224         } else if(message instanceof CreateShard) {
225             onCreateShard((CreateShard)message);
226         } else if(message instanceof AddShardReplica){
227             onAddShardReplica((AddShardReplica)message);
228         } else if(message instanceof ForwardedAddServerReply) {
229             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
230             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
231                     msg.removeShardOnFailure);
232         } else if(message instanceof ForwardedAddServerFailure) {
233             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
234             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
235         } else if(message instanceof PrimaryShardFoundForContext) {
236             PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
237             onPrimaryShardFoundContext(primaryShardFoundContext);
238         } else if(message instanceof RemoveShardReplica) {
239             onRemoveShardReplica((RemoveShardReplica) message);
240         } else if(message instanceof WrappedShardResponse){
241             onWrappedShardResponse((WrappedShardResponse) message);
242         } else if(message instanceof GetSnapshot) {
243             onGetSnapshot();
244         } else if(message instanceof ServerRemoved){
245             onShardReplicaRemoved((ServerRemoved) message);
246         } else if(message instanceof SaveSnapshotSuccess) {
247             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
248         } else if(message instanceof SaveSnapshotFailure) {
249             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
250                     persistenceId(), ((SaveSnapshotFailure) message).cause());
251         } else if(message instanceof Shutdown) {
252             onShutDown();
253         } else {
254             unknownMessage(message);
255         }
256     }
257
258     private void onShutDown() {
259         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
260         for (ShardInformation info : localShards.values()) {
261             if (info.getActor() != null) {
262                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
263
264                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
265                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
266             }
267         }
268
269         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
270
271         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
272         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
273
274         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
275             @Override
276             public void onComplete(Throwable failure, Iterable<Boolean> results) {
277                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
278
279                 self().tell(PoisonPill.getInstance(), self());
280
281                 if(failure != null) {
282                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
283                 } else {
284                     int nfailed = 0;
285                     for(Boolean r: results) {
286                         if(!r) {
287                             nfailed++;
288                         }
289                     }
290
291                     if(nfailed > 0) {
292                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
293                     }
294                 }
295             }
296         }, dispatcher);
297     }
298
299     private void onWrappedShardResponse(WrappedShardResponse message) {
300         if (message.getResponse() instanceof RemoveServerReply) {
301             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
302                     message.getLeaderPath());
303         }
304     }
305
306     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
307             String leaderPath) {
308         shardReplicaOperationsInProgress.remove(shardId);
309
310         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
311
312         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
313             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
314                     shardId.getShardName());
315             originalSender.tell(new akka.actor.Status.Success(null), getSelf());
316         } else {
317             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
318                     persistenceId(), shardId, replyMsg.getStatus());
319
320             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
321                     leaderPath, shardId);
322             originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
323         }
324     }
325
326     private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
327         if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
328             addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
329                     getSender());
330         } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
331             removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
332                     primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
333         }
334     }
335
336     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
337             final ActorRef sender) {
338         if(isShardReplicaOperationInProgress(shardName, sender)) {
339             return;
340         }
341
342         shardReplicaOperationsInProgress.add(shardName);
343
344         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
345
346         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
347
348         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
349         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
350                 primaryPath, shardId);
351
352         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
353                 duration());
354         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
355                 new RemoveServer(shardId.toString()), removeServerTimeout);
356
357         futureObj.onComplete(new OnComplete<Object>() {
358             @Override
359             public void onComplete(Throwable failure, Object response) {
360                 if (failure != null) {
361                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
362                             primaryPath, shardName);
363
364                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
365
366                     // FAILURE
367                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
368                 } else {
369                     // SUCCESS
370                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
371                 }
372             }
373         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
374     }
375
376     private void onShardReplicaRemoved(ServerRemoved message) {
377         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
378         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
379         if(shardInformation == null) {
380             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
381             return;
382         } else if(shardInformation.getActor() != null) {
383             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
384             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
385         }
386         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
387         persistShardList();
388     }
389
390     private void onGetSnapshot() {
391         LOG.debug("{}: onGetSnapshot", persistenceId());
392
393         List<String> notInitialized = null;
394         for(ShardInformation shardInfo: localShards.values()) {
395             if(!shardInfo.isShardInitialized()) {
396                 if(notInitialized == null) {
397                     notInitialized = new ArrayList<>();
398                 }
399
400                 notInitialized.add(shardInfo.getShardName());
401             }
402         }
403
404         if(notInitialized != null) {
405             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
406                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
407             return;
408         }
409
410         byte[] shardManagerSnapshot = null;
411         if(currentSnapshot != null) {
412             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
413         }
414
415         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
416                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
417                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
418
419         for(ShardInformation shardInfo: localShards.values()) {
420             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
421         }
422     }
423
424     private void onCreateShard(CreateShard createShard) {
425         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
426
427         Object reply;
428         try {
429             String shardName = createShard.getModuleShardConfig().getShardName();
430             if(localShards.containsKey(shardName)) {
431                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
432                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
433             } else {
434                 doCreateShard(createShard);
435                 reply = new akka.actor.Status.Success(null);
436             }
437         } catch (Exception e) {
438             LOG.error("{}: onCreateShard failed", persistenceId(), e);
439             reply = new akka.actor.Status.Failure(e);
440         }
441
442         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
443             getSender().tell(reply, getSelf());
444         }
445     }
446
447     private void doCreateShard(CreateShard createShard) {
448         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
449         String shardName = moduleShardConfig.getShardName();
450
451         configuration.addModuleShardConfiguration(moduleShardConfig);
452
453         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
454         if(shardDatastoreContext == null) {
455             shardDatastoreContext = newShardDatastoreContext(shardName);
456         } else {
457             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
458                     peerAddressResolver).build();
459         }
460
461         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
462
463         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
464                 currentSnapshot.getShardList().contains(shardName);
465
466         Map<String, String> peerAddresses;
467         boolean isActiveMember;
468         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
469                 contains(cluster.getCurrentMemberName())) {
470             peerAddresses = getPeerAddresses(shardName);
471             isActiveMember = true;
472         } else {
473             // The local member is not in the static shard member configuration and the shard did not
474             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
475             // the shard with no peers and with elections disabled so it stays as follower. A
476             // subsequent AddServer request will be needed to make it an active member.
477             isActiveMember = false;
478             peerAddresses = Collections.emptyMap();
479             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
480                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
481         }
482
483         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
484                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
485                 isActiveMember);
486
487         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
488                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
489         info.setActiveMember(isActiveMember);
490         localShards.put(info.getShardName(), info);
491
492         mBean.addLocalShard(shardId.toString());
493
494         if(schemaContext != null) {
495             info.setActor(newShardActor(schemaContext, info));
496         }
497     }
498
499     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
500         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
501                 shardPeerAddressResolver(peerAddressResolver);
502     }
503
504     private DatastoreContext newShardDatastoreContext(String shardName) {
505         return newShardDatastoreContextBuilder(shardName).build();
506     }
507
508     private void checkReady(){
509         if (isReadyWithLeaderId()) {
510             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
511                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
512
513             waitTillReadyCountdownLatch.countDown();
514         }
515     }
516
517     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
518         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
519
520         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
521         if(shardInformation != null) {
522             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
523             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
524             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
525                 primaryShardInfoCache.remove(shardInformation.getShardName());
526             }
527
528             checkReady();
529         } else {
530             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
531         }
532     }
533
534     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
535         ShardInformation shardInfo = message.getShardInfo();
536
537         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
538                 shardInfo.getShardName());
539
540         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
541
542         if(!shardInfo.isShardInitialized()) {
543             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
544             message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
545         } else {
546             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
547             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
548         }
549     }
550
551     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
552         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
553                 status.getName(), status.isInitialSyncDone());
554
555         ShardInformation shardInformation = findShardInformation(status.getName());
556
557         if(shardInformation != null) {
558             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
559
560             mBean.setSyncStatus(isInSync());
561         }
562
563     }
564
565     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
566         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
567                 roleChanged.getOldRole(), roleChanged.getNewRole());
568
569         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
570         if(shardInformation != null) {
571             shardInformation.setRole(roleChanged.getNewRole());
572             checkReady();
573             mBean.setSyncStatus(isInSync());
574         }
575     }
576
577
578     private ShardInformation findShardInformation(String memberId) {
579         for(ShardInformation info : localShards.values()){
580             if(info.getShardId().toString().equals(memberId)){
581                 return info;
582             }
583         }
584
585         return null;
586     }
587
588     private boolean isReadyWithLeaderId() {
589         boolean isReady = true;
590         for (ShardInformation info : localShards.values()) {
591             if(!info.isShardReadyWithLeaderId()){
592                 isReady = false;
593                 break;
594             }
595         }
596         return isReady;
597     }
598
599     private boolean isInSync(){
600         for (ShardInformation info : localShards.values()) {
601             if(!info.isInSync()){
602                 return false;
603             }
604         }
605         return true;
606     }
607
608     private void onActorInitialized(Object message) {
609         final ActorRef sender = getSender();
610
611         if (sender == null) {
612             return; //why is a non-actor sending this message? Just ignore.
613         }
614
615         String actorName = sender.path().name();
616         //find shard name from actor name; actor name is stringified shardId
617         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
618
619         if (shardId.getShardName() == null) {
620             return;
621         }
622
623         markShardAsInitialized(shardId.getShardName());
624     }
625
626     private void markShardAsInitialized(String shardName) {
627         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
628
629         ShardInformation shardInformation = localShards.get(shardName);
630         if (shardInformation != null) {
631             shardInformation.setActorInitialized();
632
633             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
634         }
635     }
636
637     @Override
638     protected void handleRecover(Object message) throws Exception {
639         if (message instanceof RecoveryCompleted) {
640             onRecoveryCompleted();
641         } else if (message instanceof SnapshotOffer) {
642             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
643         }
644     }
645
646     private void onRecoveryCompleted() {
647         LOG.info("Recovery complete : {}", persistenceId());
648
649         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
650         // journal on upgrade from Helium.
651         deleteMessages(lastSequenceNr());
652
653         if(currentSnapshot == null && restoreFromSnapshot != null &&
654                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
655             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
656                     restoreFromSnapshot.getShardManagerSnapshot()))) {
657                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
658
659                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
660
661                 applyShardManagerSnapshot(snapshot);
662             } catch(Exception e) {
663                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
664             }
665         }
666
667         createLocalShards();
668     }
669
670     private void findLocalShard(FindLocalShard message) {
671         final ShardInformation shardInformation = localShards.get(message.getShardName());
672
673         if(shardInformation == null){
674             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
675             return;
676         }
677
678         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier<Object>() {
679             @Override
680             public Object get() {
681                 return new LocalShardFound(shardInformation.getActor());
682             }
683         });
684     }
685
686     private void sendResponse(ShardInformation shardInformation, boolean doWait,
687             boolean wantShardReady, final Supplier<Object> messageSupplier) {
688         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
689             if(doWait) {
690                 final ActorRef sender = getSender();
691                 final ActorRef self = self();
692
693                 Runnable replyRunnable = new Runnable() {
694                     @Override
695                     public void run() {
696                         sender.tell(messageSupplier.get(), self);
697                     }
698                 };
699
700                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
701                     new OnShardInitialized(replyRunnable);
702
703                 shardInformation.addOnShardInitialized(onShardInitialized);
704
705                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
706                 if(shardInformation.isShardInitialized()) {
707                     // If the shard is already initialized then we'll wait enough time for the shard to
708                     // elect a leader, ie 2 times the election timeout.
709                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
710                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
711                 }
712
713                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
714                         shardInformation.getShardName());
715
716                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
717                         timeout, getSelf(),
718                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
719                         getContext().dispatcher(), getSelf());
720
721                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
722
723             } else if (!shardInformation.isShardInitialized()) {
724                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
725                         shardInformation.getShardName());
726                 getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
727             } else {
728                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
729                         shardInformation.getShardName());
730                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
731             }
732
733             return;
734         }
735
736         getSender().tell(messageSupplier.get(), getSelf());
737     }
738
739     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
740         return new NoShardLeaderException(null, shardId.toString());
741     }
742
743     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
744         return new NotInitializedException(String.format(
745                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
746     }
747
748     private void memberRemoved(ClusterEvent.MemberRemoved message) {
749         String memberName = message.member().roles().iterator().next();
750
751         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
752                 message.member().address());
753
754         peerAddressResolver.removePeerAddress(memberName);
755
756         for(ShardInformation info : localShards.values()){
757             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
758         }
759     }
760
761     private void memberExited(ClusterEvent.MemberExited message) {
762         String memberName = message.member().roles().iterator().next();
763
764         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
765                 message.member().address());
766
767         peerAddressResolver.removePeerAddress(memberName);
768
769         for(ShardInformation info : localShards.values()){
770             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
771         }
772     }
773
774     private void memberUp(ClusterEvent.MemberUp message) {
775         String memberName = message.member().roles().iterator().next();
776
777         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
778                 message.member().address());
779
780         addPeerAddress(memberName, message.member().address());
781
782         checkReady();
783     }
784
785     private void addPeerAddress(String memberName, Address address) {
786         peerAddressResolver.addPeerAddress(memberName, address);
787
788         for(ShardInformation info : localShards.values()){
789             String shardName = info.getShardName();
790             String peerId = getShardIdentifier(memberName, shardName).toString();
791             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
792
793             info.peerUp(memberName, peerId, getSelf());
794         }
795     }
796
797     private void memberReachable(ClusterEvent.ReachableMember message) {
798         String memberName = message.member().roles().iterator().next();
799         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
800
801         addPeerAddress(memberName, message.member().address());
802
803         markMemberAvailable(memberName);
804     }
805
806     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
807         String memberName = message.member().roles().iterator().next();
808         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
809
810         markMemberUnavailable(memberName);
811     }
812
813     private void markMemberUnavailable(final String memberName) {
814         for(ShardInformation info : localShards.values()){
815             String leaderId = info.getLeaderId();
816             if(leaderId != null && leaderId.contains(memberName)) {
817                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
818                 info.setLeaderAvailable(false);
819
820                 primaryShardInfoCache.remove(info.getShardName());
821             }
822
823             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
824         }
825     }
826
827     private void markMemberAvailable(final String memberName) {
828         for(ShardInformation info : localShards.values()){
829             String leaderId = info.getLeaderId();
830             if(leaderId != null && leaderId.contains(memberName)) {
831                 LOG.debug("Marking Leader {} as available.", leaderId);
832                 info.setLeaderAvailable(true);
833             }
834
835             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
836         }
837     }
838
839     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
840         datastoreContextFactory = factory;
841         for (ShardInformation info : localShards.values()) {
842             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
843         }
844     }
845
846     private void onSwitchShardBehavior(SwitchShardBehavior message) {
847         ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
848
849         ShardInformation shardInformation = localShards.get(identifier.getShardName());
850
851         if(shardInformation != null && shardInformation.getActor() != null) {
852             shardInformation.getActor().tell(
853                     new SwitchBehavior(message.getNewState(), message.getTerm()), getSelf());
854         } else {
855             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
856                     message.getShardName(), message.getNewState());
857         }
858     }
859
860     /**
861      * Notifies all the local shards of a change in the schema context
862      *
863      * @param message
864      */
865     private void updateSchemaContext(final Object message) {
866         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
867
868         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
869
870         for (ShardInformation info : localShards.values()) {
871             if (info.getActor() == null) {
872                 LOG.debug("Creating Shard {}", info.getShardId());
873                 info.setActor(newShardActor(schemaContext, info));
874             } else {
875                 info.getActor().tell(message, getSelf());
876             }
877         }
878     }
879
880     @VisibleForTesting
881     protected ClusterWrapper getCluster() {
882         return cluster;
883     }
884
885     @VisibleForTesting
886     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
887         return getContext().actorOf(info.newProps(schemaContext)
888                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
889     }
890
891     private void findPrimary(FindPrimary message) {
892         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
893
894         final String shardName = message.getShardName();
895         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
896
897         // First see if the there is a local replica for the shard
898         final ShardInformation info = localShards.get(shardName);
899         if (info != null && info.isActiveMember()) {
900             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
901                 @Override
902                 public Object get() {
903                     String primaryPath = info.getSerializedLeaderActor();
904                     Object found = canReturnLocalShardState && info.isLeader() ?
905                             new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
906                                 new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
907
908                             if(LOG.isDebugEnabled()) {
909                                 LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
910                             }
911
912                             return found;
913                 }
914             });
915
916             return;
917         }
918
919         final Collection<String> visitedAddresses;
920         if(message instanceof RemoteFindPrimary) {
921             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
922         } else {
923             visitedAddresses = new ArrayList<>(1);
924         }
925
926         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
927
928         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
929             if(visitedAddresses.contains(address)) {
930                 continue;
931             }
932
933             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
934                     persistenceId(), shardName, address, visitedAddresses);
935
936             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
937                     message.isWaitUntilReady(), visitedAddresses), getContext());
938             return;
939         }
940
941         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
942
943         getSender().tell(new PrimaryNotFoundException(
944                 String.format("No primary shard found for %s.", shardName)), getSelf());
945     }
946
947     /**
948      * Construct the name of the shard actor given the name of the member on
949      * which the shard resides and the name of the shard
950      *
951      * @param memberName
952      * @param shardName
953      * @return
954      */
955     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
956         return peerAddressResolver.getShardIdentifier(memberName, shardName);
957     }
958
959     /**
960      * Create shards that are local to the member on which the ShardManager
961      * runs
962      *
963      */
964     private void createLocalShards() {
965         String memberName = this.cluster.getCurrentMemberName();
966         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
967
968         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
969         if(restoreFromSnapshot != null)
970         {
971             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
972                 shardSnapshots.put(snapshot.getName(), snapshot);
973             }
974         }
975
976         restoreFromSnapshot = null; // null out to GC
977
978         for(String shardName : memberShardNames){
979             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
980
981             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
982
983             Map<String, String> peerAddresses = getPeerAddresses(shardName);
984             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
985                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
986                         shardSnapshots.get(shardName)), peerAddressResolver));
987             mBean.addLocalShard(shardId.toString());
988         }
989     }
990
991     /**
992      * Given the name of the shard find the addresses of all it's peers
993      *
994      * @param shardName
995      */
996     private Map<String, String> getPeerAddresses(String shardName) {
997         Collection<String> members = configuration.getMembersFromShardName(shardName);
998         Map<String, String> peerAddresses = new HashMap<>();
999
1000         String currentMemberName = this.cluster.getCurrentMemberName();
1001
1002         for(String memberName : members) {
1003             if(!currentMemberName.equals(memberName)) {
1004                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1005                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1006                 peerAddresses.put(shardId.toString(), address);
1007             }
1008         }
1009         return peerAddresses;
1010     }
1011
1012     @Override
1013     public SupervisorStrategy supervisorStrategy() {
1014
1015         return new OneForOneStrategy(10, Duration.create("1 minute"),
1016                 new Function<Throwable, SupervisorStrategy.Directive>() {
1017             @Override
1018             public SupervisorStrategy.Directive apply(Throwable t) {
1019                 LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1020                 return SupervisorStrategy.resume();
1021             }
1022         }
1023                 );
1024
1025     }
1026
1027     @Override
1028     public String persistenceId() {
1029         return persistenceId;
1030     }
1031
1032     @VisibleForTesting
1033     ShardManagerInfoMBean getMBean(){
1034         return mBean;
1035     }
1036
1037     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1038         if (shardReplicaOperationsInProgress.contains(shardName)) {
1039             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1040             LOG.debug ("{}: {}", persistenceId(), msg);
1041             sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1042             return true;
1043         }
1044
1045         return false;
1046     }
1047
1048     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1049         final String shardName = shardReplicaMsg.getShardName();
1050
1051         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1052
1053         // verify the shard with the specified name is present in the cluster configuration
1054         if (!(this.configuration.isShardConfigured(shardName))) {
1055             String msg = String.format("No module configuration exists for shard %s", shardName);
1056             LOG.debug ("{}: {}", persistenceId(), msg);
1057             getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
1058             return;
1059         }
1060
1061         // Create the localShard
1062         if (schemaContext == null) {
1063             String msg = String.format(
1064                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1065             LOG.debug ("{}: {}", persistenceId(), msg);
1066             getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
1067             return;
1068         }
1069
1070         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1071             @Override
1072             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1073                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1074             }
1075
1076             @Override
1077             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1078                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1079             }
1080
1081         });
1082     }
1083
1084     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1085         String msg = String.format("Local shard %s already exists", shardName);
1086         LOG.debug ("{}: {}", persistenceId(), msg);
1087         sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
1088     }
1089
1090     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1091         if(isShardReplicaOperationInProgress(shardName, sender)) {
1092             return;
1093         }
1094
1095         shardReplicaOperationsInProgress.add(shardName);
1096
1097         final ShardInformation shardInfo;
1098         final boolean removeShardOnFailure;
1099         ShardInformation existingShardInfo = localShards.get(shardName);
1100         if(existingShardInfo == null) {
1101             removeShardOnFailure = true;
1102             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1103
1104             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
1105                     DisableElectionsRaftPolicy.class.getName()).build();
1106
1107             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1108                     Shard.builder(), peerAddressResolver);
1109             shardInfo.setActiveMember(false);
1110             localShards.put(shardName, shardInfo);
1111             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1112         } else {
1113             removeShardOnFailure = false;
1114             shardInfo = existingShardInfo;
1115         }
1116
1117         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1118
1119         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1120         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1121                 response.getPrimaryPath(), shardInfo.getShardId());
1122
1123         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1124                 duration());
1125         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1126             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1127
1128         futureObj.onComplete(new OnComplete<Object>() {
1129             @Override
1130             public void onComplete(Throwable failure, Object addServerResponse) {
1131                 if (failure != null) {
1132                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1133                             response.getPrimaryPath(), shardName, failure);
1134
1135                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1136                             response.getPrimaryPath(), shardName);
1137                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1138                 } else {
1139                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1140                             response.getPrimaryPath(), removeShardOnFailure), sender);
1141                 }
1142             }
1143         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1144     }
1145
1146     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1147             boolean removeShardOnFailure) {
1148         shardReplicaOperationsInProgress.remove(shardName);
1149
1150         if(removeShardOnFailure) {
1151             ShardInformation shardInfo = localShards.remove(shardName);
1152             if (shardInfo.getActor() != null) {
1153                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1154             }
1155         }
1156
1157         sender.tell(new akka.actor.Status.Failure(message == null ? failure :
1158             new RuntimeException(message, failure)), getSelf());
1159     }
1160
1161     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1162             String leaderPath, boolean removeShardOnFailure) {
1163         String shardName = shardInfo.getShardName();
1164         shardReplicaOperationsInProgress.remove(shardName);
1165
1166         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1167
1168         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1169             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1170
1171             // Make the local shard voting capable
1172             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1173             shardInfo.setActiveMember(true);
1174             persistShardList();
1175
1176             mBean.addLocalShard(shardInfo.getShardId().toString());
1177             sender.tell(new akka.actor.Status.Success(null), getSelf());
1178         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1179             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1180         } else {
1181             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1182                     persistenceId(), shardName, replyMsg.getStatus());
1183
1184             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1185
1186             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1187         }
1188     }
1189
1190     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1191                                                String leaderPath, ShardIdentifier shardId) {
1192         Exception failure;
1193         switch (serverChangeStatus) {
1194             case TIMEOUT:
1195                 failure = new TimeoutException(String.format(
1196                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1197                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1198                         leaderPath, shardId.getShardName()));
1199                 break;
1200             case NO_LEADER:
1201                 failure = createNoShardLeaderException(shardId);
1202                 break;
1203             case NOT_SUPPORTED:
1204                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1205                         serverChange.getSimpleName(), shardId.getShardName()));
1206                 break;
1207             default :
1208                 failure = new RuntimeException(String.format(
1209                         "%s request to leader %s for shard %s failed with status %s",
1210                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1211         }
1212         return failure;
1213     }
1214
1215     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1216         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1217
1218         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1219                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1220             @Override
1221             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1222                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1223             }
1224
1225             @Override
1226             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1227                 getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
1228             }
1229         });
1230     }
1231
1232     private void persistShardList() {
1233         List<String> shardList = new ArrayList<>(localShards.keySet());
1234         for (ShardInformation shardInfo : localShards.values()) {
1235             if (!shardInfo.isActiveMember()) {
1236                 shardList.remove(shardInfo.getShardName());
1237             }
1238         }
1239         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1240         saveSnapshot(updateShardManagerSnapshot(shardList));
1241     }
1242
1243     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1244         currentSnapshot = new ShardManagerSnapshot(shardList);
1245         return currentSnapshot;
1246     }
1247
1248     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1249         currentSnapshot = snapshot;
1250
1251         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1252
1253         String currentMember = cluster.getCurrentMemberName();
1254         Set<String> configuredShardList =
1255             new HashSet<>(configuration.getMemberShardNames(currentMember));
1256         for (String shard : currentSnapshot.getShardList()) {
1257             if (!configuredShardList.contains(shard)) {
1258                 // add the current member as a replica for the shard
1259                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1260                 configuration.addMemberReplicaForShard(shard, currentMember);
1261             } else {
1262                 configuredShardList.remove(shard);
1263             }
1264         }
1265         for (String shard : configuredShardList) {
1266             // remove the member as a replica for the shard
1267             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1268             configuration.removeMemberReplicaForShard(shard, currentMember);
1269         }
1270     }
1271
1272     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1273         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1274             persistenceId());
1275         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1276             0, 0));
1277     }
1278
1279     private static class ForwardedAddServerReply {
1280         ShardInformation shardInfo;
1281         AddServerReply addServerReply;
1282         String leaderPath;
1283         boolean removeShardOnFailure;
1284
1285         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1286                 boolean removeShardOnFailure) {
1287             this.shardInfo = shardInfo;
1288             this.addServerReply = addServerReply;
1289             this.leaderPath = leaderPath;
1290             this.removeShardOnFailure = removeShardOnFailure;
1291         }
1292     }
1293
1294     private static class ForwardedAddServerFailure {
1295         String shardName;
1296         String failureMessage;
1297         Throwable failure;
1298         boolean removeShardOnFailure;
1299
1300         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1301                 boolean removeShardOnFailure) {
1302             this.shardName = shardName;
1303             this.failureMessage = failureMessage;
1304             this.failure = failure;
1305             this.removeShardOnFailure = removeShardOnFailure;
1306         }
1307     }
1308
1309     @VisibleForTesting
1310     protected static class ShardInformation {
1311         private final ShardIdentifier shardId;
1312         private final String shardName;
1313         private ActorRef actor;
1314         private final Map<String, String> initialPeerAddresses;
1315         private Optional<DataTree> localShardDataTree;
1316         private boolean leaderAvailable = false;
1317
1318         // flag that determines if the actor is ready for business
1319         private boolean actorInitialized = false;
1320
1321         private boolean followerSyncStatus = false;
1322
1323         private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
1324         private String role ;
1325         private String leaderId;
1326         private short leaderVersion;
1327
1328         private DatastoreContext datastoreContext;
1329         private Shard.AbstractBuilder<?, ?> builder;
1330         private final ShardPeerAddressResolver addressResolver;
1331         private boolean isActiveMember = true;
1332
1333         private ShardInformation(String shardName, ShardIdentifier shardId,
1334                 Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
1335                 Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
1336             this.shardName = shardName;
1337             this.shardId = shardId;
1338             this.initialPeerAddresses = initialPeerAddresses;
1339             this.datastoreContext = datastoreContext;
1340             this.builder = builder;
1341             this.addressResolver = addressResolver;
1342         }
1343
1344         Props newProps(SchemaContext schemaContext) {
1345             Preconditions.checkNotNull(builder);
1346             Props props = builder.id(shardId).peerAddresses(initialPeerAddresses).datastoreContext(datastoreContext).
1347                     schemaContext(schemaContext).props();
1348             builder = null;
1349             return props;
1350         }
1351
1352         String getShardName() {
1353             return shardName;
1354         }
1355
1356         @Nullable
1357         ActorRef getActor(){
1358             return actor;
1359         }
1360
1361         void setActor(ActorRef actor) {
1362             this.actor = actor;
1363         }
1364
1365         ShardIdentifier getShardId() {
1366             return shardId;
1367         }
1368
1369         void setLocalDataTree(Optional<DataTree> localShardDataTree) {
1370             this.localShardDataTree = localShardDataTree;
1371         }
1372
1373         Optional<DataTree> getLocalShardDataTree() {
1374             return localShardDataTree;
1375         }
1376
1377         DatastoreContext getDatastoreContext() {
1378             return datastoreContext;
1379         }
1380
1381         void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
1382             this.datastoreContext = datastoreContext;
1383             if (actor != null) {
1384                 LOG.debug ("Sending new DatastoreContext to {}", shardId);
1385                 actor.tell(this.datastoreContext, sender);
1386             }
1387         }
1388
1389         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
1390             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
1391
1392             if(actor != null) {
1393                 if(LOG.isDebugEnabled()) {
1394                     LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
1395                             peerId, peerAddress, actor.path());
1396                 }
1397
1398                 actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
1399             }
1400
1401             notifyOnShardInitializedCallbacks();
1402         }
1403
1404         void peerDown(String memberName, String peerId, ActorRef sender) {
1405             if(actor != null) {
1406                 actor.tell(new PeerDown(memberName, peerId), sender);
1407             }
1408         }
1409
1410         void peerUp(String memberName, String peerId, ActorRef sender) {
1411             if(actor != null) {
1412                 actor.tell(new PeerUp(memberName, peerId), sender);
1413             }
1414         }
1415
1416         boolean isShardReady() {
1417             return !RaftState.Candidate.name().equals(role) && !Strings.isNullOrEmpty(role);
1418         }
1419
1420         boolean isShardReadyWithLeaderId() {
1421             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
1422                     (isLeader() || addressResolver.resolve(leaderId) != null);
1423         }
1424
1425         boolean isShardInitialized() {
1426             return getActor() != null && actorInitialized;
1427         }
1428
1429         boolean isLeader() {
1430             return Objects.equal(leaderId, shardId.toString());
1431         }
1432
1433         String getSerializedLeaderActor() {
1434             if(isLeader()) {
1435                 return Serialization.serializedActorPath(getActor());
1436             } else {
1437                 return addressResolver.resolve(leaderId);
1438             }
1439         }
1440
1441         void setActorInitialized() {
1442             LOG.debug("Shard {} is initialized", shardId);
1443
1444             this.actorInitialized = true;
1445
1446             notifyOnShardInitializedCallbacks();
1447         }
1448
1449         private void notifyOnShardInitializedCallbacks() {
1450             if(onShardInitializedSet.isEmpty()) {
1451                 return;
1452             }
1453
1454             boolean ready = isShardReadyWithLeaderId();
1455
1456             if(LOG.isDebugEnabled()) {
1457                 LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
1458                         ready ? "ready" : "initialized", onShardInitializedSet.size());
1459             }
1460
1461             Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
1462             while(iter.hasNext()) {
1463                 OnShardInitialized onShardInitialized = iter.next();
1464                 if(!(onShardInitialized instanceof OnShardReady) || ready) {
1465                     iter.remove();
1466                     onShardInitialized.getTimeoutSchedule().cancel();
1467                     onShardInitialized.getReplyRunnable().run();
1468                 }
1469             }
1470         }
1471
1472         void addOnShardInitialized(OnShardInitialized onShardInitialized) {
1473             onShardInitializedSet.add(onShardInitialized);
1474         }
1475
1476         void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
1477             onShardInitializedSet.remove(onShardInitialized);
1478         }
1479
1480         void setRole(String newRole) {
1481             this.role = newRole;
1482
1483             notifyOnShardInitializedCallbacks();
1484         }
1485
1486         void setFollowerSyncStatus(boolean syncStatus){
1487             this.followerSyncStatus = syncStatus;
1488         }
1489
1490         boolean isInSync(){
1491             if(RaftState.Follower.name().equals(this.role)){
1492                 return followerSyncStatus;
1493             } else if(RaftState.Leader.name().equals(this.role)){
1494                 return true;
1495             }
1496
1497             return false;
1498         }
1499
1500         boolean setLeaderId(String leaderId) {
1501             boolean changed = !Objects.equal(this.leaderId, leaderId);
1502             this.leaderId = leaderId;
1503             if(leaderId != null) {
1504                 this.leaderAvailable = true;
1505             }
1506             notifyOnShardInitializedCallbacks();
1507
1508             return changed;
1509         }
1510
1511         String getLeaderId() {
1512             return leaderId;
1513         }
1514
1515         void setLeaderAvailable(boolean leaderAvailable) {
1516             this.leaderAvailable = leaderAvailable;
1517
1518             if(leaderAvailable) {
1519                 notifyOnShardInitializedCallbacks();
1520             }
1521         }
1522
1523         short getLeaderVersion() {
1524             return leaderVersion;
1525         }
1526
1527         void setLeaderVersion(short leaderVersion) {
1528             this.leaderVersion = leaderVersion;
1529         }
1530
1531         boolean isActiveMember() {
1532             return isActiveMember;
1533         }
1534
1535         void setActiveMember(boolean isActiveMember) {
1536             this.isActiveMember = isActiveMember;
1537         }
1538     }
1539
1540     private static class OnShardInitialized {
1541         private final Runnable replyRunnable;
1542         private Cancellable timeoutSchedule;
1543
1544         OnShardInitialized(Runnable replyRunnable) {
1545             this.replyRunnable = replyRunnable;
1546         }
1547
1548         Runnable getReplyRunnable() {
1549             return replyRunnable;
1550         }
1551
1552         Cancellable getTimeoutSchedule() {
1553             return timeoutSchedule;
1554         }
1555
1556         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1557             this.timeoutSchedule = timeoutSchedule;
1558         }
1559     }
1560
1561     private static class OnShardReady extends OnShardInitialized {
1562         OnShardReady(Runnable replyRunnable) {
1563             super(replyRunnable);
1564         }
1565     }
1566
1567     private static class ShardNotInitializedTimeout {
1568         private final ActorRef sender;
1569         private final ShardInformation shardInfo;
1570         private final OnShardInitialized onShardInitialized;
1571
1572         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1573             this.sender = sender;
1574             this.shardInfo = shardInfo;
1575             this.onShardInitialized = onShardInitialized;
1576         }
1577
1578         ActorRef getSender() {
1579             return sender;
1580         }
1581
1582         ShardInformation getShardInfo() {
1583             return shardInfo;
1584         }
1585
1586         OnShardInitialized getOnShardInitialized() {
1587             return onShardInitialized;
1588         }
1589     }
1590
1591     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1592         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1593                 getShardInitializationTimeout().duration().$times(2));
1594
1595
1596         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1597         futureObj.onComplete(new OnComplete<Object>() {
1598             @Override
1599             public void onComplete(Throwable failure, Object response) {
1600                 if (failure != null) {
1601                     handler.onFailure(failure);
1602                 } else {
1603                     if(response instanceof RemotePrimaryShardFound) {
1604                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1605                     } else if(response instanceof LocalPrimaryShardFound) {
1606                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1607                     } else {
1608                         handler.onUnknownResponse(response);
1609                     }
1610                 }
1611             }
1612         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1613     }
1614
    /**
     * The FindPrimaryResponseHandler provides specific callback methods which are invoked when the response to
     * a find primary message is processed. Exactly which callback applies depends on the type of the response.
     */
    private static interface FindPrimaryResponseHandler {
        /**
         * Invoked when a Failure message is received as a response.
         *
         * @param failure the failure that was received
         */
        void onFailure(Throwable failure);

        /**
         * Invoked when a RemotePrimaryShardFound response is received.
         *
         * @param response the response that was received
         */
        void onRemotePrimaryShardFound(RemotePrimaryShardFound response);

        /**
         * Invoked when a LocalPrimaryShardFound response is received.
         *
         * @param response the response that was received
         */
        void onLocalPrimaryFound(LocalPrimaryShardFound response);

        /**
         * Invoked when an unknown response is received. This is another type of failure.
         *
         * @param response the unrecognized response object
         */
        void onUnknownResponse(Object response);
    }
1647
1648     /**
1649      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1650      * replica and sends a wrapped Failure response to some targetActor
1651      */
1652     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1653         private final ActorRef targetActor;
1654         private final String shardName;
1655         private final String persistenceId;
1656         private final ActorRef shardManagerActor;
1657
1658         /**
1659          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1660          * @param shardName The name of the shard for which the primary replica had to be found
1661          * @param persistenceId The persistenceId for the ShardManager
1662          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1663          */
1664         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1665             this.targetActor = Preconditions.checkNotNull(targetActor);
1666             this.shardName = Preconditions.checkNotNull(shardName);
1667             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1668             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1669         }
1670
1671         public ActorRef getTargetActor() {
1672             return targetActor;
1673         }
1674
1675         public String getShardName() {
1676             return shardName;
1677         }
1678
1679         @Override
1680         public void onFailure(Throwable failure) {
1681             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1682             targetActor.tell(new akka.actor.Status.Failure(new RuntimeException(
1683                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1684         }
1685
1686         @Override
1687         public void onUnknownResponse(Object response) {
1688             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1689                     shardName, response);
1690             LOG.debug ("{}: {}", persistenceId, msg);
1691             targetActor.tell(new akka.actor.Status.Failure(response instanceof Throwable ? (Throwable) response :
1692                     new RuntimeException(msg)), shardManagerActor);
1693         }
1694     }
1695
1696
1697     /**
1698      * The PrimaryShardFoundForContext is a DTO which puts together a message (aka 'Context' message) which needs to be
1699      * forwarded to the primary replica of a shard and the message (aka 'PrimaryShardFound' message) that is received
1700      * as a successful response to find primary.
1701      */
1702     private static class PrimaryShardFoundForContext {
1703         private final String shardName;
1704         private final Object contextMessage;
1705         private final RemotePrimaryShardFound remotePrimaryShardFound;
1706         private final LocalPrimaryShardFound localPrimaryShardFound;
1707
1708         public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
1709                 @Nonnull Object primaryFoundMessage) {
1710             this.shardName = Preconditions.checkNotNull(shardName);
1711             this.contextMessage = Preconditions.checkNotNull(contextMessage);
1712             Preconditions.checkNotNull(primaryFoundMessage);
1713             this.remotePrimaryShardFound = (primaryFoundMessage instanceof RemotePrimaryShardFound) ?
1714                     (RemotePrimaryShardFound) primaryFoundMessage : null;
1715             this.localPrimaryShardFound = (primaryFoundMessage instanceof LocalPrimaryShardFound) ?
1716                     (LocalPrimaryShardFound) primaryFoundMessage : null;
1717         }
1718
1719         @Nonnull
1720         String getPrimaryPath(){
1721             if(remotePrimaryShardFound != null) {
1722                 return remotePrimaryShardFound.getPrimaryPath();
1723             }
1724             return localPrimaryShardFound.getPrimaryPath();
1725         }
1726
1727         @Nonnull
1728         Object getContextMessage() {
1729             return contextMessage;
1730         }
1731
1732         @Nullable
1733         RemotePrimaryShardFound getRemotePrimaryShardFound() {
1734             return remotePrimaryShardFound;
1735         }
1736
1737         @Nonnull
1738         String getShardName() {
1739             return shardName;
1740         }
1741     }
1742
1743     /**
1744      * The WrappedShardResponse class wraps a response from a Shard.
1745      */
1746     private static class WrappedShardResponse {
1747         private final ShardIdentifier shardId;
1748         private final Object response;
1749         private final String leaderPath;
1750
1751         private WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1752             this.shardId = shardId;
1753             this.response = response;
1754             this.leaderPath = leaderPath;
1755         }
1756
1757         ShardIdentifier getShardId() {
1758             return shardId;
1759         }
1760
1761         Object getResponse() {
1762             return response;
1763         }
1764
1765         String getLeaderPath() {
1766             return leaderPath;
1767         }
1768     }
1769 }
1770
1771
1772