+ private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ throws RequestException {
+ // We are not the leader, hence we want to fail-fast.
+ if (!isLeader() || paused || !isLeaderActive()) {
+ LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}, paused: {}",
+ persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
+ throw new NotLeaderException(getSelf());
+ }
+
+ final Request<?, ?> request = envelope.getMessage();
+ if (request instanceof TransactionRequest) {
+ final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
+ final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
+ return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
+ } else if (request instanceof LocalHistoryRequest) {
+ final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
+ final ClientIdentifier clientId = lhReq.getTarget().getClientId();
+ return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
+ } else {
+ LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
+ throw new UnsupportedRequestException(request);
+ }
+ }
+
+ private boolean hasLeader() {
+ return getLeaderId() != null;
+ }
+
+ public int getPendingTxCommitQueueSize() {
+ return store.getQueueSize();
+ }
+
+ public int getCohortCacheSize() {
+ return commitCoordinator.getCohortCacheSize();
+ }
+
+ @Override
+ protected Optional<ActorRef> getRoleChangeNotifier() {
+ return roleChangeNotifier;
+ }
+
+ String getShardName() {
+ return shardName;
+ }
+
+ @Override
+ protected LeaderStateChanged newLeaderStateChanged(final String memberId, final String leaderId,
+ final short leaderPayloadVersion) {
+ return isLeader() ? new ShardLeaderStateChanged(memberId, leaderId, store.getDataTree(), leaderPayloadVersion)
+ : new ShardLeaderStateChanged(memberId, leaderId, leaderPayloadVersion);
+ }
+
+ protected void onDatastoreContext(final DatastoreContext context) {
+ datastoreContext = context;
+
+ setTransactionCommitTimeout();
+
+ setPersistence(datastoreContext.isPersistent());
+
+ updateConfigParams(datastoreContext.getShardRaftConfig());
+ }
+
+ // applyState() will be invoked once consensus is reached on the payload
+ void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
+ boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+ if (canSkipPayload) {
+ applyState(self(), id, payload);
+ } else {
+ // We are faking the sender
+ persistData(self(), id, payload, batchHint);
+ }
+ }
+
+ private void handleCommitTransaction(final CommitTransaction commit) {
+ if (isLeader()) {
+ commitCoordinator.handleCommit(commit.getTransactionId(), getSender(), this);
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader == null) {
+ messageRetrySupport.addMessageToRetry(commit, getSender(),
+ "Could not commit transaction " + commit.getTransactionId());