Fix incorrect remove call in ShardManager
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Status;
18 import akka.actor.SupervisorStrategy;
19 import akka.actor.SupervisorStrategy.Directive;
20 import akka.cluster.ClusterEvent;
21 import akka.cluster.ClusterEvent.MemberWeaklyUp;
22 import akka.cluster.Member;
23 import akka.dispatch.Futures;
24 import akka.dispatch.OnComplete;
25 import akka.japi.Function;
26 import akka.pattern.Patterns;
27 import akka.persistence.RecoveryCompleted;
28 import akka.persistence.SaveSnapshotFailure;
29 import akka.persistence.SaveSnapshotSuccess;
30 import akka.persistence.SnapshotOffer;
31 import akka.persistence.SnapshotSelectionCriteria;
32 import akka.util.Timeout;
33 import com.google.common.annotations.VisibleForTesting;
34 import com.google.common.base.Preconditions;
35 import java.io.ByteArrayInputStream;
36 import java.io.ObjectInputStream;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.Collections;
40 import java.util.HashMap;
41 import java.util.HashSet;
42 import java.util.List;
43 import java.util.Map;
44 import java.util.Map.Entry;
45 import java.util.Set;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import java.util.function.Consumer;
50 import java.util.function.Supplier;
51 import org.apache.commons.lang3.SerializationUtils;
52 import org.opendaylight.controller.cluster.access.concepts.MemberName;
53 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
54 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
56 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
57 import org.opendaylight.controller.cluster.datastore.Shard;
58 import org.opendaylight.controller.cluster.datastore.config.Configuration;
59 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
60 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
63 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
64 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
65 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
66 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
67 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
68 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
69 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
70 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
71 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
72 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
73 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
75 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
76 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
77 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
78 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
79 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
80 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
81 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
82 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
83 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
84 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
85 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
86 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
87 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
88 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
89 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
90 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
91 import org.opendaylight.controller.cluster.raft.messages.AddServer;
92 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
93 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
94 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
95 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
96 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
97 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
98 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
99 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
100 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
101 import org.slf4j.Logger;
102 import org.slf4j.LoggerFactory;
103 import scala.concurrent.ExecutionContext;
104 import scala.concurrent.Future;
105 import scala.concurrent.duration.Duration;
106 import scala.concurrent.duration.FiniteDuration;
107
108 /**
109  * The ShardManager has the following jobs,
110  * <ul>
111  * <li> Create all the local shard replicas that belong on this cluster member
112  * <li> Find the address of the local shard
113  * <li> Find the primary replica for any given shard
114  * <li> Monitor the cluster members and store their addresses
115  * </ul>
116  */
117 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
118
119     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
120
121     // Stores a mapping between a shard name and its corresponding information
122     // Shard names look like inventory, topology etc and are as specified in
123     // configuration
124     private final Map<String, ShardInformation> localShards = new HashMap<>();
125
126     // The type of a ShardManager reflects the type of the datastore itself
127     // A data store could be of type config/operational
128     private final String type;
129
130     private final ClusterWrapper cluster;
131
132     private final Configuration configuration;
133
134     private final String shardDispatcherPath;
135
136     private final ShardManagerInfo mBean;
137
138     private DatastoreContextFactory datastoreContextFactory;
139
140     private final CountDownLatch waitTillReadyCountdownLatch;
141
142     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
143
144     private final ShardPeerAddressResolver peerAddressResolver;
145
146     private SchemaContext schemaContext;
147
148     private DatastoreSnapshot restoreFromSnapshot;
149
150     private ShardManagerSnapshot currentSnapshot;
151
152     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
153
154     private final String persistenceId;
155
156     ShardManager(AbstractShardManagerCreator<?> builder) {
157         this.cluster = builder.getCluster();
158         this.configuration = builder.getConfiguration();
159         this.datastoreContextFactory = builder.getDatastoreContextFactory();
160         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
161         this.shardDispatcherPath =
162                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
163         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
164         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
165         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
166
167         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
168         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
169
170         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
171
172         // Subscribe this actor to cluster member events
173         cluster.subscribeToMemberEvents(getSelf());
174
175         mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
176                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
177         mBean.registerMBean();
178     }
179
180     @Override
181     public void postStop() {
182         LOG.info("Stopping ShardManager {}", persistenceId());
183
184         mBean.unregisterMBean();
185     }
186
187     @Override
188     public void handleCommand(Object message) throws Exception {
189         if (message  instanceof FindPrimary) {
190             findPrimary((FindPrimary)message);
191         } else if(message instanceof FindLocalShard){
192             findLocalShard((FindLocalShard) message);
193         } else if (message instanceof UpdateSchemaContext) {
194             updateSchemaContext(message);
195         } else if(message instanceof ActorInitialized) {
196             onActorInitialized(message);
197         } else if (message instanceof ClusterEvent.MemberUp){
198             memberUp((ClusterEvent.MemberUp) message);
199         } else if (message instanceof ClusterEvent.MemberWeaklyUp){
200             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
201         } else if (message instanceof ClusterEvent.MemberExited){
202             memberExited((ClusterEvent.MemberExited) message);
203         } else if(message instanceof ClusterEvent.MemberRemoved) {
204             memberRemoved((ClusterEvent.MemberRemoved) message);
205         } else if(message instanceof ClusterEvent.UnreachableMember) {
206             memberUnreachable((ClusterEvent.UnreachableMember)message);
207         } else if(message instanceof ClusterEvent.ReachableMember) {
208             memberReachable((ClusterEvent.ReachableMember) message);
209         } else if(message instanceof DatastoreContextFactory) {
210             onDatastoreContextFactory((DatastoreContextFactory)message);
211         } else if(message instanceof RoleChangeNotification) {
212             onRoleChangeNotification((RoleChangeNotification) message);
213         } else if(message instanceof FollowerInitialSyncUpStatus){
214             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
215         } else if(message instanceof ShardNotInitializedTimeout) {
216             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
217         } else if(message instanceof ShardLeaderStateChanged) {
218             onLeaderStateChanged((ShardLeaderStateChanged) message);
219         } else if(message instanceof SwitchShardBehavior){
220             onSwitchShardBehavior((SwitchShardBehavior) message);
221         } else if(message instanceof CreateShard) {
222             onCreateShard((CreateShard)message);
223         } else if(message instanceof AddShardReplica){
224             onAddShardReplica((AddShardReplica)message);
225         } else if(message instanceof ForwardedAddServerReply) {
226             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
227             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
228                     msg.removeShardOnFailure);
229         } else if(message instanceof ForwardedAddServerFailure) {
230             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
231             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
232         } else if(message instanceof RemoveShardReplica) {
233             onRemoveShardReplica((RemoveShardReplica) message);
234         } else if(message instanceof WrappedShardResponse){
235             onWrappedShardResponse((WrappedShardResponse) message);
236         } else if(message instanceof GetSnapshot) {
237             onGetSnapshot();
238         } else if(message instanceof ServerRemoved){
239             onShardReplicaRemoved((ServerRemoved) message);
240         } else if(message instanceof ChangeShardMembersVotingStatus){
241             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
242         } else if(message instanceof FlipShardMembersVotingStatus){
243             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
244         } else if(message instanceof SaveSnapshotSuccess) {
245             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
246         } else if(message instanceof SaveSnapshotFailure) {
247             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
248                     persistenceId(), ((SaveSnapshotFailure) message).cause());
249         } else if(message instanceof Shutdown) {
250             onShutDown();
251         } else if (message instanceof GetLocalShardIds) {
252             onGetLocalShardIds();
253         } else if(message instanceof RunnableMessage) {
254             ((RunnableMessage)message).run();
255         } else {
256             unknownMessage(message);
257         }
258     }
259
260     private void onShutDown() {
261         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
262         for (ShardInformation info : localShards.values()) {
263             if (info.getActor() != null) {
264                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
265
266                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
267                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
268             }
269         }
270
271         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
272
273         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
274         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
275
276         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
277             @Override
278             public void onComplete(Throwable failure, Iterable<Boolean> results) {
279                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
280
281                 self().tell(PoisonPill.getInstance(), self());
282
283                 if(failure != null) {
284                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
285                 } else {
286                     int nfailed = 0;
287                     for(Boolean r: results) {
288                         if(!r) {
289                             nfailed++;
290                         }
291                     }
292
293                     if(nfailed > 0) {
294                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
295                     }
296                 }
297             }
298         }, dispatcher);
299     }
300
301     private void onWrappedShardResponse(WrappedShardResponse message) {
302         if (message.getResponse() instanceof RemoveServerReply) {
303             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
304                     message.getLeaderPath());
305         }
306     }
307
308     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
309             String leaderPath) {
310         shardReplicaOperationsInProgress.remove(shardId.getShardName());
311
312         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
313
314         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
315             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
316                     shardId.getShardName());
317             originalSender.tell(new Status.Success(null), getSelf());
318         } else {
319             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
320                     persistenceId(), shardId, replyMsg.getStatus());
321
322             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
323             originalSender.tell(new Status.Failure(failure), getSelf());
324         }
325     }
326
327     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
328             final ActorRef sender) {
329         if(isShardReplicaOperationInProgress(shardName, sender)) {
330             return;
331         }
332
333         shardReplicaOperationsInProgress.add(shardName);
334
335         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
336
337         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
338
339         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
340         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
341                 primaryPath, shardId);
342
343         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
344                 duration());
345         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
346                 new RemoveServer(shardId.toString()), removeServerTimeout);
347
348         futureObj.onComplete(new OnComplete<Object>() {
349             @Override
350             public void onComplete(Throwable failure, Object response) {
351                 if (failure != null) {
352                     shardReplicaOperationsInProgress.remove(shardName);
353                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
354                             primaryPath, shardName);
355
356                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
357
358                     // FAILURE
359                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
360                 } else {
361                     // SUCCESS
362                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
363                 }
364             }
365         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
366     }
367
368     private void onShardReplicaRemoved(ServerRemoved message) {
369         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
370         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
371         if(shardInformation == null) {
372             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
373             return;
374         } else if(shardInformation.getActor() != null) {
375             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
376             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
377         }
378         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
379         persistShardList();
380     }
381
382     private void onGetSnapshot() {
383         LOG.debug("{}: onGetSnapshot", persistenceId());
384
385         List<String> notInitialized = null;
386         for(ShardInformation shardInfo: localShards.values()) {
387             if(!shardInfo.isShardInitialized()) {
388                 if(notInitialized == null) {
389                     notInitialized = new ArrayList<>();
390                 }
391
392                 notInitialized.add(shardInfo.getShardName());
393             }
394         }
395
396         if(notInitialized != null) {
397             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
398                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
399             return;
400         }
401
402         byte[] shardManagerSnapshot = null;
403         if(currentSnapshot != null) {
404             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
405         }
406
407         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
408                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
409                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
410
411         for(ShardInformation shardInfo: localShards.values()) {
412             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
413         }
414     }
415
416     private void onCreateShard(CreateShard createShard) {
417         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
418
419         Object reply;
420         try {
421             String shardName = createShard.getModuleShardConfig().getShardName();
422             if(localShards.containsKey(shardName)) {
423                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
424                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
425             } else {
426                 doCreateShard(createShard);
427                 reply = new Status.Success(null);
428             }
429         } catch (Exception e) {
430             LOG.error("{}: onCreateShard failed", persistenceId(), e);
431             reply = new Status.Failure(e);
432         }
433
434         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
435             getSender().tell(reply, getSelf());
436         }
437     }
438
439     private void doCreateShard(CreateShard createShard) {
440         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
441         String shardName = moduleShardConfig.getShardName();
442
443         configuration.addModuleShardConfiguration(moduleShardConfig);
444
445         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
446         if(shardDatastoreContext == null) {
447             shardDatastoreContext = newShardDatastoreContext(shardName);
448         } else {
449             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
450                     peerAddressResolver).build();
451         }
452
453         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
454
455         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
456                 currentSnapshot.getShardList().contains(shardName);
457
458         Map<String, String> peerAddresses;
459         boolean isActiveMember;
460         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
461                 contains(cluster.getCurrentMemberName())) {
462             peerAddresses = getPeerAddresses(shardName);
463             isActiveMember = true;
464         } else {
465             // The local member is not in the static shard member configuration and the shard did not
466             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
467             // the shard with no peers and with elections disabled so it stays as follower. A
468             // subsequent AddServer request will be needed to make it an active member.
469             isActiveMember = false;
470             peerAddresses = Collections.emptyMap();
471             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
472                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
473         }
474
475         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
476                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
477                 isActiveMember);
478
479         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
480                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
481         info.setActiveMember(isActiveMember);
482         localShards.put(info.getShardName(), info);
483
484         if(schemaContext != null) {
485             info.setActor(newShardActor(schemaContext, info));
486         }
487     }
488
489     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
490         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
491                 shardPeerAddressResolver(peerAddressResolver);
492     }
493
494     private DatastoreContext newShardDatastoreContext(String shardName) {
495         return newShardDatastoreContextBuilder(shardName).build();
496     }
497
498     private void checkReady(){
499         if (isReadyWithLeaderId()) {
500             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
501                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
502
503             waitTillReadyCountdownLatch.countDown();
504         }
505     }
506
507     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
508         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
509
510         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
511         if(shardInformation != null) {
512             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
513             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
514             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
515                 primaryShardInfoCache.remove(shardInformation.getShardName());
516             }
517
518             checkReady();
519         } else {
520             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
521         }
522     }
523
524     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
525         ShardInformation shardInfo = message.getShardInfo();
526
527         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
528                 shardInfo.getShardName());
529
530         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
531
532         if(!shardInfo.isShardInitialized()) {
533             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
534             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
535         } else {
536             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
537             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
538         }
539     }
540
541     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
542         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
543                 status.getName(), status.isInitialSyncDone());
544
545         ShardInformation shardInformation = findShardInformation(status.getName());
546
547         if(shardInformation != null) {
548             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
549
550             mBean.setSyncStatus(isInSync());
551         }
552
553     }
554
555     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
556         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
557                 roleChanged.getOldRole(), roleChanged.getNewRole());
558
559         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
560         if(shardInformation != null) {
561             shardInformation.setRole(roleChanged.getNewRole());
562             checkReady();
563             mBean.setSyncStatus(isInSync());
564         }
565     }
566
567
568     private ShardInformation findShardInformation(String memberId) {
569         for(ShardInformation info : localShards.values()){
570             if(info.getShardId().toString().equals(memberId)){
571                 return info;
572             }
573         }
574
575         return null;
576     }
577
578     private boolean isReadyWithLeaderId() {
579         boolean isReady = true;
580         for (ShardInformation info : localShards.values()) {
581             if(!info.isShardReadyWithLeaderId()){
582                 isReady = false;
583                 break;
584             }
585         }
586         return isReady;
587     }
588
589     private boolean isInSync(){
590         for (ShardInformation info : localShards.values()) {
591             if(!info.isInSync()){
592                 return false;
593             }
594         }
595         return true;
596     }
597
598     private void onActorInitialized(Object message) {
599         final ActorRef sender = getSender();
600
601         if (sender == null) {
602             return; //why is a non-actor sending this message? Just ignore.
603         }
604
605         String actorName = sender.path().name();
606         //find shard name from actor name; actor name is stringified shardId
607
608         final ShardIdentifier shardId;
609         try {
610             shardId = ShardIdentifier.fromShardIdString(actorName);
611         } catch (IllegalArgumentException e) {
612             LOG.debug("{}: ignoring actor {}", actorName, e);
613             return;
614         }
615
616         markShardAsInitialized(shardId.getShardName());
617     }
618
619     private void markShardAsInitialized(String shardName) {
620         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
621
622         ShardInformation shardInformation = localShards.get(shardName);
623         if (shardInformation != null) {
624             shardInformation.setActorInitialized();
625
626             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
627         }
628     }
629
630     @Override
631     protected void handleRecover(Object message) throws Exception {
632         if (message instanceof RecoveryCompleted) {
633             onRecoveryCompleted();
634         } else if (message instanceof SnapshotOffer) {
635             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
636         }
637     }
638
639     private void onRecoveryCompleted() {
640         LOG.info("Recovery complete : {}", persistenceId());
641
642         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
643         // journal on upgrade from Helium.
644         deleteMessages(lastSequenceNr());
645
646         if(currentSnapshot == null && restoreFromSnapshot != null &&
647                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
648             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
649                     restoreFromSnapshot.getShardManagerSnapshot()))) {
650                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
651
652                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
653
654                 applyShardManagerSnapshot(snapshot);
655             } catch(Exception e) {
656                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
657             }
658         }
659
660         createLocalShards();
661     }
662
663     private void findLocalShard(FindLocalShard message) {
664         final ShardInformation shardInformation = localShards.get(message.getShardName());
665
666         if(shardInformation == null){
667             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
668             return;
669         }
670
671         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
672     }
673
674     private void sendResponse(ShardInformation shardInformation, boolean doWait,
675             boolean wantShardReady, final Supplier<Object> messageSupplier) {
676         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
677             if(doWait) {
678                 final ActorRef sender = getSender();
679                 final ActorRef self = self();
680
681                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
682
683                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
684                     new OnShardInitialized(replyRunnable);
685
686                 shardInformation.addOnShardInitialized(onShardInitialized);
687
688                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
689                 if(shardInformation.isShardInitialized()) {
690                     // If the shard is already initialized then we'll wait enough time for the shard to
691                     // elect a leader, ie 2 times the election timeout.
692                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
693                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
694                 }
695
696                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
697                         shardInformation.getShardName());
698
699                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
700                         timeout, getSelf(),
701                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
702                         getContext().dispatcher(), getSelf());
703
704                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
705
706             } else if (!shardInformation.isShardInitialized()) {
707                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
708                         shardInformation.getShardName());
709                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
710             } else {
711                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
712                         shardInformation.getShardName());
713                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
714             }
715
716             return;
717         }
718
719         getSender().tell(messageSupplier.get(), getSelf());
720     }
721
722     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
723         return new NoShardLeaderException(null, shardId.toString());
724     }
725
726     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
727         return new NotInitializedException(String.format(
728                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
729     }
730
731     @VisibleForTesting
732     static MemberName memberToName(final Member member) {
733         return MemberName.forName(member.roles().iterator().next());
734     }
735
736     private void memberRemoved(ClusterEvent.MemberRemoved message) {
737         MemberName memberName = memberToName(message.member());
738
739         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
740                 message.member().address());
741
742         peerAddressResolver.removePeerAddress(memberName);
743
744         for(ShardInformation info : localShards.values()){
745             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
746         }
747     }
748
749     private void memberExited(ClusterEvent.MemberExited message) {
750         MemberName memberName = memberToName(message.member());
751
752         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
753                 message.member().address());
754
755         peerAddressResolver.removePeerAddress(memberName);
756
757         for(ShardInformation info : localShards.values()){
758             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
759         }
760     }
761
762     private void memberUp(ClusterEvent.MemberUp message) {
763         MemberName memberName = memberToName(message.member());
764
765         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
766                 message.member().address());
767
768         memberUp(memberName, message.member().address());
769     }
770
771     private void memberUp(MemberName memberName, Address address) {
772         addPeerAddress(memberName, address);
773         checkReady();
774     }
775
776     private void memberWeaklyUp(MemberWeaklyUp message) {
777         MemberName memberName = memberToName(message.member());
778
779         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
780                 message.member().address());
781
782         memberUp(memberName, message.member().address());
783     }
784
785     private void addPeerAddress(MemberName memberName, Address address) {
786         peerAddressResolver.addPeerAddress(memberName, address);
787
788         for(ShardInformation info : localShards.values()){
789             String shardName = info.getShardName();
790             String peerId = getShardIdentifier(memberName, shardName).toString();
791             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
792
793             info.peerUp(memberName, peerId, getSelf());
794         }
795     }
796
797     private void memberReachable(ClusterEvent.ReachableMember message) {
798         MemberName memberName = memberToName(message.member());
799         LOG.info("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
800
801         addPeerAddress(memberName, message.member().address());
802
803         markMemberAvailable(memberName);
804     }
805
806     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
807         MemberName memberName = memberToName(message.member());
808         LOG.info("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
809
810         markMemberUnavailable(memberName);
811     }
812
813     private void markMemberUnavailable(final MemberName memberName) {
814         final String memberStr = memberName.getName();
815         for (ShardInformation info : localShards.values()) {
816             String leaderId = info.getLeaderId();
817             // XXX: why are we using String#contains() here?
818             if (leaderId != null && leaderId.contains(memberStr)) {
819                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
820                 info.setLeaderAvailable(false);
821
822                 primaryShardInfoCache.remove(info.getShardName());
823             }
824
825             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
826         }
827     }
828
829     private void markMemberAvailable(final MemberName memberName) {
830         final String memberStr = memberName.getName();
831         for (ShardInformation info : localShards.values()) {
832             String leaderId = info.getLeaderId();
833             // XXX: why are we using String#contains() here?
834             if (leaderId != null && leaderId.contains(memberStr)) {
835                 LOG.debug("Marking Leader {} as available.", leaderId);
836                 info.setLeaderAvailable(true);
837             }
838
839             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
840         }
841     }
842
843     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
844         datastoreContextFactory = factory;
845         for (ShardInformation info : localShards.values()) {
846             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
847         }
848     }
849
850     private void onGetLocalShardIds() {
851         final List<String> response = new ArrayList<>(localShards.size());
852
853         for (ShardInformation info : localShards.values()) {
854             response.add(info.getShardId().toString());
855         }
856
857         getSender().tell(new Status.Success(response), getSelf());
858     }
859
860     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
861         final ShardIdentifier identifier = message.getShardId();
862
863         if (identifier != null) {
864             final ShardInformation info = localShards.get(identifier.getShardName());
865             if (info == null) {
866                 getSender().tell(new Status.Failure(
867                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
868                 return;
869             }
870
871             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
872         } else {
873             for (ShardInformation info : localShards.values()) {
874                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
875             }
876         }
877
878         getSender().tell(new Status.Success(null), getSelf());
879     }
880
881     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
882         final ActorRef actor = info.getActor();
883         if (actor != null) {
884             actor.tell(switchBehavior, getSelf());
885           } else {
886             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
887                 info.getShardName(), switchBehavior.getNewState());
888         }
889     }
890
891     /**
892      * Notifies all the local shards of a change in the schema context
893      *
894      * @param message
895      */
896     private void updateSchemaContext(final Object message) {
897         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
898
899         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
900
901         for (ShardInformation info : localShards.values()) {
902             if (info.getActor() == null) {
903                 LOG.debug("Creating Shard {}", info.getShardId());
904                 info.setActor(newShardActor(schemaContext, info));
905             } else {
906                 info.getActor().tell(message, getSelf());
907             }
908         }
909     }
910
911     @VisibleForTesting
912     protected ClusterWrapper getCluster() {
913         return cluster;
914     }
915
916     @VisibleForTesting
917     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
918         return getContext().actorOf(info.newProps(schemaContext)
919                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
920     }
921
922     private void findPrimary(FindPrimary message) {
923         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
924
925         final String shardName = message.getShardName();
926         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
927
928         // First see if the there is a local replica for the shard
929         final ShardInformation info = localShards.get(shardName);
930         if (info != null && info.isActiveMember()) {
931             sendResponse(info, message.isWaitUntilReady(), true, () -> {
932                 String primaryPath = info.getSerializedLeaderActor();
933                 Object found = canReturnLocalShardState && info.isLeader() ?
934                         new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
935                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
936
937                         if(LOG.isDebugEnabled()) {
938                             LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
939                         }
940
941                         return found;
942             });
943
944             return;
945         }
946
947         final Collection<String> visitedAddresses;
948         if(message instanceof RemoteFindPrimary) {
949             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
950         } else {
951             visitedAddresses = new ArrayList<>(1);
952         }
953
954         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
955
956         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
957             if(visitedAddresses.contains(address)) {
958                 continue;
959             }
960
961             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
962                     persistenceId(), shardName, address, visitedAddresses);
963
964             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
965                     message.isWaitUntilReady(), visitedAddresses), getContext());
966             return;
967         }
968
969         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
970
971         getSender().tell(new PrimaryNotFoundException(
972                 String.format("No primary shard found for %s.", shardName)), getSelf());
973     }
974
975     /**
976      * Construct the name of the shard actor given the name of the member on
977      * which the shard resides and the name of the shard
978      *
979      * @param memberName
980      * @param shardName
981      * @return
982      */
983     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
984         return peerAddressResolver.getShardIdentifier(memberName, shardName);
985     }
986
987     /**
988      * Create shards that are local to the member on which the ShardManager
989      * runs
990      *
991      */
992     private void createLocalShards() {
993         MemberName memberName = this.cluster.getCurrentMemberName();
994         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
995
996         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
997         if(restoreFromSnapshot != null)
998         {
999             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
1000                 shardSnapshots.put(snapshot.getName(), snapshot);
1001             }
1002         }
1003
1004         restoreFromSnapshot = null; // null out to GC
1005
1006         for(String shardName : memberShardNames){
1007             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1008
1009             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
1010
1011             Map<String, String> peerAddresses = getPeerAddresses(shardName);
1012             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
1013                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
1014                         shardSnapshots.get(shardName)), peerAddressResolver));
1015         }
1016     }
1017
1018     /**
1019      * Given the name of the shard find the addresses of all it's peers
1020      *
1021      * @param shardName
1022      */
1023     private Map<String, String> getPeerAddresses(String shardName) {
1024         Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1025         Map<String, String> peerAddresses = new HashMap<>();
1026
1027         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1028
1029         for (MemberName memberName : members) {
1030             if (!currentMemberName.equals(memberName)) {
1031                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1032                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1033                 peerAddresses.put(shardId.toString(), address);
1034             }
1035         }
1036         return peerAddresses;
1037     }
1038
1039     @Override
1040     public SupervisorStrategy supervisorStrategy() {
1041
1042         return new OneForOneStrategy(10, Duration.create("1 minute"),
1043                 (Function<Throwable, Directive>) t -> {
1044                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1045                     return SupervisorStrategy.resume();
1046                 }
1047                 );
1048
1049     }
1050
1051     @Override
1052     public String persistenceId() {
1053         return persistenceId;
1054     }
1055
1056     @VisibleForTesting
1057     ShardManagerInfoMBean getMBean(){
1058         return mBean;
1059     }
1060
1061     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1062         if (shardReplicaOperationsInProgress.contains(shardName)) {
1063             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1064             LOG.debug ("{}: {}", persistenceId(), msg);
1065             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1066             return true;
1067         }
1068
1069         return false;
1070     }
1071
1072     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1073         final String shardName = shardReplicaMsg.getShardName();
1074
1075         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1076
1077         // verify the shard with the specified name is present in the cluster configuration
1078         if (!(this.configuration.isShardConfigured(shardName))) {
1079             String msg = String.format("No module configuration exists for shard %s", shardName);
1080             LOG.debug ("{}: {}", persistenceId(), msg);
1081             getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
1082             return;
1083         }
1084
1085         // Create the localShard
1086         if (schemaContext == null) {
1087             String msg = String.format(
1088                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1089             LOG.debug ("{}: {}", persistenceId(), msg);
1090             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1091             return;
1092         }
1093
1094         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1095             @Override
1096             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1097                 getSelf().tell((RunnableMessage) () -> addShard(getShardName(), response, getSender()), getTargetActor());
1098             }
1099
1100             @Override
1101             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1102                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1103             }
1104
1105         });
1106     }
1107
1108     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1109         String msg = String.format("Local shard %s already exists", shardName);
1110         LOG.debug ("{}: {}", persistenceId(), msg);
1111         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1112     }
1113
1114     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1115         if(isShardReplicaOperationInProgress(shardName, sender)) {
1116             return;
1117         }
1118
1119         shardReplicaOperationsInProgress.add(shardName);
1120
1121         final ShardInformation shardInfo;
1122         final boolean removeShardOnFailure;
1123         ShardInformation existingShardInfo = localShards.get(shardName);
1124         if(existingShardInfo == null) {
1125             removeShardOnFailure = true;
1126             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1127
1128             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
1129                     DisableElectionsRaftPolicy.class.getName()).build();
1130
1131             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1132                     Shard.builder(), peerAddressResolver);
1133             shardInfo.setActiveMember(false);
1134             localShards.put(shardName, shardInfo);
1135             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1136         } else {
1137             removeShardOnFailure = false;
1138             shardInfo = existingShardInfo;
1139         }
1140
1141         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1142
1143         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1144         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1145                 response.getPrimaryPath(), shardInfo.getShardId());
1146
1147         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1148                 duration());
1149         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1150             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1151
1152         futureObj.onComplete(new OnComplete<Object>() {
1153             @Override
1154             public void onComplete(Throwable failure, Object addServerResponse) {
1155                 if (failure != null) {
1156                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1157                             response.getPrimaryPath(), shardName, failure);
1158
1159                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1160                             response.getPrimaryPath(), shardName);
1161                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1162                 } else {
1163                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1164                             response.getPrimaryPath(), removeShardOnFailure), sender);
1165                 }
1166             }
1167         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1168     }
1169
1170     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1171             boolean removeShardOnFailure) {
1172         shardReplicaOperationsInProgress.remove(shardName);
1173
1174         if(removeShardOnFailure) {
1175             ShardInformation shardInfo = localShards.remove(shardName);
1176             if (shardInfo.getActor() != null) {
1177                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1178             }
1179         }
1180
1181         sender.tell(new Status.Failure(message == null ? failure :
1182             new RuntimeException(message, failure)), getSelf());
1183     }
1184
1185     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1186             String leaderPath, boolean removeShardOnFailure) {
1187         String shardName = shardInfo.getShardName();
1188         shardReplicaOperationsInProgress.remove(shardName);
1189
1190         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1191
1192         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1193             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1194
1195             // Make the local shard voting capable
1196             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1197             shardInfo.setActiveMember(true);
1198             persistShardList();
1199
1200             sender.tell(new Status.Success(null), getSelf());
1201         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1202             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1203         } else {
1204             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1205                     persistenceId(), shardName, replyMsg.getStatus());
1206
1207             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1208
1209             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1210         }
1211     }
1212
1213     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1214                                                String leaderPath, ShardIdentifier shardId) {
1215         Exception failure;
1216         switch (serverChangeStatus) {
1217             case TIMEOUT:
1218                 failure = new TimeoutException(String.format(
1219                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1220                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1221                         leaderPath, shardId.getShardName()));
1222                 break;
1223             case NO_LEADER:
1224                 failure = createNoShardLeaderException(shardId);
1225                 break;
1226             case NOT_SUPPORTED:
1227                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1228                         serverChange.getSimpleName(), shardId.getShardName()));
1229                 break;
1230             default :
1231                 failure = new RuntimeException(String.format(
1232                         "%s request to leader %s for shard %s failed with status %s",
1233                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1234         }
1235         return failure;
1236     }
1237
1238     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1239         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1240
1241         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1242                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1243             @Override
1244             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1245                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1246             }
1247
1248             @Override
1249             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1250                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1251             }
1252
1253             private void doRemoveShardReplicaAsync(final String primaryPath) {
1254                 getSelf().tell((RunnableMessage) () -> removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender()), getTargetActor());
1255             }
1256         });
1257     }
1258
1259     private void persistShardList() {
1260         List<String> shardList = new ArrayList<>(localShards.keySet());
1261         for (ShardInformation shardInfo : localShards.values()) {
1262             if (!shardInfo.isActiveMember()) {
1263                 shardList.remove(shardInfo.getShardName());
1264             }
1265         }
1266         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1267         saveSnapshot(updateShardManagerSnapshot(shardList));
1268     }
1269
1270     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1271         currentSnapshot = new ShardManagerSnapshot(shardList);
1272         return currentSnapshot;
1273     }
1274
1275     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1276         currentSnapshot = snapshot;
1277
1278         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1279
1280         final MemberName currentMember = cluster.getCurrentMemberName();
1281         Set<String> configuredShardList =
1282             new HashSet<>(configuration.getMemberShardNames(currentMember));
1283         for (String shard : currentSnapshot.getShardList()) {
1284             if (!configuredShardList.contains(shard)) {
1285                 // add the current member as a replica for the shard
1286                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1287                 configuration.addMemberReplicaForShard(shard, currentMember);
1288             } else {
1289                 configuredShardList.remove(shard);
1290             }
1291         }
1292         for (String shard : configuredShardList) {
1293             // remove the member as a replica for the shard
1294             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1295             configuration.removeMemberReplicaForShard(shard, currentMember);
1296         }
1297     }
1298
1299     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1300         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1301             persistenceId());
1302         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1303             0, 0));
1304     }
1305
1306     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1307         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1308
1309         String shardName = changeMembersVotingStatus.getShardName();
1310         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1311         for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1312             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1313                     e.getValue());
1314         }
1315
1316         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1317
1318         findLocalShard(shardName, getSender(),
1319                 localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1320                         localShardFound.getPath(), getSender()));
1321     }
1322
1323     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
1324         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1325
1326         ActorRef sender = getSender();
1327         final String shardName = flipMembersVotingStatus.getShardName();
1328         findLocalShard(shardName, sender, localShardFound -> {
1329             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1330                     Timeout.apply(30, TimeUnit.SECONDS));
1331
1332             future.onComplete(new OnComplete<Object>() {
1333                 @Override
1334                 public void onComplete(Throwable failure, Object response) {
1335                     if (failure != null) {
1336                         sender.tell(new Status.Failure(new RuntimeException(
1337                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1338                         return;
1339                     }
1340
1341                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1342                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1343                     for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1344                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1345                     }
1346
1347                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
1348                             toString(), !raftState.isVoting());
1349
1350                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1351                             shardName, localShardFound.getPath(), sender);
1352                 }
1353             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1354         });
1355
1356     }
1357
1358     private void findLocalShard(final String shardName, final ActorRef sender,
1359             final Consumer<LocalShardFound> onLocalShardFound) {
1360         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1361                 getShardInitializationTimeout().duration().$times(2));
1362
1363         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1364         futureObj.onComplete(new OnComplete<Object>() {
1365             @Override
1366             public void onComplete(Throwable failure, Object response) {
1367                 if (failure != null) {
1368                     LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
1369                     sender.tell(new Status.Failure(new RuntimeException(
1370                             String.format("Failed to find local shard %s", shardName), failure)), self());
1371                 } else {
1372                     if(response instanceof LocalShardFound) {
1373                         getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response), sender);
1374                     } else if(response instanceof LocalShardNotFound) {
1375                         String msg = String.format("Local shard %s does not exist", shardName);
1376                         LOG.debug ("{}: {}", persistenceId, msg);
1377                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
1378                     } else {
1379                         String msg = String.format("Failed to find local shard %s: received response: %s",
1380                                 shardName, response);
1381                         LOG.debug ("{}: {}", persistenceId, msg);
1382                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1383                                 new RuntimeException(msg)), self());
1384                     }
1385                 }
1386             }
1387         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1388     }
1389
1390     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
1391             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1392         if(isShardReplicaOperationInProgress(shardName, sender)) {
1393             return;
1394         }
1395
1396         shardReplicaOperationsInProgress.add(shardName);
1397
1398         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1399         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1400
1401         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1402                 changeServersVotingStatus, shardActorRef.path());
1403
1404         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1405         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1406
1407         futureObj.onComplete(new OnComplete<Object>() {
1408             @Override
1409             public void onComplete(Throwable failure, Object response) {
1410                 shardReplicaOperationsInProgress.remove(shardName);
1411                 if (failure != null) {
1412                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1413                             shardActorRef.path());
1414                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
1415                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1416                 } else {
1417                     LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1418
1419                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1420                     if(replyMsg.getStatus() == ServerChangeStatus.OK) {
1421                         LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1422                         sender.tell(new Status.Success(null), getSelf());
1423                     } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1424                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1425                                 "The requested voting state change for shard %s is invalid. At least one member must be voting",
1426                                 shardId.getShardName()))), getSelf());
1427                     } else {
1428                         LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1429                                 persistenceId(), shardName, replyMsg.getStatus());
1430
1431                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1432                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1433                         sender.tell(new Status.Failure(error), getSelf());
1434                     }
1435                 }
1436             }
1437         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1438     }
1439
1440     private static final class ForwardedAddServerReply {
1441         ShardInformation shardInfo;
1442         AddServerReply addServerReply;
1443         String leaderPath;
1444         boolean removeShardOnFailure;
1445
1446         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1447                 boolean removeShardOnFailure) {
1448             this.shardInfo = shardInfo;
1449             this.addServerReply = addServerReply;
1450             this.leaderPath = leaderPath;
1451             this.removeShardOnFailure = removeShardOnFailure;
1452         }
1453     }
1454
1455     private static final class ForwardedAddServerFailure {
1456         String shardName;
1457         String failureMessage;
1458         Throwable failure;
1459         boolean removeShardOnFailure;
1460
1461         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1462                 boolean removeShardOnFailure) {
1463             this.shardName = shardName;
1464             this.failureMessage = failureMessage;
1465             this.failure = failure;
1466             this.removeShardOnFailure = removeShardOnFailure;
1467         }
1468     }
1469
1470     static class OnShardInitialized {
1471         private final Runnable replyRunnable;
1472         private Cancellable timeoutSchedule;
1473
1474         OnShardInitialized(Runnable replyRunnable) {
1475             this.replyRunnable = replyRunnable;
1476         }
1477
1478         Runnable getReplyRunnable() {
1479             return replyRunnable;
1480         }
1481
1482         Cancellable getTimeoutSchedule() {
1483             return timeoutSchedule;
1484         }
1485
1486         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1487             this.timeoutSchedule = timeoutSchedule;
1488         }
1489     }
1490
1491     static class OnShardReady extends OnShardInitialized {
1492         OnShardReady(Runnable replyRunnable) {
1493             super(replyRunnable);
1494         }
1495     }
1496
1497     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1498         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1499                 getShardInitializationTimeout().duration().$times(2));
1500
1501
1502         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1503         futureObj.onComplete(new OnComplete<Object>() {
1504             @Override
1505             public void onComplete(Throwable failure, Object response) {
1506                 if (failure != null) {
1507                     handler.onFailure(failure);
1508                 } else {
1509                     if(response instanceof RemotePrimaryShardFound) {
1510                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1511                     } else if(response instanceof LocalPrimaryShardFound) {
1512                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1513                     } else {
1514                         handler.onUnknownResponse(response);
1515                     }
1516                 }
1517             }
1518         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1519     }
1520
1521     private static interface RunnableMessage extends Runnable {
1522     }
1523
1524     /**
1525      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1526      * a remote or local find primary message is processed
1527      */
1528     private static interface FindPrimaryResponseHandler {
1529         /**
1530          * Invoked when a Failure message is received as a response
1531          *
1532          * @param failure
1533          */
1534         void onFailure(Throwable failure);
1535
1536         /**
1537          * Invoked when a RemotePrimaryShardFound response is received
1538          *
1539          * @param response
1540          */
1541         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1542
1543         /**
1544          * Invoked when a LocalPrimaryShardFound response is received
1545          * @param response
1546          */
1547         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1548
1549         /**
1550          * Invoked when an unknown response is received. This is another type of failure.
1551          *
1552          * @param response
1553          */
1554         void onUnknownResponse(Object response);
1555     }
1556
1557     /**
1558      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1559      * replica and sends a wrapped Failure response to some targetActor
1560      */
1561     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1562         private final ActorRef targetActor;
1563         private final String shardName;
1564         private final String persistenceId;
1565         private final ActorRef shardManagerActor;
1566
1567         /**
1568          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1569          * @param shardName The name of the shard for which the primary replica had to be found
1570          * @param persistenceId The persistenceId for the ShardManager
1571          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1572          */
1573         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1574             this.targetActor = Preconditions.checkNotNull(targetActor);
1575             this.shardName = Preconditions.checkNotNull(shardName);
1576             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1577             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1578         }
1579
1580         public ActorRef getTargetActor() {
1581             return targetActor;
1582         }
1583
1584         public String getShardName() {
1585             return shardName;
1586         }
1587
1588         @Override
1589         public void onFailure(Throwable failure) {
1590             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1591             targetActor.tell(new Status.Failure(new RuntimeException(
1592                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1593         }
1594
1595         @Override
1596         public void onUnknownResponse(Object response) {
1597             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1598                     shardName, response);
1599             LOG.debug ("{}: {}", persistenceId, msg);
1600             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1601                     new RuntimeException(msg)), shardManagerActor);
1602         }
1603     }
1604
1605     /**
1606      * The WrappedShardResponse class wraps a response from a Shard.
1607      */
1608     private static final class WrappedShardResponse {
1609         private final ShardIdentifier shardId;
1610         private final Object response;
1611         private final String leaderPath;
1612
1613         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1614             this.shardId = shardId;
1615             this.response = response;
1616             this.leaderPath = leaderPath;
1617         }
1618
1619         ShardIdentifier getShardId() {
1620             return shardId;
1621         }
1622
1623         Object getResponse() {
1624             return response;
1625         }
1626
1627         String getLeaderPath() {
1628             return leaderPath;
1629         }
1630     }
1631
1632     private static final class ShardNotInitializedTimeout {
1633         private final ActorRef sender;
1634         private final ShardInformation shardInfo;
1635         private final OnShardInitialized onShardInitialized;
1636
1637         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1638             this.sender = sender;
1639             this.shardInfo = shardInfo;
1640             this.onShardInitialized = onShardInitialized;
1641         }
1642
1643         ActorRef getSender() {
1644             return sender;
1645         }
1646
1647         ShardInformation getShardInfo() {
1648             return shardInfo;
1649         }
1650
1651         OnShardInitialized getOnShardInitialized() {
1652             return onShardInitialized;
1653         }
1654     }
1655 }
1656
1657
1658