Fix minor bug in ShardManager#removeShardReplica
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / shardmanager / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.shardmanager;
10
11 import static akka.pattern.Patterns.ask;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.OneForOneStrategy;
16 import akka.actor.PoisonPill;
17 import akka.actor.Status;
18 import akka.actor.SupervisorStrategy;
19 import akka.actor.SupervisorStrategy.Directive;
20 import akka.cluster.ClusterEvent;
21 import akka.cluster.Member;
22 import akka.dispatch.Futures;
23 import akka.dispatch.OnComplete;
24 import akka.japi.Function;
25 import akka.pattern.Patterns;
26 import akka.persistence.RecoveryCompleted;
27 import akka.persistence.SaveSnapshotFailure;
28 import akka.persistence.SaveSnapshotSuccess;
29 import akka.persistence.SnapshotOffer;
30 import akka.persistence.SnapshotSelectionCriteria;
31 import akka.util.Timeout;
32 import com.google.common.annotations.VisibleForTesting;
33 import com.google.common.base.Preconditions;
34 import java.io.ByteArrayInputStream;
35 import java.io.ObjectInputStream;
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.Collections;
39 import java.util.HashMap;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Map.Entry;
44 import java.util.Set;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.TimeoutException;
48 import java.util.function.Consumer;
49 import java.util.function.Supplier;
50 import org.apache.commons.lang3.SerializationUtils;
51 import org.opendaylight.controller.cluster.access.concepts.MemberName;
52 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
53 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
54 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
55 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
56 import org.opendaylight.controller.cluster.datastore.Shard;
57 import org.opendaylight.controller.cluster.datastore.config.Configuration;
58 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
59 import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
60 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
61 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
62 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
63 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
64 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
65 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
66 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
67 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
68 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
69 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
70 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
71 import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
72 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
73 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
74 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
75 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
76 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
77 import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
78 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
79 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
80 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
81 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
82 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
83 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
84 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
85 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
86 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
87 import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
88 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
89 import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
90 import org.opendaylight.controller.cluster.raft.messages.AddServer;
91 import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
92 import org.opendaylight.controller.cluster.raft.messages.ChangeServersVotingStatus;
93 import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
94 import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
95 import org.opendaylight.controller.cluster.raft.messages.ServerChangeReply;
96 import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
97 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
98 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
99 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
100 import org.slf4j.Logger;
101 import org.slf4j.LoggerFactory;
102 import scala.concurrent.ExecutionContext;
103 import scala.concurrent.Future;
104 import scala.concurrent.duration.Duration;
105 import scala.concurrent.duration.FiniteDuration;
106
/**
 * The ShardManager has the following jobs:
 * <ul>
 * <li> Create all the local shard replicas that belong on this cluster member
 * <li> Find the address of the local shard
 * <li> Find the primary replica for any given shard
 * <li> Monitor the cluster members and store their addresses
 * </ul>
 */
