+
+ info.peerUp(memberName, getShardIdentifier(memberName, info.getShardName()).toString(), getSelf());
+ }
+ }
+
+ private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+ datastoreContextFactory = factory;
+ for (ShardInformation info : localShards.values()) {
+ info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
+ }
+ }
+
+ private void onSwitchShardBehavior(SwitchShardBehavior message) {
+ ShardIdentifier identifier = ShardIdentifier.builder().fromShardIdString(message.getShardName()).build();
+
+ ShardInformation shardInformation = localShards.get(identifier.getShardName());
+
+ if(shardInformation != null && shardInformation.getActor() != null) {
+ shardInformation.getActor().tell(
+ new SwitchBehavior(RaftState.valueOf(message.getNewState()), message.getTerm()), getSelf());
+ } else {
+ LOG.warn("Could not switch the behavior of shard {} to {} - shard is not yet available",
+ message.getShardName(), message.getNewState());
+ }
+ }
+
+ /**
+ * Notifies all the local shards of a change in the schema context
+ *
+ * @param message
+ */
+ private void updateSchemaContext(final Object message) {
+ schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+ LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getAllModuleIdentifiers().size());
+
+ for (ShardInformation info : localShards.values()) {
+ if (info.getActor() == null) {
+ LOG.debug("Creating Shard {}", info.getShardId());
+ info.setActor(newShardActor(schemaContext, info));
+ } else {
+ info.getActor().tell(message, getSelf());
+ }
+ }
+ }
+
+    /**
+     * @return the ClusterWrapper held by this ShardManager
+     */
+    @VisibleForTesting
+    protected ClusterWrapper getCluster() {
+        return this.cluster;
+    }
+
+ @VisibleForTesting
+ protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
+ return getContext().actorOf(info.newProps(schemaContext)
+ .withDispatcher(shardDispatcherPath), info.getShardId().toString());
+ }
+
+    /**
+     * Handles a FindPrimary request. An active local replica answers via sendResponse; otherwise
+     * the request is forwarded to the first peer ShardManager that has not been visited yet, and
+     * if there is none the sender receives a PrimaryNotFoundException.
+     *
+     * @param message the request; a RemoteFindPrimary additionally carries the visited addresses
+     */
+    private void findPrimary(FindPrimary message) {
+        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+
+        final String shardName = message.getShardName();
+        final boolean isRemoteRequest = message instanceof RemoteFindPrimary;
+        // Only a request that is not a RemoteFindPrimary may be answered with LocalPrimaryShardFound.
+        final boolean canReturnLocalShardState = !isRemoteRequest;
+
+        // First see if there is a local replica for the shard
+        final ShardInformation info = localShards.get(shardName);
+        if (info != null && info.isActiveMember()) {
+            sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
+                @Override
+                public Object get() {
+                    final String primaryPath = info.getSerializedLeaderActor();
+
+                    final Object found;
+                    if (canReturnLocalShardState && info.isLeader()) {
+                        found = new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get());
+                    } else {
+                        found = new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
+                    }
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+                    }
+
+                    return found;
+                }
+            });
+
+            return;
+        }
+
+        // No usable local replica: record this ShardManager as visited and try the next peer.
+        final Collection<String> visitedAddresses;
+        if (isRemoteRequest) {
+            visitedAddresses = ((RemoteFindPrimary) message).getVisitedAddresses();
+        } else {
+            visitedAddresses = new ArrayList<>();
+        }
+
+        visitedAddresses.add(peerAddressResolver.getShardManagerActorPathBuilder(cluster.getSelfAddress()).toString());
+
+        for (String address : peerAddressResolver.getShardManagerPeerActorAddresses()) {
+            if (!visitedAddresses.contains(address)) {
+                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+                        shardName, address);
+
+                final RemoteFindPrimary forwarded =
+                        new RemoteFindPrimary(shardName, message.isWaitUntilReady(), visitedAddresses);
+                getContext().actorSelection(address).forward(forwarded, getContext());
+                return;
+            }
+        }
+
+        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+
+        final String notFound = String.format("No primary shard found for %s.", shardName);
+        getSender().tell(new PrimaryNotFoundException(notFound), getSelf());
+    }
+
+ /**
+ * Construct the name of the shard actor given the name of the member on
+ * which the shard resides and the name of the shard
+ *
+ * @param memberName
+ * @param shardName
+ * @return
+ */
+ private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+ return peerAddressResolver.getShardIdentifier(memberName, shardName);
+ }
+
+ /**
+ * Create shards that are local to the member on which the ShardManager
+ * runs
+ *
+ */
+ private void createLocalShards() {
+ String memberName = this.cluster.getCurrentMemberName();
+ Collection<String> memberShardNames = this.configuration.getMemberShardNames(memberName);
+
+ Map<String, DatastoreSnapshot.ShardSnapshot> shardSnapshots = new HashMap<>();
+ if(restoreFromSnapshot != null)
+ {
+ for(DatastoreSnapshot.ShardSnapshot snapshot: restoreFromSnapshot.getShardSnapshots()) {
+ shardSnapshots.put(snapshot.getName(), snapshot);
+ }
+ }
+
+ restoreFromSnapshot = null; // null out to GC
+
+ for(String shardName : memberShardNames){
+ ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+
+ LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
+
+ Map<String, String> peerAddresses = getPeerAddresses(shardName);
+ localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
+ newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
+ shardSnapshots.get(shardName)), peerAddressResolver));
+ mBean.addLocalShard(shardId.toString());
+ }
+ }
+
+ /**
+ * Given the name of the shard find the addresses of all it's peers
+ *
+ * @param shardName
+ */
+ private Map<String, String> getPeerAddresses(String shardName) {
+ Collection<String> members = configuration.getMembersFromShardName(shardName);
+ Map<String, String> peerAddresses = new HashMap<>();
+
+ String currentMemberName = this.cluster.getCurrentMemberName();
+
+ for(String memberName : members) {
+ if(!currentMemberName.equals(memberName)) {
+ ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+ String address = peerAddressResolver.getShardActorAddress(shardName, memberName);
+ peerAddresses.put(shardId.toString(), address);
+ }
+ }
+ return peerAddresses;
+ }
+
+    /**
+     * @return a OneForOneStrategy (10 retries within "1 minute") whose decider logs the
+     *         Throwable and always answers with the resume directive
+     */
+    @Override
+    public SupervisorStrategy supervisorStrategy() {
+        final Function<Throwable, SupervisorStrategy.Directive> resumeAlways =
+                new Function<Throwable, SupervisorStrategy.Directive>() {
+                    @Override
+                    public SupervisorStrategy.Directive apply(Throwable t) {
+                        LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
+                        return SupervisorStrategy.resume();
+                    }
+                };
+
+        return new OneForOneStrategy(10, Duration.create("1 minute"), resumeAlways);
+    }
+
+    /**
+     * @return the persistence id held by this ShardManager
+     */
+    @Override
+    public String persistenceId() {
+        return this.persistenceId;
+    }
+
+    /**
+     * @return the ShardManagerInfoMBean held by this ShardManager
+     */
+    @VisibleForTesting
+    ShardManagerInfoMBean getMBean() {
+        return this.mBean;
+    }
+
+ private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
+ if (shardReplicaOperationsInProgress.contains(shardName)) {
+ String msg = String.format("A shard replica operation for %s is already in progress", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ sender.tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ return true;
+ }
+
+ return false;
+ }
+
+ private void onAddShardReplica (final AddShardReplica shardReplicaMsg) {
+ final String shardName = shardReplicaMsg.getShardName();
+
+ LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+ // verify the shard with the specified name is present in the cluster configuration
+ if (!(this.configuration.isShardConfigured(shardName))) {
+ String msg = String.format("No module configuration exists for shard %s", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ return;
+ }
+
+ // Create the localShard
+ if (schemaContext == null) {
+ String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound message) {
+ sendLocalReplicaAlreadyExistsReply(getShardName(), getTargetActor());
+ }
+
+ });
+ }
+
+ private void sendLocalReplicaAlreadyExistsReply(String shardName, ActorRef sender) {
+ String msg = String.format("Local shard %s already exists", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ sender.tell(new akka.actor.Status.Failure(new AlreadyExistsException(msg)), getSelf());
+ }
+
+    /**
+     * Starts adding a local replica of the shard. Unless an operation for the shard is already in
+     * progress, the shard is marked as in progress, a non-active local shard using the
+     * DisableElectionsRaftPolicy is created if none exists yet, and an AddServer request is sent
+     * to the primary. The outcome of that request is relayed back to this actor as a
+     * ForwardedAddServerReply or ForwardedAddServerFailure, with the given sender as the sender.
+     *
+     * @param shardName name of the shard to replicate
+     * @param response the remote primary that was found for the shard
+     * @param sender the actor that eventually receives the result of the operation
+     */
+    private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
+        if (isShardReplicaOperationInProgress(shardName, sender)) {
+            return;
+        }
+
+        shardReplicaOperationsInProgress.add(shardName);
+
+        // A shard created here has to be removed again if the AddServer request fails.
+        final ShardInformation existingShardInfo = localShards.get(shardName);
+        final boolean removeShardOnFailure = existingShardInfo == null;
+
+        final ShardInformation shardInfo;
+        if (removeShardOnFailure) {
+            final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+
+            final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName)
+                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
+
+            shardInfo = new ShardInformation(shardName, shardId, getPeerAddresses(shardName), datastoreContext,
+                    Shard.builder(), peerAddressResolver);
+            shardInfo.setActiveMember(false);
+            localShards.put(shardName, shardInfo);
+            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+        } else {
+            shardInfo = existingShardInfo;
+        }
+
+        final String localShardAddress =
+                peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+        final String leaderPath = response.getPrimaryPath();
+
+        // Inform the shard leader to add this shard as a replica by sending an AddServer message.
+        LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+                leaderPath, shardInfo.getShardId());
+
+        final Timeout addServerTimeout =
+                new Timeout(shardInfo.getDatastoreContext().getShardLeaderElectionTimeout().duration());
+        final AddServer addServer = new AddServer(shardInfo.getShardId().toString(), localShardAddress, true);
+        final Future<Object> addServerFuture =
+                ask(getContext().actorSelection(leaderPath), addServer, addServerTimeout);
+
+        addServerFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object addServerResponse) {
+                if (failure == null) {
+                    self().tell(new ForwardedAddServerReply(shardInfo, (AddServerReply) addServerResponse,
+                            leaderPath, removeShardOnFailure), sender);
+                    return;
+                }
+
+                LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
+                        leaderPath, shardName, failure);
+
+                final String msg = String.format("AddServer request to leader %s for shard %s failed",
+                        leaderPath, shardName);
+                self().tell(new ForwardedAddServerFailure(shardName, msg, failure, removeShardOnFailure), sender);
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    /**
+     * Finishes a failed add-replica operation: clears the in-progress marker, optionally removes
+     * the local shard and stops its actor with a PoisonPill, and reports the failure to the sender.
+     *
+     * @param shardName name of the shard whose replica could not be added
+     * @param message optional description; when non-null the failure is wrapped in a
+     *        RuntimeException carrying it, otherwise the failure is reported as is
+     * @param failure the cause of the failure
+     * @param sender actor that receives the Status.Failure
+     * @param removeShardOnFailure whether the local shard entry should be removed
+     */
+    private void onAddServerFailure(String shardName, String message, Throwable failure, ActorRef sender,
+            boolean removeShardOnFailure) {
+        shardReplicaOperationsInProgress.remove(shardName);
+
+        if (removeShardOnFailure) {
+            // Map.remove returns null when the entry is already gone; guard against it so an
+            // NPE here cannot prevent the failure reply below from reaching the sender.
+            ShardInformation shardInfo = localShards.remove(shardName);
+            if (shardInfo != null && shardInfo.getActor() != null) {
+                shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+            }
+        }
+
+        sender.tell(new akka.actor.Status.Failure(message == null ? failure :
+                new RuntimeException(message, failure)), getSelf());
+    }
+
+ private void onAddServerReply(ShardInformation shardInfo, AddServerReply replyMsg, ActorRef sender,
+ String leaderPath, boolean removeShardOnFailure) {
+ String shardName = shardInfo.getShardName();
+ shardReplicaOperationsInProgress.remove(shardName);
+
+ LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
+ // Make the local shard voting capable
+ shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+ shardInfo.setActiveMember(true);
+ persistShardList();
+
+ mBean.addLocalShard(shardInfo.getShardId().toString());
+ sender.tell(new akka.actor.Status.Success(null), getSelf());
+ } else if(replyMsg.getStatus() == ServerChangeStatus.ALREADY_EXISTS) {
+ sendLocalReplicaAlreadyExistsReply(shardName, sender);
+ } else {
+ LOG.warn ("{}: Leader failed to add shard replica {} with status {}",
+ persistenceId(), shardName, replyMsg.getStatus());
+
+ Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath, shardInfo.getShardId());
+
+ onAddServerFailure(shardName, null, failure, sender, removeShardOnFailure);
+ }
+ }
+
+    /**
+     * Maps a non-successful ServerChangeStatus to the exception that is reported to the requester.
+     *
+     * @param serverChange the request class; its simple name is used in the messages
+     * @param serverChangeStatus the status returned by the leader
+     * @param leaderPath path of the leader that returned the status
+     * @param shardId identifier of the shard the request was about
+     * @return a TimeoutException for TIMEOUT, the result of createNoShardLeaderException for
+     *         NO_LEADER, an UnsupportedOperationException for NOT_SUPPORTED and a RuntimeException
+     *         naming the status otherwise
+     */
+    private Exception getServerChangeException(Class<?> serverChange, ServerChangeStatus serverChangeStatus,
+            String leaderPath, ShardIdentifier shardId) {
+        Exception failure;
+        switch (serverChangeStatus) {
+            case TIMEOUT:
+                // The two literals are concatenated; the trailing space keeps the sentences apart.
+                failure = new TimeoutException(String.format(
+                        "The shard leader %s timed out trying to replicate the initial data to the new shard %s. " +
+                        "Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                        leaderPath, shardId.getShardName()));
+                break;
+            case NO_LEADER:
+                failure = createNoShardLeaderException(shardId);
+                break;
+            case NOT_SUPPORTED:
+                failure = new UnsupportedOperationException(String.format("%s request is not supported for shard %s",
+                        serverChange.getSimpleName(), shardId.getShardName()));
+                break;
+            default :
+                failure = new RuntimeException(String.format(
+                        "%s request to leader %s for shard %s failed with status %s",
+                        serverChange.getSimpleName(), leaderPath, shardId.getShardName(), serverChangeStatus));
+        }
+        return failure;
+    }
+
+ private void onRemoveShardReplica (final RemoveShardReplica shardReplicaMsg) {
+ LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
+
+ findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
+ shardReplicaMsg.getShardName(), persistenceId(), getSelf()) {
+ @Override
+ public void onRemotePrimaryShardFound(RemotePrimaryShardFound response) {
+ getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ }
+
+ @Override
+ public void onLocalPrimaryFound(LocalPrimaryShardFound response) {
+ getSelf().tell(new PrimaryShardFoundForContext(getShardName(), shardReplicaMsg, response), getTargetActor());
+ }
+ });
+ }
+
+ private void persistShardList() {
+ List<String> shardList = new ArrayList<>(localShards.keySet());
+ for (ShardInformation shardInfo : localShards.values()) {
+ if (!shardInfo.isActiveMember()) {
+ shardList.remove(shardInfo.getShardName());
+ }
+ }
+ LOG.debug ("{}: persisting the shard list {}", persistenceId(), shardList);
+ saveSnapshot(updateShardManagerSnapshot(shardList));
+ }
+
+ private ShardManagerSnapshot updateShardManagerSnapshot(List<String> shardList) {
+ currentSnapshot = new ShardManagerSnapshot(shardList);
+ return currentSnapshot;
+ }
+
+ private void applyShardManagerSnapshot(ShardManagerSnapshot snapshot) {
+ currentSnapshot = snapshot;
+
+ LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+
+ String currentMember = cluster.getCurrentMemberName();
+ Set<String> configuredShardList =
+ new HashSet<>(configuration.getMemberShardNames(currentMember));
+ for (String shard : currentSnapshot.getShardList()) {
+ if (!configuredShardList.contains(shard)) {
+ // add the current member as a replica for the shard
+ LOG.debug ("{}: adding shard {}", persistenceId(), shard);
+ configuration.addMemberReplicaForShard(shard, currentMember);
+ } else {
+ configuredShardList.remove(shard);
+ }
+ }
+ for (String shard : configuredShardList) {
+ // remove the member as a replica for the shard
+ LOG.debug ("{}: removing shard {}", persistenceId(), shard);
+ configuration.removeMemberReplicaForShard(shard, currentMember);
+ }
+ }
+
+ private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
+ LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+ persistenceId());
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+ }
+
+    /**
+     * Message that carries the reply to an AddServer request together with the shard it refers
+     * to, the path of the leader that was asked and the remove-on-failure flag.
+     */
+    private static class ForwardedAddServerReply {
+        /** The local shard the AddServer request was issued for. */
+        ShardInformation shardInfo;
+        /** The reply received for the AddServer request. */
+        AddServerReply addServerReply;
+        /** Path of the leader the request was sent to. */
+        String leaderPath;
+        /** Whether the local shard should be removed if the reply indicates a failure. */
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerReply(ShardInformation shardInfo, AddServerReply addServerReply, String leaderPath,
+                boolean removeShardOnFailure) {
+            this.removeShardOnFailure = removeShardOnFailure;
+            this.leaderPath = leaderPath;
+            this.addServerReply = addServerReply;
+            this.shardInfo = shardInfo;
+        }
+    }
+
+    /**
+     * Message that carries the failure of an AddServer request together with the shard name, a
+     * descriptive message and the remove-on-failure flag.
+     */
+    private static class ForwardedAddServerFailure {
+        /** Name of the shard the AddServer request was issued for. */
+        String shardName;
+        /** Description of the failure. */
+        String failureMessage;
+        /** The cause of the failure. */
+        Throwable failure;
+        /** Whether the local shard should be removed because of the failure. */
+        boolean removeShardOnFailure;
+
+        ForwardedAddServerFailure(String shardName, String failureMessage, Throwable failure,
+                boolean removeShardOnFailure) {
+            this.removeShardOnFailure = removeShardOnFailure;
+            this.failure = failure;
+            this.failureMessage = failureMessage;
+            this.shardName = shardName;
+        }
+    }
+
+    /**
+     * Bookkeeping for one local shard: its identity, its actor, the leader and role reported for
+     * it, and the callbacks waiting for the shard to become initialized or ready.
+     */
+    @VisibleForTesting
+    protected static class ShardInformation {
+        private final ShardIdentifier shardId;
+        private final String shardName;
+        private ActorRef actor;
+        private ActorPath actorPath;
+        private final Map<String, String> initialPeerAddresses;
+        private Optional<DataTree> localShardDataTree;
+        private boolean leaderAvailable;
+
+        // flag that determines if the actor is ready for business
+        private boolean actorInitialized;
+
+        private boolean followerSyncStatus;
+
+        private final Set<OnShardInitialized> onShardInitializedSet = Sets.newHashSet();
+        private String role;
+        private String leaderId;
+        private short leaderVersion;
+
+        private DatastoreContext datastoreContext;
+        // Consumed (and nulled) by newProps.
+        private Shard.AbstractBuilder<?, ?> builder;
+        private final ShardPeerAddressResolver addressResolver;
+        private boolean isActiveMember = true;
+
+        private ShardInformation(String shardName, ShardIdentifier shardId,
+                Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
+                Shard.AbstractBuilder<?, ?> builder, ShardPeerAddressResolver addressResolver) {
+            this.shardId = shardId;
+            this.shardName = shardName;
+            this.initialPeerAddresses = initialPeerAddresses;
+            this.datastoreContext = datastoreContext;
+            this.addressResolver = addressResolver;
+            this.builder = builder;
+        }
+
+        /**
+         * Builds the Props of the shard actor from the stored builder. The builder is dropped
+         * afterwards, so a second call fails the not-null check.
+         */
+        Props newProps(SchemaContext schemaContext) {
+            Preconditions.checkNotNull(builder);
+            final Props shardProps = builder.id(shardId).peerAddresses(initialPeerAddresses)
+                    .datastoreContext(datastoreContext).schemaContext(schemaContext).props();
+            builder = null;
+            return shardProps;
+        }
+
+        String getShardName() {
+            return shardName;
+        }
+
+        @Nullable
+        ActorRef getActor() {
+            return actor;
+        }
+
+        ActorPath getActorPath() {
+            return actorPath;
+        }
+
+        /** Stores the shard actor together with its path. */
+        void setActor(ActorRef actor) {
+            this.actor = actor;
+            this.actorPath = actor.path();
+        }
+
+        ShardIdentifier getShardId() {
+            return shardId;
+        }
+
+        void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+            this.localShardDataTree = localShardDataTree;
+        }
+
+        Optional<DataTree> getLocalShardDataTree() {
+            return localShardDataTree;
+        }
+
+        DatastoreContext getDatastoreContext() {
+            return datastoreContext;
+        }
+
+        /** Stores the new context and, when the shard actor exists, sends it to the actor. */
+        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+            this.datastoreContext = datastoreContext;
+            if (actor == null) {
+                return;
+            }
+
+            LOG.debug("Sending new DatastoreContext to {}", shardId);
+            actor.tell(this.datastoreContext, sender);
+        }
+
+        /**
+         * Sends a PeerAddressResolved to the shard actor, if there is one, and then re-evaluates
+         * the pending callbacks.
+         */
+        void updatePeerAddress(String peerId, String peerAddress, ActorRef sender) {
+            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+
+            if (actor != null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+                            peerId, peerAddress, actor.path());
+                }
+
+                actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
+            }
+
+            notifyOnShardInitializedCallbacks();
+        }
+
+        void peerDown(String memberName, String peerId, ActorRef sender) {
+            if (actor == null) {
+                return;
+            }
+            actor.tell(new PeerDown(memberName, peerId), sender);
+        }
+
+        void peerUp(String memberName, String peerId, ActorRef sender) {
+            if (actor == null) {
+                return;
+            }
+            actor.tell(new PeerUp(memberName, peerId), sender);
+        }
+
+        /** @return true when a role is known and it is not Candidate */
+        boolean isShardReady() {
+            if (Strings.isNullOrEmpty(role)) {
+                return false;
+            }
+            return !RaftState.Candidate.name().equals(role);
+        }
+
+        /**
+         * @return true when a leader is available, the shard is ready, the role is not
+         *         IsolatedLeader, and this shard is the leader or the leader id can be resolved
+         */
+        boolean isShardReadyWithLeaderId() {
+            if (!leaderAvailable || !isShardReady()) {
+                return false;
+            }
+            if (RaftState.IsolatedLeader.name().equals(role)) {
+                return false;
+            }
+            return isLeader() || addressResolver.resolve(leaderId) != null;
+        }
+
+        boolean isShardInitialized() {
+            return getActor() != null && actorInitialized;
+        }
+
+        /** @return true when the recorded leader id equals this shard's own id string */
+        boolean isLeader() {
+            final String ownId = shardId.toString();
+            return Objects.equal(leaderId, ownId);
+        }
+
+        /** @return the serialized path of the local actor when leading, else the resolved leader id */
+        String getSerializedLeaderActor() {
+            if (!isLeader()) {
+                return addressResolver.resolve(leaderId);
+            }
+            return Serialization.serializedActorPath(getActor());
+        }
+
+        void setActorInitialized() {
+            LOG.debug("Shard {} is initialized", shardId);
+
+            this.actorInitialized = true;
+
+            notifyOnShardInitializedCallbacks();
+        }
+
+        /**
+         * Runs and removes the registered callbacks that are due: plain OnShardInitialized
+         * callbacks always, OnShardReady callbacks only when the shard is ready with a leader id.
+         * The timeout schedule of each callback is cancelled before its reply runnable is run.
+         */
+        private void notifyOnShardInitializedCallbacks() {
+            if (onShardInitializedSet.isEmpty()) {
+                return;
+            }
+
+            final boolean ready = isShardReadyWithLeaderId();
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
+                        ready ? "ready" : "initialized", onShardInitializedSet.size());
+            }
+
+            final Iterator<OnShardInitialized> pending = onShardInitializedSet.iterator();
+            while (pending.hasNext()) {
+                final OnShardInitialized callback = pending.next();
+                final boolean requiresReady = callback instanceof OnShardReady;
+                if (requiresReady && !ready) {
+                    // Stays registered until the shard becomes ready.
+                    continue;
+                }
+
+                pending.remove();
+                callback.getTimeoutSchedule().cancel();
+                callback.getReplyRunnable().run();
+            }
+        }
+
+        void addOnShardInitialized(OnShardInitialized onShardInitialized) {
+            onShardInitializedSet.add(onShardInitialized);
+        }
+
+        void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
+            onShardInitializedSet.remove(onShardInitialized);
+        }
+
+        void setRole(String newRole) {
+            this.role = newRole;
+
+            notifyOnShardInitializedCallbacks();
+        }
+
+        void setFollowerSyncStatus(boolean syncStatus) {
+            this.followerSyncStatus = syncStatus;
+        }
+
+        /** @return the follower sync status for a Follower, true for a Leader, false otherwise */
+        boolean isInSync() {
+            if (RaftState.Follower.name().equals(this.role)) {
+                return followerSyncStatus;
+            }
+            return RaftState.Leader.name().equals(this.role);
+        }
+
+        /**
+         * Records the leader id; a non-null id also marks the leader as available. The pending
+         * callbacks are re-evaluated afterwards.
+         *
+         * @return true if the leader id differs from the previously recorded one
+         */
+        boolean setLeaderId(String leaderId) {
+            final boolean changed = !Objects.equal(this.leaderId, leaderId);
+            this.leaderId = leaderId;
+            if (leaderId != null) {
+                this.leaderAvailable = true;
+            }
+            notifyOnShardInitializedCallbacks();
+
+            return changed;
+        }
+
+        String getLeaderId() {
+            return leaderId;
+        }
+
+        /** Records leader availability; the callbacks are re-evaluated only when it becomes true. */
+        void setLeaderAvailable(boolean leaderAvailable) {
+            this.leaderAvailable = leaderAvailable;
+
+            if (!leaderAvailable) {
+                return;
+            }
+            notifyOnShardInitializedCallbacks();
+        }
+
+        short getLeaderVersion() {
+            return leaderVersion;
+        }
+
+        void setLeaderVersion(short leaderVersion) {
+            this.leaderVersion = leaderVersion;
+        }
+
+        boolean isActiveMember() {
+            return isActiveMember;
+        }
+
+        void setActiveMember(boolean isActiveMember) {
+            this.isActiveMember = isActiveMember;
+        }
+    }
+
+    /**
+     * A pending callback: the runnable to run and the schedule of the timeout that belongs to it.
+     */
+    private static class OnShardInitialized {
+        private final Runnable replyRunnable;
+        // Assigned after construction through setTimeoutSchedule.
+        private Cancellable timeoutSchedule;
+
+        OnShardInitialized(Runnable replyRunnable) {
+            this.replyRunnable = replyRunnable;
+        }
+
+        /** @return the runnable given at construction */
+        Runnable getReplyRunnable() {
+            return this.replyRunnable;
+        }
+
+        /** @return the timeout schedule, or null if none has been set */
+        Cancellable getTimeoutSchedule() {
+            return this.timeoutSchedule;
+        }
+
+        void setTimeoutSchedule(Cancellable timeoutSchedule) {
+            this.timeoutSchedule = timeoutSchedule;
+        }
+    }
+
+    /**
+     * Marker subtype of OnShardInitialized; it adds no state of its own and is distinguished
+     * from its parent by type only.
+     */
+    private static class OnShardReady extends OnShardInitialized {
+        OnShardReady(Runnable replyRunnable) {
+            super(replyRunnable);
+        }
+    }
+
+    /**
+     * Immutable holder tying together a shard, one of its pending callbacks and the sender the
+     * callback belongs to.
+     */
+    private static class ShardNotInitializedTimeout {
+        private final ShardInformation shardInfo;
+        private final OnShardInitialized onShardInitialized;
+        private final ActorRef sender;
+
+        ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+            this.shardInfo = shardInfo;
+            this.onShardInitialized = onShardInitialized;
+            this.sender = sender;
+        }
+
+        ShardInformation getShardInfo() {
+            return this.shardInfo;
+        }
+
+        OnShardInitialized getOnShardInitialized() {
+            return this.onShardInitialized;
+        }
+
+        ActorRef getSender() {
+            return this.sender;
+        }
+    }
+
+    /**
+     * SchemaContextModules are no longer persisted. The class is kept for now for backwards
+     * compatibility, so that upgrading from Helium does not cause de-serialization failures.
+     */
+    @Deprecated
+    static class SchemaContextModules implements Serializable {
+        private static final long serialVersionUID = -8884620101025936590L;
+
+        private final Set<String> modules;
+
+        SchemaContextModules(final Set<String> modules) {
+            this.modules = modules;
+        }
+
+        /** @return the set of modules given at construction */
+        public Set<String> getModules() {
+            return this.modules;
+        }
+    }
+
+    /**
+     * @return a new, empty Builder
+     */
+    public static Builder builder() {
+        final Builder fresh = new Builder();
+        return fresh;
+    }
+
+    /**
+     * Fluent builder for the Props of a ShardManager. Every setter returns the builder itself and
+     * fails with an IllegalStateException once the builder has been sealed; the builder is sealed
+     * by verify(), which props() calls.
+     *
+     * @param <T> the concrete builder type returned by the setters
+     */
+    public abstract static class AbstractBuilder<T extends AbstractBuilder<T>> {
+        private ClusterWrapper cluster;
+        private Configuration configuration;
+        private DatastoreContextFactory datastoreContextFactory;
+        private CountDownLatch waitTillReadyCountdownLatch;
+        private PrimaryShardInfoFutureCache primaryShardInfoCache;
+        // Optional: verify() does not check it.
+        private DatastoreSnapshot restoreFromSnapshot;
+
+        private volatile boolean sealed;
+
+        @SuppressWarnings("unchecked")
+        private T self() {
+            return (T) this;
+        }
+
+        /** Fails with an IllegalStateException if the builder has already been sealed. */
+        protected void checkSealed() {
+            Preconditions.checkState(!sealed, "Builder is already sealed - further modifications are not allowed");
+        }
+
+        public T cluster(ClusterWrapper cluster) {
+            checkSealed();
+            this.cluster = cluster;
+            return self();
+        }
+
+        public T configuration(Configuration configuration) {
+            checkSealed();
+            this.configuration = configuration;
+            return self();
+        }
+
+        public T datastoreContextFactory(DatastoreContextFactory datastoreContextFactory) {
+            checkSealed();
+            this.datastoreContextFactory = datastoreContextFactory;
+            return self();
+        }
+
+        public T waitTillReadyCountdownLatch(CountDownLatch waitTillReadyCountdownLatch) {
+            checkSealed();
+            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+            return self();
+        }
+
+        public T primaryShardInfoCache(PrimaryShardInfoFutureCache primaryShardInfoCache) {
+            checkSealed();
+            this.primaryShardInfoCache = primaryShardInfoCache;
+            return self();
+        }
+
+        public T restoreFromSnapshot(DatastoreSnapshot restoreFromSnapshot) {
+            checkSealed();
+            this.restoreFromSnapshot = restoreFromSnapshot;
+            return self();
+        }
+
+        /**
+         * Seals the builder and then checks that every mandatory value has been supplied.
+         * The builder is sealed before the checks run, so it stays sealed even if a check fails.
+         */
+        protected void verify() {
+            sealed = true;
+
+            Preconditions.checkNotNull(cluster, "cluster should not be null");
+            Preconditions.checkNotNull(configuration, "configuration should not be null");
+            Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+            Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
+            Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
+        }
+
+        /**
+         * @return Props that create a ShardManager with this builder as the constructor argument
+         */
+        public Props props() {
+            verify();
+            final Props managerProps = Props.create(ShardManager.class, this);
+            return managerProps;
+        }
+    }
+
+    /**
+     * Concrete builder; it adds nothing to AbstractBuilder beyond fixing the type parameter.
+     */
+    public static class Builder extends AbstractBuilder<Builder> {
+    }
+
+ private void findPrimary(final String shardName, final FindPrimaryResponseHandler handler) {
+ Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+ getShardInitializationTimeout().duration().$times(2));
+
+
+ Future<Object> futureObj = ask(getSelf(), new FindPrimary(shardName, true), findPrimaryTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ handler.onFailure(failure);
+ } else {
+ if(response instanceof RemotePrimaryShardFound) {
+ handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
+ } else if(response instanceof LocalPrimaryShardFound) {
+ handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
+ } else {
+ handler.onUnknownResponse(response);
+ }
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+    /**
+     * Callbacks invoked when the response to a find primary request has been processed; exactly
+     * one of them is invoked per response.
+     */
+    private interface FindPrimaryResponseHandler {
+        /**
+         * Called when the request completed with a failure.
+         *
+         * @param failure the failure the request completed with
+         */
+        void onFailure(Throwable failure);
+
+        /**
+         * Called when the response is a RemotePrimaryShardFound.
+         *
+         * @param response the response that was received
+         */
+        void onRemotePrimaryShardFound(RemotePrimaryShardFound response);
+
+        /**
+         * Called when the response is a LocalPrimaryShardFound.
+         *
+         * @param response the response that was received
+         */
+        void onLocalPrimaryFound(LocalPrimaryShardFound response);
+
+        /**
+         * Called when the response is of any other type. This is another kind of failure.
+         *
+         * @param response the response that was received
+         */
+        void onUnknownResponse(Object response);
+    }
+
+    /**
+     * FindPrimaryResponseHandler base class that takes care of the two failure callbacks: both
+     * log the problem and send a Status.Failure to the target actor, with the ShardManager actor
+     * as the sender. Subclasses supply the handling of the remote and local primary responses.
+     */
+    private abstract static class AutoFindPrimaryFailureResponseHandler implements FindPrimaryResponseHandler {
+        private final ActorRef targetActor;
+        private final String shardName;
+        private final String persistenceId;
+        private final ActorRef shardManagerActor;
+
+        /**
+         * @param targetActor The actor to whom the Failure response is sent when FindPrimary fails
+         * @param shardName The name of the shard whose primary replica is looked up
+         * @param persistenceId The persistenceId of the ShardManager, used in log messages
+         * @param shardManagerActor The ShardManager actor, used as the sender of the Failure response
+         */
+        protected AutoFindPrimaryFailureResponseHandler(ActorRef targetActor, String shardName, String persistenceId, ActorRef shardManagerActor) {
+            this.targetActor = Preconditions.checkNotNull(targetActor);
+            this.shardName = Preconditions.checkNotNull(shardName);
+            this.persistenceId = Preconditions.checkNotNull(persistenceId);
+            this.shardManagerActor = Preconditions.checkNotNull(shardManagerActor);
+        }
+
+        public ActorRef getTargetActor() {
+            return this.targetActor;
+        }
+
+        public String getShardName() {
+            return this.shardName;
+        }
+
+        @Override
+        public void onFailure(Throwable failure) {
+            LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId, shardName, failure);
+
+            final String description = String.format("Failed to find leader for shard %s", shardName);
+            final RuntimeException wrapped = new RuntimeException(description, failure);
+            targetActor.tell(new akka.actor.Status.Failure(wrapped), shardManagerActor);
+        }
+
+        @Override
+        public void onUnknownResponse(Object response) {
+            final String msg = String.format("Failed to find leader for shard %s: received response: %s",
+                    shardName, response);
+            LOG.debug("{}: {}", persistenceId, msg);
+
+            // A Throwable response is passed on as is; anything else is described in a RuntimeException.
+            final Throwable cause;
+            if (response instanceof Throwable) {
+                cause = (Throwable) response;
+            } else {
+                cause = new RuntimeException(msg);
+            }
+            targetActor.tell(new akka.actor.Status.Failure(cause), shardManagerActor);
+        }
+    }
+
+
+    /**
+     * DTO that pairs a 'context' message, which needs to be forwarded to the primary replica of a
+     * shard, with the 'PrimaryShardFound' message that was received as the successful response to
+     * find primary.
+     */
+    private static class PrimaryShardFoundForContext {
+        private final String shardName;
+        private final Object contextMessage;
+        // Each of the two is set only when the primary-found message is of the matching type.
+        private final RemotePrimaryShardFound remotePrimaryShardFound;
+        private final LocalPrimaryShardFound localPrimaryShardFound;
+
+        public PrimaryShardFoundForContext(@Nonnull String shardName, @Nonnull Object contextMessage,
+                @Nonnull Object primaryFoundMessage) {
+            this.shardName = Preconditions.checkNotNull(shardName);
+            this.contextMessage = Preconditions.checkNotNull(contextMessage);
+            Preconditions.checkNotNull(primaryFoundMessage);
+
+            if (primaryFoundMessage instanceof RemotePrimaryShardFound) {
+                this.remotePrimaryShardFound = (RemotePrimaryShardFound) primaryFoundMessage;
+            } else {
+                this.remotePrimaryShardFound = null;
+            }
+
+            if (primaryFoundMessage instanceof LocalPrimaryShardFound) {
+                this.localPrimaryShardFound = (LocalPrimaryShardFound) primaryFoundMessage;
+            } else {
+                this.localPrimaryShardFound = null;
+            }
+        }
+
+        /** @return the primary path of the remote response when present, else of the local response */
+        @Nonnull
+        String getPrimaryPath() {
+            return remotePrimaryShardFound != null
+                    ? remotePrimaryShardFound.getPrimaryPath()
+                    : localPrimaryShardFound.getPrimaryPath();
+        }
+
+        @Nonnull
+        Object getContextMessage() {
+            return this.contextMessage;
+        }
+
+        @Nullable
+        RemotePrimaryShardFound getRemotePrimaryShardFound() {
+            return this.remotePrimaryShardFound;
+        }
+
+        @Nonnull
+        String getShardName() {
+            return this.shardName;
+        }
+    }
+
+ /**
+ * The WrappedShardResponse class wraps a response from a Shard.
+ */
+ private static class WrappedShardResponse {
+ private final String shardName;
+ private final Object response;
+
+ private WrappedShardResponse(String shardName, Object response) {
+ this.shardName = shardName;
+ this.response = response;
+ }
+
+ String getShardName() {
+ return shardName;
+ }
+
+ Object getResponse() {
+ return response;