+ }
+
+ private void onShutDown() {
+ List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
+ for (ShardInformation info : localShards.values()) {
+ if (info.getActor() != null) {
+ LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
+
+ FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
+ stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, Shutdown.INSTANCE));
+ }
+ }
+
+ LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
+
+ ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
+ Future<Iterable<Boolean>> combinedFutures = Futures.sequence(stopFutures, dispatcher);
+
+ combinedFutures.onComplete(new OnComplete<Iterable<Boolean>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Boolean> results) {
+ LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
+
+ self().tell(PoisonPill.getInstance(), self());
+
+ if(failure != null) {
+ LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
+ } else {
+ int nfailed = 0;
+ for(Boolean r: results) {
+ if(!r) {
+ nfailed++;
+ }
+ }
+
+ if(nfailed > 0) {
+ LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
+ }
+ }
+ }
+ }, dispatcher);
+ }
+
+ private void onWrappedShardResponse(WrappedShardResponse message) {
+ if (message.getResponse() instanceof RemoveServerReply) {
+ onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
+ message.getLeaderPath());
+ }
+ }
+
+ private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
+ String leaderPath) {
+ shardReplicaOperationsInProgress.remove(shardId);
+
+ LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+ shardId.getShardName());
+ originalSender.tell(new akka.actor.Status.Success(null), getSelf());
+ } else {
+ LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
+ persistenceId(), shardId, replyMsg.getStatus());
+
+ Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
+ leaderPath, shardId);
+ originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
+ }
+ }
+
+ private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
+ if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
+ addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
+ getSender());
+ } else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
+ removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
+ primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
+ }
+ }
+
+ private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
+ final ActorRef sender) {
+ if(isShardReplicaOperationInProgress(shardName, sender)) {
+ return;
+ }
+
+ shardReplicaOperationsInProgress.add(shardName);
+
+ final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
+
+ final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
+
+ //inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
+ LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ primaryPath, shardId);
+
+ Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
+ duration());
+ Future<Object> futureObj = ask(getContext().actorSelection(primaryPath),
+ new RemoveServer(shardId.toString()), removeServerTimeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ String msg = String.format("RemoveServer request to leader %s for shard %s failed",
+ primaryPath, shardName);
+
+ LOG.debug ("{}: {}", persistenceId(), msg, failure);
+
+ // FAILURE
+ sender.tell(new Status.Failure(new RuntimeException(msg, failure)), self());
+ } else {
+ // SUCCESS
+ self().tell(new WrappedShardResponse(shardId, response, primaryPath), sender);
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ private void onShardReplicaRemoved(ServerRemoved message) {
+ final ShardIdentifier shardId = new ShardIdentifier.Builder().fromShardIdString(message.getServerId()).build();
+ final ShardInformation shardInformation = localShards.remove(shardId.getShardName());
+ if(shardInformation == null) {
+ LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
+ return;
+ } else if(shardInformation.getActor() != null) {
+ LOG.debug("{} : Sending Shutdown to Shard actor {}", persistenceId(), shardInformation.getActor());
+ shardInformation.getActor().tell(Shutdown.INSTANCE, self());
+ }
+ LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardId.getShardName());
+ persistShardList();
+ }
+
+ private void onGetSnapshot() {
+ LOG.debug("{}: onGetSnapshot", persistenceId());
+
+ List<String> notInitialized = null;
+ for(ShardInformation shardInfo: localShards.values()) {
+ if(!shardInfo.isShardInitialized()) {
+ if(notInitialized == null) {
+ notInitialized = new ArrayList<>();
+ }
+
+ notInitialized.add(shardInfo.getShardName());
+ }
+ }
+
+ if(notInitialized != null) {
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(String.format(
+ "%d shard(s) %s are not initialized", notInitialized.size(), notInitialized))), getSelf());
+ return;
+ }
+
+ byte[] shardManagerSnapshot = null;
+ if(currentSnapshot != null) {
+ shardManagerSnapshot = SerializationUtils.serialize(currentSnapshot);
+ }
+
+ ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
+ new ArrayList<>(localShards.keySet()), type, shardManagerSnapshot , getSender(), persistenceId(),
+ datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
+
+ for(ShardInformation shardInfo: localShards.values()) {
+ shardInfo.getActor().tell(GetSnapshot.INSTANCE, replyActor);
+ }
+ }
+
+ private void onCreateShard(CreateShard createShard) {
+ LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
+
+ Object reply;
+ try {
+ String shardName = createShard.getModuleShardConfig().getShardName();
+ if(localShards.containsKey(shardName)) {
+ LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
+ reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
+ } else {
+ doCreateShard(createShard);
+ reply = new akka.actor.Status.Success(null);
+ }
+ } catch (Exception e) {
+ LOG.error("{}: onCreateShard failed", persistenceId(), e);
+ reply = new akka.actor.Status.Failure(e);
+ }
+
+ if(getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(reply, getSelf());
+ }
+ }
+
+ private void doCreateShard(CreateShard createShard) {
+ ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
+ String shardName = moduleShardConfig.getShardName();
+
+ configuration.addModuleShardConfiguration(moduleShardConfig);
+
+ DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
+ if(shardDatastoreContext == null) {
+ shardDatastoreContext = newShardDatastoreContext(shardName);
+ } else {
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
+ peerAddressResolver).build();
+ }
+
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+ boolean shardWasInRecoveredSnapshot = currentSnapshot != null &&
+ currentSnapshot.getShardList().contains(shardName);
+
+ Map<String, String> peerAddresses;
+ boolean isActiveMember;
+ if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
+ contains(cluster.getCurrentMemberName())) {
+ peerAddresses = getPeerAddresses(shardName);
+ isActiveMember = true;
+ } else {
+ // The local member is not in the static shard member configuration and the shard did not
+ // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
+ // the shard with no peers and with elections disabled so it stays as follower. A
+ // subsequent AddServer request will be needed to make it an active member.
+ isActiveMember = false;
+ peerAddresses = Collections.emptyMap();
+ shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
+ customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+ }
+
+ LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
+ isActiveMember);
+
+ ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
+ shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
+ info.setActiveMember(isActiveMember);
+ localShards.put(info.getShardName(), info);
+
+ mBean.addLocalShard(shardId.toString());
+
+ if(schemaContext != null) {
+ info.setActor(newShardActor(schemaContext, info));
+ }
+ }
+
+ private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+ return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
+ shardPeerAddressResolver(peerAddressResolver);
+ }
+
+ private DatastoreContext newShardDatastoreContext(String shardName) {
+ return newShardDatastoreContextBuilder(shardName).build();
+ }
+
+ private void checkReady(){
+ if (isReadyWithLeaderId()) {
+ LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+ persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+
+ waitTillReadyCountdownLatch.countDown();
+ }
+ }
+
+ private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
+ LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
+
+ ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
+ if(shardInformation != null) {
+ shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
+ shardInformation.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());
+ if(shardInformation.setLeaderId(leaderStateChanged.getLeaderId())) {
+ primaryShardInfoCache.remove(shardInformation.getShardName());
+ }
+
+ checkReady();
+ } else {
+ LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
+ }
+ }
+
+ private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
+ ShardInformation shardInfo = message.getShardInfo();
+
+ LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
+ shardInfo.getShardName());
+
+ shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
+
+ if(!shardInfo.isShardInitialized()) {
+ LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+ message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
+ } else {
+ LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
+ message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
+ }
+ }
+
+ private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
+ LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
+ status.getName(), status.isInitialSyncDone());
+
+ ShardInformation shardInformation = findShardInformation(status.getName());
+
+ if(shardInformation != null) {
+ shardInformation.setFollowerSyncStatus(status.isInitialSyncDone());
+
+ mBean.setSyncStatus(isInSync());
+ }