116 class ShardManager extends AbstractUntypedPersistentActorWithMetering {
117
118     private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
119
    // Stores a mapping between a shard name and its corresponding information.
    // Shard names look like inventory, topology etc. and are as specified in
    // the configuration.
123     private final Map<String, ShardInformation> localShards = new HashMap<>();
124
125     // The type of a ShardManager reflects the type of the datastore itself
126     // A data store could be of type config/operational
127     private final String type;
128
129     private final ClusterWrapper cluster;
130
131     private final Configuration configuration;
132
133     private final String shardDispatcherPath;
134
135     private final ShardManagerInfo mBean;
136
137     private DatastoreContextFactory datastoreContextFactory;
138
139     private final CountDownLatch waitTillReadyCountdownLatch;
140
141     private final PrimaryShardInfoFutureCache primaryShardInfoCache;
142
143     private final ShardPeerAddressResolver peerAddressResolver;
144
145     private SchemaContext schemaContext;
146
147     private DatastoreSnapshot restoreFromSnapshot;
148
149     private ShardManagerSnapshot currentSnapshot;
150
151     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
152
153     private final String persistenceId;
154
155     ShardManager(AbstractShardManagerCreator<?> builder) {
156         this.cluster = builder.getCluster();
157         this.configuration = builder.getConfiguration();
158         this.datastoreContextFactory = builder.getDatastoreContextFactory();
159         this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
160         this.shardDispatcherPath =
161                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
162         this.waitTillReadyCountdownLatch = builder.getWaitTillReadyCountdownLatch();
163         this.primaryShardInfoCache = builder.getPrimaryShardInfoCache();
164         this.restoreFromSnapshot = builder.getRestoreFromSnapshot();
165
166         String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
167         persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
168
169         peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
170
171         // Subscribe this actor to cluster member events
172         cluster.subscribeToMemberEvents(getSelf());
173
174         mBean = new ShardManagerInfo(getSelf(), cluster.getCurrentMemberName(), "shard-manager-" + this.type,
175                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
176         mBean.registerMBean();
177     }
178
179     @Override
180     public void postStop() {
181         LOG.info("Stopping ShardManager {}", persistenceId());
182
183         mBean.unregisterMBean();
184     }
185
186     @Override
187     public void handleCommand(Object message) throws Exception {
188         if (message  instanceof FindPrimary) {
189             findPrimary((FindPrimary)message);
190         } else if(message instanceof FindLocalShard){
191             findLocalShard((FindLocalShard) message);
192         } else if (message instanceof UpdateSchemaContext) {
193             updateSchemaContext(message);
194         } else if(message instanceof ActorInitialized) {
195             onActorInitialized(message);
196         } else if (message instanceof ClusterEvent.MemberUp){
197             memberUp((ClusterEvent.MemberUp) message);
198         } else if (message instanceof ClusterEvent.MemberExited){
199             memberExited((ClusterEvent.MemberExited) message);
200         } else if(message instanceof ClusterEvent.MemberRemoved) {
201             memberRemoved((ClusterEvent.MemberRemoved) message);
202         } else if(message instanceof ClusterEvent.UnreachableMember) {
203             memberUnreachable((ClusterEvent.UnreachableMember)message);
204         } else if(message instanceof ClusterEvent.ReachableMember) {
205             memberReachable((ClusterEvent.ReachableMember) message);
206         } else if(message instanceof DatastoreContextFactory) {
207             onDatastoreContextFactory((DatastoreContextFactory)message);
208         } else if(message instanceof RoleChangeNotification) {
209             onRoleChangeNotification((RoleChangeNotification) message);
210         } else if(message instanceof FollowerInitialSyncUpStatus){
211             onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
212         } else if(message instanceof ShardNotInitializedTimeout) {
213             onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
214         } else if(message instanceof ShardLeaderStateChanged) {
215             onLeaderStateChanged((ShardLeaderStateChanged) message);
216         } else if(message instanceof SwitchShardBehavior){
217             onSwitchShardBehavior((SwitchShardBehavior) message);
218         } else if(message instanceof CreateShard) {
219             onCreateShard((CreateShard)message);
220         } else if(message instanceof AddShardReplica){
221             onAddShardReplica((AddShardReplica)message);
222         } else if(message instanceof ForwardedAddServerReply) {
223             ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
224             onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
225                     msg.removeShardOnFailure);
226         } else if(message instanceof ForwardedAddServerFailure) {
227             ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
228             onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
229         } else if(message instanceof RemoveShardReplica) {
230             onRemoveShardReplica((RemoveShardReplica) message);
231         } else if(message instanceof WrappedShardResponse){
232             onWrappedShardResponse((WrappedShardResponse) message);
233         } else if(message instanceof GetSnapshot) {
234             onGetSnapshot();
235         } else if(message instanceof ServerRemoved){
236             onShardReplicaRemoved((ServerRemoved) message);
237         } else if(message instanceof ChangeShardMembersVotingStatus){
238             onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
239         } else if(message instanceof FlipShardMembersVotingStatus){
240             onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
241         } else if(message instanceof SaveSnapshotSuccess) {
242             onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
243         } else if(message instanceof SaveSnapshotFailure) {
244             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
245                     persistenceId(), ((SaveSnapshotFailure) message).cause());
246         } else if(message instanceof Shutdown) {
247             onShutDown();
248         } else if (message instanceof GetLocalShardIds) {
249             onGetLocalShardIds();
250         } else if(message instanceof RunnableMessage) {
251             ((RunnableMessage)message).run();
252         } else {
253             unknownMessage(message);
254         }
255     }
256
257     private void onShutDown() {
258         List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
259         for (ShardInformation info : localShards.values()) {
260             if (info.getActor() != null) {
261                 LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
262
263                 FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
264                 stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
265             }
266         }
267
268         LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
269
270         ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
271         Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
272
273         combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
274             @Override
275             public void onComplete(Throwable failure, Iterable<Boolean> results) {
276                 LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
277
278                 self().tell(PoisonPill.getInstance(), self());
279
280                 if(failure != null) {
281                     LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
282                 } else {
283                     int nfailed = 0;
284                     for(Boolean r: results) {
285                         if(!r) {
286                             nfailed++;
287                         }
288                     }
289
290                     if(nfailed > 0) {
291                         LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
292                     }
293                 }
294             }
295         }, dispatcher);
296     }
297
298     private void onWrappedShardResponse(WrappedShardResponse message) {
299         if (message.getResponse() instanceof RemoveServerReply) {
300             onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
301                     message.getLeaderPath());
302         }
303     }
304
305     private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
306             String leaderPath) {
307         shardReplicaOperationsInProgress.remove(shardId);
308
309         LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
310
311         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
312             LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
313                     shardId.getShardName());
314             originalSender.tell(new Status.Success(null), getSelf());
315         } else {
316             LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
317                     persistenceId(), shardId, replyMsg.getStatus());
318
319             Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
320             originalSender.tell(new Status.Failure(failure), getSelf());
321         }
322     }
323
324     private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
325             final ActorRef sender) {
326         if(isShardReplicaOperationInProgress(shardName, sender)) {
327             return;
328         }
329
330         shardReplicaOperationsInProgress.add(shardName);
331
332         final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
333
334         final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
335
336         //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
337         LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
338                 primaryPath, shardId);
339
340         Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
341                 duration());
342         Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
343                 new RemoveServer(shardId.toString()), removeServerTimeout);
344
345         futureObj.onComplete(new OnComplete<Object>() {
346             @Override
347             public void onComplete(Throwable failure, Object response) {
348                 if (failure != null) {
349                     shardReplicaOperationsInProgress.remove(shardName);
350                     String msg = String.format("RemoveServer request to leader %s for shard %s failed",
351                             primaryPath, shardName);
352
353                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
354
355                     // FAILURE
356                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
357                 } else {
358                     // SUCCESS
359                     self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
360                 }
361             }
362         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
363     }
364
365     private void onShardReplicaRemoved(ServerRemoved message) {
366         final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
367         final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
368         if(shardInformation == null) {
369             LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
370             return;
371         } else if(shardInformation.getActor() != null) {
372             LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
373             shardInformation.getActor().tell(Shutdown.INSTANCE, self());
374         }
375         LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
376         persistShardList();
377     }
378
379     private void onGetSnapshot() {
380         LOG.debug("{}: onGetSnapshot", persistenceId());
381
382         List<String> notInitialized = null;
383         for(ShardInformation shardInfo: localShards.values()) {
384             if(!shardInfo.isShardInitialized()) {
385                 if(notInitialized == null) {
386                     notInitialized = new ArrayList<>();
387                 }
388
389                 notInitialized.add(shardInfo.getShardName());
390             }
391         }
392
393         if(notInitialized != null) {
394             getSender().tell(new Status.Failure(new IllegalStateException(String.format(
395                     "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
396             return;
397         }
398
399         byte[] shardManagerSnapshot = null;
400         if(currentSnapshot != null) {
401             shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
402         }
403
404         ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
405                 new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
406                 datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
407
408         for(ShardInformation shardInfo: localShards.values()) {
409             shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
410         }
411     }
412
413     private void onCreateShard(CreateShard createShard) {
414         LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
415
416         Object reply;
417         try {
418             String shardName = createShard.getModuleShardConfig().getShardName();
419             if(localShards.containsKey(shardName)) {
420                 LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
421                 reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
422             } else {
423                 doCreateShard(createShard);
424                 reply = new Status.Success(null);
425             }
426         } catch (Exception e) {
427             LOG.error("{}: onCreateShard failed", persistenceId(), e);
428             reply = new Status.Failure(e);
429         }
430
431         if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
432             getSender().tell(reply, getSelf());
433         }
434     }
435
436     private void doCreateShard(CreateShard createShard) {
437         ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
438         String shardName = moduleShardConfig.getShardName();
439
440         configuration.addModuleShardConfiguration(moduleShardConfig);
441
442         DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
443         if(shardDatastoreContext == null) {
444             shardDatastoreContext = newShardDatastoreContext(shardName);
445         } else {
446             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
447                     peerAddressResolver).build();
448         }
449
450         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
451
452         boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
453                 currentSnapshot.getShardList().contains(shardName);
454
455         Map<String, String> peerAddresses;
456         boolean isActiveMember;
457         if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
458                 contains(cluster.getCurrentMemberName())) {
459             peerAddresses = getPeerAddresses(shardName);
460             isActiveMember = true;
461         } else {
462             // The local member is not in the static shard member configuration and the shard did not
463             // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
464             // the shard with no peers and with elections disabled so it stays as follower. A
465             // subsequent AddServer request will be needed to make it an active member.
466             isActiveMember = false;
467             peerAddresses = Collections.emptyMap();
468             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
469                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
470         }
471
472         LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
473                 persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
474                 isActiveMember);
475
476         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
477                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
478         info.setActiveMember(isActiveMember);
479         localShards.put(info.getShardName(), info);
480
481         if(schemaContext != null) {
482             info.setActor(newShardActor(schemaContext, info));
483         }
484     }
485
486     private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
487         return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
488                 shardPeerAddressResolver(peerAddressResolver);
489     }
490
491     private DatastoreContext newShardDatastoreContext(String shardName) {
492         return newShardDatastoreContextBuilder(shardName).build();
493     }
494
495     private void checkReady(){
496         if (isReadyWithLeaderId()) {
497             LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
498                     persistenceId(), type, waitTillReadyCountdownLatch.getCount());
499
500             waitTillReadyCountdownLatch.countDown();
501         }
502     }
503
504     private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
505         LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
506
507         ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
508         if(shardInformation != null) {
509             shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
510             shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
511             if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
512                 primaryShardInfoCache.remove(shardInformation.getShardName());
513             }
514
515             checkReady();
516         } else {
517             LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
518         }
519     }
520
521     private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
522         ShardInformation shardInfo = message.getShardInfo();
523
524         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
525                 shardInfo.getShardName());
526
527         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
528
529         if(!shardInfo.isShardInitialized()) {
530             LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
531             message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), getSelf());
532         } else {
533             LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
534             message.getSender().tell(createNoShardLeaderException(shardInfo.getShardId()), getSelf());
535         }
536     }
537
538     private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
539         LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
540                 status.getName(), status.isInitialSyncDone());
541
542         ShardInformation shardInformation = findShardInformation(status.getName());
543
544         if(shardInformation != null) {
545             shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
546
547             mBean.setSyncStatus(isInSync());
548         }
549
550     }
551
552     private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
553         LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
554                 roleChanged.getOldRole(), roleChanged.getNewRole());
555
556         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
557         if(shardInformation != null) {
558             shardInformation.setRole(roleChanged.getNewRole());
559             checkReady();
560             mBean.setSyncStatus(isInSync());
561         }
562     }
563
564
565     private ShardInformation findShardInformation(String memberId) {
566         for(ShardInformation info : localShards.values()){
567             if(info.getShardId().toString().equals(memberId)){
568                 return info;
569             }
570         }
571
572         return null;
573     }
574
575     private boolean isReadyWithLeaderId() {
576         boolean isReady = true;
577         for (ShardInformation info : localShards.values()) {
578             if(!info.isShardReadyWithLeaderId()){
579                 isReady = false;
580                 break;
581             }
582         }
583         return isReady;
584     }
585
586     private boolean isInSync(){
587         for (ShardInformation info : localShards.values()) {
588             if(!info.isInSync()){
589                 return false;
590             }
591         }
592         return true;
593     }
594
595     private void onActorInitialized(Object message) {
596         final ActorRef sender = getSender();
597
598         if (sender == null) {
599             return; //why is a non-actor sending this message? Just ignore.
600         }
601
602         String actorName = sender.path().name();
603         //find shard name from actor name; actor name is stringified shardId
604
605         final ShardIdentifier shardId;
606         try {
607             shardId = ShardIdentifier.fromShardIdString(actorName);
608         } catch (IllegalArgumentException e) {
609             LOG.debug("{}: ignoring actor {}", actorName, e);
610             return;
611         }
612
613         markShardAsInitialized(shardId.getShardName());
614     }
615
616     private void markShardAsInitialized(String shardName) {
617         LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
618
619         ShardInformation shardInformation = localShards.get(shardName);
620         if (shardInformation != null) {
621             shardInformation.setActorInitialized();
622
623             shardInformation.getActor().tell(new RegisterRoleChangeListener(), self());
624         }
625     }
626
627     @Override
628     protected void handleRecover(Object message) throws Exception {
629         if (message instanceof RecoveryCompleted) {
630             onRecoveryCompleted();
631         } else if (message instanceof SnapshotOffer) {
632             applyShardManagerSnapshot((ShardManagerSnapshot)((SnapshotOffer) message).snapshot());
633         }
634     }
635
636     private void onRecoveryCompleted() {
637         LOG.info("Recovery complete : {}", persistenceId());
638
639         // We no longer persist SchemaContext modules so delete all the prior messages from the akka
640         // journal on upgrade from Helium.
641         deleteMessages(lastSequenceNr());
642
643         if(currentSnapshot == null && restoreFromSnapshot != null &&
644                 restoreFromSnapshot.getShardManagerSnapshot() != null) {
645             try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
646                     restoreFromSnapshot.getShardManagerSnapshot()))) {
647                 ShardManagerSnapshot snapshot = (ShardManagerSnapshot) ois.readObject();
648
649                 LOG.debug("{}: Deserialized restored ShardManagerSnapshot: {}", persistenceId(), snapshot);
650
651                 applyShardManagerSnapshot(snapshot);
652             } catch(Exception e) {
653                 LOG.error("{}: Error deserializing restored ShardManagerSnapshot", persistenceId(), e);
654             }
655         }
656
657         createLocalShards();
658     }
659
660     private void findLocalShard(FindLocalShard message) {
661         final ShardInformation shardInformation = localShards.get(message.getShardName());
662
663         if(shardInformation == null){
664             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
665             return;
666         }
667
668         sendResponse(shardInformation, message.isWaitUntilInitialized(), false, () -> new LocalShardFound(shardInformation.getActor()));
669     }
670
671     private void sendResponse(ShardInformation shardInformation, boolean doWait,
672             boolean wantShardReady, final Supplier<Object> messageSupplier) {
673         if (!shardInformation.isShardInitialized() || (wantShardReady && !shardInformation.isShardReadyWithLeaderId())) {
674             if(doWait) {
675                 final ActorRef sender = getSender();
676                 final ActorRef self = self();
677
678                 Runnable replyRunnable = () -> sender.tell(messageSupplier.get(), self);
679
680                 OnShardInitialized onShardInitialized = wantShardReady ? new OnShardReady(replyRunnable) :
681                     new OnShardInitialized(replyRunnable);
682
683                 shardInformation.addOnShardInitialized(onShardInitialized);
684
685                 FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
686                 if(shardInformation.isShardInitialized()) {
687                     // If the shard is already initialized then we'll wait enough time for the shard to
688                     // elect a leader, ie 2 times the election timeout.
689                     timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
690                             .getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
691                 }
692
693                 LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
694                         shardInformation.getShardName());
695
696                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
697                         timeout, getSelf(),
698                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
699                         getContext().dispatcher(), getSelf());
700
701                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
702
703             } else if (!shardInformation.isShardInitialized()) {
704                 LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
705                         shardInformation.getShardName());
706                 getSender().tell(createNotInitializedException(shardInformation.getShardId()), getSelf());
707             } else {
708                 LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
709                         shardInformation.getShardName());
710                 getSender().tell(createNoShardLeaderException(shardInformation.getShardId()), getSelf());
711             }
712
713             return;
714         }
715
716         getSender().tell(messageSupplier.get(), getSelf());
717     }
718
719     private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
720         return new NoShardLeaderException(null, shardId.toString());
721     }
722
723     private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
724         return new NotInitializedException(String.format(
725                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
726     }
727
728     @VisibleForTesting
729     static MemberName memberToName(final Member member) {
730         return MemberName.forName(member.roles().iterator().next());
731     }
732
733     private void memberRemoved(ClusterEvent.MemberRemoved message) {
734         MemberName memberName = memberToName(message.member());
735
736         LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
737                 message.member().address());
738
739         peerAddressResolver.removePeerAddress(memberName);
740
741         for(ShardInformation info : localShards.values()){
742             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
743         }
744     }
745
746     private void memberExited(ClusterEvent.MemberExited message) {
747         MemberName memberName = memberToName(message.member());
748
749         LOG.debug("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
750                 message.member().address());
751
752         peerAddressResolver.removePeerAddress(memberName);
753
754         for(ShardInformation info : localShards.values()){
755             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
756         }
757     }
758
759     private void memberUp(ClusterEvent.MemberUp message) {
760         MemberName memberName = memberToName(message.member());
761
762         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
763                 message.member().address());
764
765         addPeerAddress(memberName, message.member().address());
766
767         checkReady();
768     }
769
770     private void addPeerAddress(MemberName memberName, Address address) {
771         peerAddressResolver.addPeerAddress(memberName, address);
772
773         for(ShardInformation info : localShards.values()){
774             String shardName = info.getShardName();
775             String peerId = getShardIdentifier(memberName, shardName).toString();
776             info.updatePeerAddress(peerId, peerAddressResolver.getShardActorAddress(shardName, memberName), getSelf());
777
778             info.peerUp(memberName, peerId, getSelf());
779         }
780     }
781
782     private void memberReachable(ClusterEvent.ReachableMember message) {
783         MemberName memberName = memberToName(message.member());
784         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
785
786         addPeerAddress(memberName, message.member().address());
787
788         markMemberAvailable(memberName);
789     }
790
791     private void memberUnreachable(ClusterEvent.UnreachableMember message) {
792         MemberName memberName = memberToName(message.member());
793         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
794
795         markMemberUnavailable(memberName);
796     }
797
798     private void markMemberUnavailable(final MemberName memberName) {
799         final String memberStr = memberName.getName();
800         for (ShardInformation info : localShards.values()) {
801             String leaderId = info.getLeaderId();
802             // XXX: why are we using String#contains() here?
803             if (leaderId != null && leaderId.contains(memberStr)) {
804                 LOG.debug("Marking Leader {} as unavailable.", leaderId);
805                 info.setLeaderAvailable(false);
806
807                 primaryShardInfoCache.remove(info.getShardName());
808             }
809
810             info.peerDown(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
811         }
812     }
813
814     private void markMemberAvailable(final MemberName memberName) {
815         final String memberStr = memberName.getName();
816         for (ShardInformation info : localShards.values()) {
817             String leaderId = info.getLeaderId();
818             // XXX: why are we using String#contains() here?
819             if (leaderId != null && leaderId.contains(memberStr)) {
820                 LOG.debug("Marking Leader {} as available.", leaderId);
821                 info.setLeaderAvailable(true);
822             }
823
824             info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
825         }
826     }
827
828     private void onDatastoreContextFactory(DatastoreContextFactory factory) {
829         datastoreContextFactory = factory;
830         for (ShardInformation info : localShards.values()) {
831             info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
832         }
833     }
834
835     private void onGetLocalShardIds() {
836         final List<String> response = new ArrayList<>(localShards.size());
837
838         for (ShardInformation info : localShards.values()) {
839             response.add(info.getShardId().toString());
840         }
841
842         getSender().tell(new Status.Success(response), getSelf());
843     }
844
845     private void onSwitchShardBehavior(final SwitchShardBehavior message) {
846         final ShardIdentifier identifier = message.getShardId();
847
848         if (identifier != null) {
849             final ShardInformation info = localShards.get(identifier.getShardName());
850             if (info == null) {
851                 getSender().tell(new Status.Failure(
852                     new IllegalArgumentException("Shard " + identifier + " is not local")), getSelf());
853                 return;
854             }
855
856             switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
857         } else {
858             for (ShardInformation info : localShards.values()) {
859                 switchShardBehavior(info, new SwitchBehavior(message.getNewState(), message.getTerm()));
860             }
861         }
862
863         getSender().tell(new Status.Success(null), getSelf());
864     }
865
866     private void switchShardBehavior(final ShardInformation info, final SwitchBehavior switchBehavior) {
867         final ActorRef actor = info.getActor();
868         if (actor != null) {
869             actor.tell(switchBehavior, getSelf());
870           } else {
871             LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
872                 info.getShardName(), switchBehavior.getNewState());
873         }
874     }
875
876     /**
877      * Notifies all the local shards of a change in the schema context
878      *
879      * @param message
880      */
881     private void updateSchemaContext(final Object message) {
882         schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
883
884         LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
885
886         for (ShardInformation info : localShards.values()) {
887             if (info.getActor() == null) {
888                 LOG.debug("Creating Shard {}", info.getShardId());
889                 info.setActor(newShardActor(schemaContext, info));
890             } else {
891                 info.getActor().tell(message, getSelf());
892             }
893         }
894     }
895
896     @VisibleForTesting
897     protected ClusterWrapper getCluster() {
898         return cluster;
899     }
900
901     @VisibleForTesting
902     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
903         return getContext().actorOf(info.newProps(schemaContext)
904                 .withDispatcher(shardDispatcherPath), info.getShardId().toString());
905     }
906
907     private void findPrimary(FindPrimary message) {
908         LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
909
910         final String shardName = message.getShardName();
911         final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
912
913         // First see if the there is a local replica for the shard
914         final ShardInformation info = localShards.get(shardName);
915         if (info != null && info.isActiveMember()) {
916             sendResponse(info, message.isWaitUntilReady(), true, () -> {
917                 String primaryPath = info.getSerializedLeaderActor();
918                 Object found = canReturnLocalShardState && info.isLeader() ?
919                         new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
920                             new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
921
922                         if(LOG.isDebugEnabled()) {
923                             LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
924                         }
925
926                         return found;
927             });
928
929             return;
930         }
931
932         final Collection<String> visitedAddresses;
933         if(message instanceof RemoteFindPrimary) {
934             visitedAddresses = ((RemoteFindPrimary)message).getVisitedAddresses();
935         } else {
936             visitedAddresses = new ArrayList<>(1);
937         }
938
939         visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
940
941         for(String address: peerAddressResolver.getShardManagerPeerActorAddresses()) {
942             if(visitedAddresses.contains(address)) {
943                 continue;
944             }
945
946             LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
947                     persistenceId(), shardName, address, visitedAddresses);
948
949             getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
950                     message.isWaitUntilReady(), visitedAddresses), getContext());
951             return;
952         }
953
954         LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
955
956         getSender().tell(new PrimaryNotFoundException(
957                 String.format("No primary shard found for %s.", shardName)), getSelf());
958     }
959
960     /**
961      * Construct the name of the shard actor given the name of the member on
962      * which the shard resides and the name of the shard
963      *
964      * @param memberName
965      * @param shardName
966      * @return
967      */
968     private ShardIdentifier getShardIdentifier(MemberName memberName, String shardName){
969         return peerAddressResolver.getShardIdentifier(memberName, shardName);
970     }
971
972     /**
973      * Create shards that are local to the member on which the ShardManager
974      * runs
975      *
976      */
977     private void createLocalShards() {
978         MemberName memberName = this.cluster.getCurrentMemberName();
979         Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
980
981         Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
982         if(restoreFromSnapshot != null)
983         {
984             for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
985                 shardSnapshots.put(snapshot.getName(), snapshot);
986             }
987         }
988
989         restoreFromSnapshot = null; // null out to GC
990
991         for(String shardName : memberShardNames){
992             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
993
994             LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
995
996             Map<String, String> peerAddresses = getPeerAddresses(shardName);
997             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
998                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
999                         shardSnapshots.get(shardName)), peerAddressResolver));
1000         }
1001     }
1002
1003     /**
1004      * Given the name of the shard find the addresses of all it's peers
1005      *
1006      * @param shardName
1007      */
1008     private Map<String, String> getPeerAddresses(String shardName) {
1009         Collection<MemberName> members = configuration.getMembersFromShardName(shardName);
1010         Map<String, String> peerAddresses = new HashMap<>();
1011
1012         MemberName currentMemberName = this.cluster.getCurrentMemberName();
1013
1014         for (MemberName memberName : members) {
1015             if (!currentMemberName.equals(memberName)) {
1016                 ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
1017                 String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
1018                 peerAddresses.put(shardId.toString(), address);
1019             }
1020         }
1021         return peerAddresses;
1022     }
1023
1024     @Override
1025     public SupervisorStrategy supervisorStrategy() {
1026
1027         return new OneForOneStrategy(10, Duration.create("1 minute"),
1028                 (Function<Throwable, Directive>) t -> {
1029                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
1030                     return SupervisorStrategy.resume();
1031                 }
1032                 );
1033
1034     }
1035
1036     @Override
1037     public String persistenceId() {
1038         return persistenceId;
1039     }
1040
1041     @VisibleForTesting
1042     ShardManagerInfoMBean getMBean(){
1043         return mBean;
1044     }
1045
1046     private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
1047         if (shardReplicaOperationsInProgress.contains(shardName)) {
1048             String msg = String.format("A shard replica operation for %s is already in progress", shardName);
1049             LOG.debug ("{}: {}", persistenceId(), msg);
1050             sender.tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1051             return true;
1052         }
1053
1054         return false;
1055     }
1056
1057     private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
1058         final String shardName = shardReplicaMsg.getShardName();
1059
1060         LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
1061
1062         // verify the shard with the specified name is present in the cluster configuration
1063         if (!(this.configuration.isShardConfigured(shardName))) {
1064             String msg = String.format("No module configuration exists for shard %s", shardName);
1065             LOG.debug ("{}: {}", persistenceId(), msg);
1066             getSender().tell(new Status.Failure(new IllegalArgumentException(msg)), getSelf());
1067             return;
1068         }
1069
1070         // Create the localShard
1071         if (schemaContext == null) {
1072             String msg = String.format(
1073                   "No SchemaContext is available in order to create a local shard instance for %s", shardName);
1074             LOG.debug ("{}: {}", persistenceId(), msg);
1075             getSender().tell(new Status.Failure(new IllegalStateException(msg)), getSelf());
1076             return;
1077         }
1078
1079         findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
1080             @Override
1081             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1082                 getSelf().tell(new RunnableMessage() {
1083                     @Override
1084                     public void run() {
1085                         addShard(getShardName(), response, getSender());
1086                     }
1087                 }, getTargetActor());
1088             }
1089
1090             @Override
1091             public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
1092                 sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
1093             }
1094
1095         });
1096     }
1097
1098     private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
1099         String msg = String.format("Local shard %s already exists", shardName);
1100         LOG.debug ("{}: {}", persistenceId(), msg);
1101         sender.tell(new Status.Failure(new AlreadyExistsException(msg)), getSelf());
1102     }
1103
1104     private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
1105         if(isShardReplicaOperationInProgress(shardName, sender)) {
1106             return;
1107         }
1108
1109         shardReplicaOperationsInProgress.add(shardName);
1110
1111         final ShardInformation shardInfo;
1112         final boolean removeShardOnFailure;
1113         ShardInformation existingShardInfo = localShards.get(shardName);
1114         if(existingShardInfo == null) {
1115             removeShardOnFailure = true;
1116             ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1117
1118             DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
1119                     DisableElectionsRaftPolicy.class.getName()).build();
1120
1121             shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
1122                     Shard.builder(), peerAddressResolver);
1123             shardInfo.setActiveMember(false);
1124             localShards.put(shardName, shardInfo);
1125             shardInfo.setActor(newShardActor(schemaContext, shardInfo));
1126         } else {
1127             removeShardOnFailure = false;
1128             shardInfo = existingShardInfo;
1129         }
1130
1131         String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
1132
1133         //inform ShardLeader to add this shard as a replica by sending an AddServer message
1134         LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
1135                 response.getPrimaryPath(), shardInfo.getShardId());
1136
1137         Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().
1138                 duration());
1139         Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
1140             new AddServer(shardInfo.getShardId().toString(), localShardAddress, true), addServerTimeout);
1141
1142         futureObj.onComplete(new OnComplete<Object>() {
1143             @Override
1144             public void onComplete(Throwable failure, Object addServerResponse) {
1145                 if (failure != null) {
1146                     LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
1147                             response.getPrimaryPath(), shardName, failure);
1148
1149                     String msg = String.format("AddServer request to leader %s for shard %s failed",
1150                             response.getPrimaryPath(), shardName);
1151                     self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
1152                 } else {
1153                     self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply)addServerResponse,
1154                             response.getPrimaryPath(), removeShardOnFailure), sender);
1155                 }
1156             }
1157         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1158     }
1159
1160     private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
1161             boolean removeShardOnFailure) {
1162         shardReplicaOperationsInProgress.remove(shardName);
1163
1164         if(removeShardOnFailure) {
1165             ShardInformation shardInfo = localShards.remove(shardName);
1166             if (shardInfo.getActor() != null) {
1167                 shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
1168             }
1169         }
1170
1171         sender.tell(new Status.Failure(message == null ? failure :
1172             new RuntimeException(message, failure)), getSelf());
1173     }
1174
1175     private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
1176             String leaderPath, boolean removeShardOnFailure) {
1177         String shardName = shardInfo.getShardName();
1178         shardReplicaOperationsInProgress.remove(shardName);
1179
1180         LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
1181
1182         if (replyMsg.getStatus() == ServerChangeStatus.OK) {
1183             LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
1184
1185             // Make the local shard voting capable
1186             shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
1187             shardInfo.setActiveMember(true);
1188             persistShardList();
1189
1190             sender.tell(new Status.Success(null), getSelf());
1191         } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
1192             sendLocalReplicaAlreadyExistsReply(shardName, sender);
1193         } else {
1194             LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
1195                     persistenceId(), shardName, replyMsg.getStatus());
1196
1197             Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
1198
1199             onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
1200         }
1201     }
1202
1203     private static Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
1204                                                String leaderPath, ShardIdentifier shardId) {
1205         Exception failure;
1206         switch (serverChangeStatus) {
1207             case TIMEOUT:
1208                 failure = new TimeoutException(String.format(
1209                         "The shard leader %s timed out trying to replicate the initial data to the new shard %s." +
1210                         "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
1211                         leaderPath, shardId.getShardName()));
1212                 break;
1213             case NO_LEADER:
1214                 failure = createNoShardLeaderException(shardId);
1215                 break;
1216             case NOT_SUPPORTED:
1217                 failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
1218                         serverChange.getSimpleName(), shardId.getShardName()));
1219                 break;
1220             default :
1221                 failure = new RuntimeException(String.format(
1222                         "%s request to leader %s for shard %s failed with status %s",
1223                         serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
1224         }
1225         return failure;
1226     }
1227
1228     private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
1229         LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
1230
1231         findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
1232                 shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
1233             @Override
1234             public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
1235                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1236             }
1237
1238             @Override
1239             public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
1240                 doRemoveShardReplicaAsync(response.getPrimaryPath());
1241             }
1242
1243             private void doRemoveShardReplicaAsync(final String primaryPath) {
1244                 getSelf().tell(new RunnableMessage() {
1245                     @Override
1246                     public void run() {
1247                         removeShardReplica(shardReplicaMsg, getShardName(), primaryPath, getSender());
1248                     }
1249                 }, getTargetActor());
1250             }
1251         });
1252     }
1253
1254     private void persistShardList() {
1255         List<String> shardList = new ArrayList<>(localShards.keySet());
1256         for (ShardInformation shardInfo : localShards.values()) {
1257             if (!shardInfo.isActiveMember()) {
1258                 shardList.remove(shardInfo.getShardName());
1259             }
1260         }
1261         LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
1262         saveSnapshot(updateShardManagerSnapshot(shardList));
1263     }
1264
1265     private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
1266         currentSnapshot = new ShardManagerSnapshot(shardList);
1267         return currentSnapshot;
1268     }
1269
1270     private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
1271         currentSnapshot = snapshot;
1272
1273         LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
1274
1275         final MemberName currentMember = cluster.getCurrentMemberName();
1276         Set<String> configuredShardList =
1277             new HashSet<>(configuration.getMemberShardNames(currentMember));
1278         for (String shard : currentSnapshot.getShardList()) {
1279             if (!configuredShardList.contains(shard)) {
1280                 // add the current member as a replica for the shard
1281                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
1282                 configuration.addMemberReplicaForShard(shard, currentMember);
1283             } else {
1284                 configuredShardList.remove(shard);
1285             }
1286         }
1287         for (String shard : configuredShardList) {
1288             // remove the member as a replica for the shard
1289             LOG.debug ("{}: removing shard {}", persistenceId(), shard);
1290             configuration.removeMemberReplicaForShard(shard, currentMember);
1291         }
1292     }
1293
1294     private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
1295         LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
1296             persistenceId());
1297         deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
1298             0, 0));
1299     }
1300
1301     private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
1302         LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
1303
1304         String shardName = changeMembersVotingStatus.getShardName();
1305         Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1306         for(Entry<String, Boolean> e: changeMembersVotingStatus.getMeberVotingStatusMap().entrySet()) {
1307             serverVotingStatusMap.put(getShardIdentifier(MemberName.forName(e.getKey()), shardName).toString(),
1308                     e.getValue());
1309         }
1310
1311         ChangeServersVotingStatus changeServersVotingStatus = new ChangeServersVotingStatus(serverVotingStatusMap);
1312
1313         findLocalShard(shardName, getSender(),
1314                 localShardFound -> changeShardMembersVotingStatus(changeServersVotingStatus, shardName,
1315                         localShardFound.getPath(), getSender()));
1316     }
1317
1318     private void onFlipShardMembersVotingStatus(FlipShardMembersVotingStatus flipMembersVotingStatus) {
1319         LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
1320
1321         ActorRef sender = getSender();
1322         final String shardName = flipMembersVotingStatus.getShardName();
1323         findLocalShard(shardName, sender, localShardFound -> {
1324             Future<Object> future = ask(localShardFound.getPath(), GetOnDemandRaftState.INSTANCE,
1325                     Timeout.apply(30, TimeUnit.SECONDS));
1326
1327             future.onComplete(new OnComplete<Object>() {
1328                 @Override
1329                 public void onComplete(Throwable failure, Object response) {
1330                     if (failure != null) {
1331                         sender.tell(new Status.Failure(new RuntimeException(
1332                                 String.format("Failed to access local shard %s", shardName), failure)), self());
1333                         return;
1334                     }
1335
1336                     OnDemandRaftState raftState = (OnDemandRaftState) response;
1337                     Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
1338                     for(Entry<String, Boolean> e: raftState.getPeerVotingStates().entrySet()) {
1339                         serverVotingStatusMap.put(e.getKey(), !e.getValue());
1340                     }
1341
1342                     serverVotingStatusMap.put(getShardIdentifier(cluster.getCurrentMemberName(), shardName).
1343                             toString(), !raftState.isVoting());
1344
1345                     changeShardMembersVotingStatus(new ChangeServersVotingStatus(serverVotingStatusMap),
1346                             shardName, localShardFound.getPath(), sender);
1347                 }
1348             }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1349         });
1350
1351     }
1352
1353     private void findLocalShard(final String shardName, final ActorRef sender,
1354             final Consumer<LocalShardFound> onLocalShardFound) {
1355         Timeout findLocalTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1356                 getShardInitializationTimeout().duration().$times(2));
1357
1358         Future<Object> futureObj = ask(getSelf(), new FindLocalShard(shardName, true), findLocalTimeout);
1359         futureObj.onComplete(new OnComplete<Object>() {
1360             @Override
1361             public void onComplete(Throwable failure, Object response) {
1362                 if (failure != null) {
1363                     LOG.debug ("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName, failure);
1364                     sender.tell(new Status.Failure(new RuntimeException(
1365                             String.format("Failed to find local shard %s", shardName), failure)), self());
1366                 } else {
1367                     if(response instanceof LocalShardFound) {
1368                         getSelf().tell(new RunnableMessage() {
1369                             @Override
1370                             public void run() {
1371                                 onLocalShardFound.accept((LocalShardFound) response);
1372                             }
1373                         }, sender);
1374                     } else if(response instanceof LocalShardNotFound) {
1375                         String msg = String.format("Local shard %s does not exist", shardName);
1376                         LOG.debug ("{}: {}", persistenceId, msg);
1377                         sender.tell(new Status.Failure(new IllegalArgumentException(msg)), self());
1378                     } else {
1379                         String msg = String.format("Failed to find local shard %s: received response: %s",
1380                                 shardName, response);
1381                         LOG.debug ("{}: {}", persistenceId, msg);
1382                         sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1383                                 new RuntimeException(msg)), self());
1384                     }
1385                 }
1386             }
1387         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1388     }
1389
1390     private void changeShardMembersVotingStatus(ChangeServersVotingStatus changeServersVotingStatus,
1391             final String shardName, final ActorRef shardActorRef, final ActorRef sender) {
1392         if(isShardReplicaOperationInProgress(shardName, sender)) {
1393             return;
1394         }
1395
1396         shardReplicaOperationsInProgress.add(shardName);
1397
1398         DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
1399         final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
1400
1401         LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
1402                 changeServersVotingStatus, shardActorRef.path());
1403
1404         Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
1405         Future<Object> futureObj = ask(shardActorRef, changeServersVotingStatus, timeout);
1406
1407         futureObj.onComplete(new OnComplete<Object>() {
1408             @Override
1409             public void onComplete(Throwable failure, Object response) {
1410                 shardReplicaOperationsInProgress.remove(shardName);
1411                 if (failure != null) {
1412                     String msg = String.format("ChangeServersVotingStatus request to local shard %s failed",
1413                             shardActorRef.path());
1414                     LOG.debug ("{}: {}", persistenceId(), msg, failure);
1415                     sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
1416                 } else {
1417                     LOG.debug ("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
1418
1419                     ServerChangeReply replyMsg = (ServerChangeReply) response;
1420                     if(replyMsg.getStatus() == ServerChangeStatus.OK) {
1421                         LOG.debug ("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
1422                         sender.tell(new Status.Success(null), getSelf());
1423                     } else if(replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
1424                         sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
1425                                 "The requested voting state change for shard %s is invalid. At least one member must be voting",
1426                                 shardId.getShardName()))), getSelf());
1427                     } else {
1428                         LOG.warn ("{}: ChangeServersVotingStatus failed for shard {} with status {}",
1429                                 persistenceId(), shardName, replyMsg.getStatus());
1430
1431                         Exception error = getServerChangeException(ChangeServersVotingStatus.class,
1432                                 replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
1433                         sender.tell(new Status.Failure(error), getSelf());
1434                     }
1435                 }
1436             }
1437         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1438     }
1439
1440     private static final class ForwardedAddServerReply {
1441         ShardInformation shardInfo;
1442         AddServerReply addServerReply;
1443         String leaderPath;
1444         boolean removeShardOnFailure;
1445
1446         ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
1447                 boolean removeShardOnFailure) {
1448             this.shardInfo = shardInfo;
1449             this.addServerReply = addServerReply;
1450             this.leaderPath = leaderPath;
1451             this.removeShardOnFailure = removeShardOnFailure;
1452         }
1453     }
1454
1455     private static final class ForwardedAddServerFailure {
1456         String shardName;
1457         String failureMessage;
1458         Throwable failure;
1459         boolean removeShardOnFailure;
1460
1461         ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
1462                 boolean removeShardOnFailure) {
1463             this.shardName = shardName;
1464             this.failureMessage = failureMessage;
1465             this.failure = failure;
1466             this.removeShardOnFailure = removeShardOnFailure;
1467         }
1468     }
1469
1470     static class OnShardInitialized {
1471         private final Runnable replyRunnable;
1472         private Cancellable timeoutSchedule;
1473
1474         OnShardInitialized(Runnable replyRunnable) {
1475             this.replyRunnable = replyRunnable;
1476         }
1477
1478         Runnable getReplyRunnable() {
1479             return replyRunnable;
1480         }
1481
1482         Cancellable getTimeoutSchedule() {
1483             return timeoutSchedule;
1484         }
1485
1486         void setTimeoutSchedule(Cancellable timeoutSchedule) {
1487             this.timeoutSchedule = timeoutSchedule;
1488         }
1489     }
1490
1491     static class OnShardReady extends OnShardInitialized {
1492         OnShardReady(Runnable replyRunnable) {
1493             super(replyRunnable);
1494         }
1495     }
1496
1497     private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
1498         Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
1499                 getShardInitializationTimeout().duration().$times(2));
1500
1501
1502         Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
1503         futureObj.onComplete(new OnComplete<Object>() {
1504             @Override
1505             public void onComplete(Throwable failure, Object response) {
1506                 if (failure != null) {
1507                     handler.onFailure(failure);
1508                 } else {
1509                     if(response instanceof RemotePrimaryShardFound) {
1510                         handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
1511                     } else if(response instanceof LocalPrimaryShardFound) {
1512                         handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
1513                     } else {
1514                         handler.onUnknownResponse(response);
1515                     }
1516                 }
1517             }
1518         }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
1519     }
1520
1521     private static interface RunnableMessage extends Runnable {
1522     }
1523
1524     /**
1525      * The FindPrimaryResponseHandler provides specific callback methods which are invoked when a response to the
1526      * a remote or local find primary message is processed
1527      */
1528     private static interface FindPrimaryResponseHandler {
1529         /**
1530          * Invoked when a Failure message is received as a response
1531          *
1532          * @param failure
1533          */
1534         void onFailure(Throwable failure);
1535
1536         /**
1537          * Invoked when a RemotePrimaryShardFound response is received
1538          *
1539          * @param response
1540          */
1541         void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
1542
1543         /**
1544          * Invoked when a LocalPrimaryShardFound response is received
1545          * @param response
1546          */
1547         void onLocalPrimaryFound(LocalPrimaryShardFound response);
1548
1549         /**
1550          * Invoked when an unknown response is received. This is another type of failure.
1551          *
1552          * @param response
1553          */
1554         void onUnknownResponse(Object response);
1555     }
1556
1557     /**
1558      * The AutoFindPrimaryFailureResponseHandler automatically processes Failure responses when finding a primary
1559      * replica and sends a wrapped Failure response to some targetActor
1560      */
1561     private static abstract class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
1562         private final ActorRef targetActor;
1563         private final String shardName;
1564         private final String persistenceId;
1565         private final ActorRef shardManagerActor;
1566
1567         /**
1568          * @param targetActor The actor to whom the Failure response should be sent when a FindPrimary failure occurs
1569          * @param shardName The name of the shard for which the primary replica had to be found
1570          * @param persistenceId The persistenceId for the ShardManager
1571          * @param shardManagerActor The ShardManager actor which triggered the call to FindPrimary
1572          */
1573         protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor){
1574             this.targetActor = Preconditions.checkNotNull(targetActor);
1575             this.shardName = Preconditions.checkNotNull(shardName);
1576             this.persistenceId = Preconditions.checkNotNull(persistenceId);
1577             this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
1578         }
1579
1580         public ActorRef getTargetActor() {
1581             return targetActor;
1582         }
1583
1584         public String getShardName() {
1585             return shardName;
1586         }
1587
1588         @Override
1589         public void onFailure(Throwable failure) {
1590             LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
1591             targetActor.tell(new Status.Failure(new RuntimeException(
1592                     String.format("Failed to find leader for shard %s", shardName), failure)), shardManagerActor);
1593         }
1594
1595         @Override
1596         public void onUnknownResponse(Object response) {
1597             String msg = String.format("Failed to find leader for shard %s: received response: %s",
1598                     shardName, response);
1599             LOG.debug ("{}: {}", persistenceId, msg);
1600             targetActor.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response :
1601                     new RuntimeException(msg)), shardManagerActor);
1602         }
1603     }
1604
1605     /**
1606      * The WrappedShardResponse class wraps a response from a Shard.
1607      */
1608     private static final class WrappedShardResponse {
1609         private final ShardIdentifier shardId;
1610         private final Object response;
1611         private final String leaderPath;
1612
1613         WrappedShardResponse(ShardIdentifier shardId, Object response, String leaderPath) {
1614             this.shardId = shardId;
1615             this.response = response;
1616             this.leaderPath = leaderPath;
1617         }
1618
1619         ShardIdentifier getShardId() {
1620             return shardId;
1621         }
1622
1623         Object getResponse() {
1624             return response;
1625         }
1626
1627         String getLeaderPath() {
1628             return leaderPath;
1629         }
1630     }
1631
1632     private static final class ShardNotInitializedTimeout {
1633         private final ActorRef sender;
1634         private final ShardInformation shardInfo;
1635         private final OnShardInitialized onShardInitialized;
1636
1637         ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
1638             this.sender = sender;
1639             this.shardInfo = shardInfo;
1640             this.onShardInitialized = onShardInitialized;
1641         }
1642
1643         ActorRef getSender() {
1644             return sender;
1645         }
1646
1647         ShardInformation getShardInfo() {
1648             return shardInfo;
1649         }
1650
1651         OnShardInitialized getOnShardInitialized() {
1652             return onShardInitialized;
1653         }
1654     }
1655 }
1656
1657
1658