+
+ void setActorInitialized() {
+ LOG.debug("Shard {} is initialized", shardId);
+
+ this.actorInitialized = true;
+
+ notifyOnShardInitializedCallbacks();
+ }
+
+ private void notifyOnShardInitializedCallbacks() {
+ if(onShardInitializedSet.isEmpty()) {
+ return;
+ }
+
+ boolean ready = isShardReadyWithLeaderId();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Shard {} is {} - notifying {} OnShardInitialized callbacks", shardId,
+ ready ? "ready" : "initialized", onShardInitializedSet.size());
+ }
+
+ Iterator<OnShardInitialized> iter = onShardInitializedSet.iterator();
+ while(iter.hasNext()) {
+ OnShardInitialized onShardInitialized = iter.next();
+ if(!(onShardInitialized instanceof OnShardReady) || ready) {
+ iter.remove();
+ onShardInitialized.getTimeoutSchedule().cancel();
+ onShardInitialized.getReplyRunnable().run();
+ }
+ }
+ }
+
+ void addOnShardInitialized(OnShardInitialized onShardInitialized) {
+ onShardInitializedSet.add(onShardInitialized);
+ }
+
+ void removeOnShardInitialized(OnShardInitialized onShardInitialized) {
+ onShardInitializedSet.remove(onShardInitialized);
+ }
+
+ void setRole(String newRole) {
+ this.role = newRole;
+
+ notifyOnShardInitializedCallbacks();
+ }
+
+ void setFollowerSyncStatus(boolean syncStatus){
+ this.followerSyncStatus = syncStatus;
+ }
+
+ boolean isInSync(){
+ if(RaftState.Follower.name().equals(this.role)){
+ return followerSyncStatus;
+ } else if(RaftState.Leader.name().equals(this.role)){
+ return true;
+ }
+
+ return false;
+ }
+
+ boolean setLeaderId(String leaderId) {
+ boolean changed = !Objects.equal(this.leaderId, leaderId);
+ this.leaderId = leaderId;
+ if(leaderId != null) {
+ this.leaderAvailable = true;
+ }
+ notifyOnShardInitializedCallbacks();
+
+ return changed;
+ }
+
+ public String getLeaderId() {
+ return leaderId;
+ }
+
+ public void setLeaderAvailable(boolean leaderAvailable) {
+ this.leaderAvailable = leaderAvailable;
+ }
+ }
+
+ private static class ShardManagerCreator implements Creator<ShardManager> {
+ private static final long serialVersionUID = 1L;
+
+ final ClusterWrapper cluster;
+ final Configuration configuration;
+ final DatastoreContext datastoreContext;
+ private final CountDownLatch waitTillReadyCountdownLatch;
+ private final PrimaryShardInfoFutureCache primaryShardInfoCache;
+
+ ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
+ CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
+ this.cluster = cluster;
+ this.configuration = configuration;
+ this.datastoreContext = datastoreContext;
+ this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
+ this.primaryShardInfoCache = primaryShardInfoCache;
+ }
+
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+ primaryShardInfoCache);
+ }
+ }
+
+ private static class OnShardInitialized {
+ private final Runnable replyRunnable;
+ private Cancellable timeoutSchedule;
+
+ OnShardInitialized(Runnable replyRunnable) {
+ this.replyRunnable = replyRunnable;
+ }
+
+ Runnable getReplyRunnable() {
+ return replyRunnable;
+ }
+
+ Cancellable getTimeoutSchedule() {
+ return timeoutSchedule;
+ }
+
+ void setTimeoutSchedule(Cancellable timeoutSchedule) {
+ this.timeoutSchedule = timeoutSchedule;
+ }
+ }
+
+ private static class OnShardReady extends OnShardInitialized {
+ OnShardReady(Runnable replyRunnable) {
+ super(replyRunnable);
+ }
+ }
+
+ private static class ShardNotInitializedTimeout {
+ private final ActorRef sender;
+ private final ShardInformation shardInfo;
+ private final OnShardInitialized onShardInitialized;
+
+ ShardNotInitializedTimeout(ShardInformation shardInfo, OnShardInitialized onShardInitialized, ActorRef sender) {
+ this.sender = sender;
+ this.shardInfo = shardInfo;
+ this.onShardInitialized = onShardInitialized;
+ }
+
+ ActorRef getSender() {
+ return sender;
+ }
+
+ ShardInformation getShardInfo() {
+ return shardInfo;
+ }
+
+ OnShardInitialized getOnShardInitialized() {
+ return onShardInitialized;
+ }
+ }
+
+ static class SchemaContextModules implements Serializable {
+ private static final long serialVersionUID = -8884620101025936590L;
+
+ private final Set<String> modules;
+
+ SchemaContextModules(Set<String> modules){
+ this.modules = modules;
+ }
+
+ public Set<String> getModules() {
+ return modules;
+ }