+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+ store.processCohortRegistryCommand(getSender(),
+ (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
+ } else if (message instanceof PersistAbortTransactionPayload) {
+ final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
+ persistPayload(txId, AbortTransactionPayload.create(txId), true);
+ } else if (message instanceof MakeLeaderLocal) {
+ onMakeLeaderLocal();
+ } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
+ store.resumeNextPendingTransaction();
+ } else if (!responseMessageSlicer.handleMessage(message)) {
+ super.handleNonRaftCommand(message);
+ }
+ }
+ }
+
+ private void handleRequestAssemblerMessage(final Object message) {
+ dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
+ JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
+ requestMessageAssembler.handleMessage(message, self());
+ });
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void handleRequestEnvelope(final RequestEnvelope envelope) {
+ final long now = ticker().read();
+ try {
+ final RequestSuccess<?, ?> success = handleRequest(envelope, now);
+ if (success != null) {
+ final long executionTimeNanos = ticker().read() - now;
+ if (success instanceof SliceableMessage) {
+ dispatchers.getDispatcher(DispatcherType.Serialization).execute(() ->
+ responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
+ .message(envelope.newSuccessEnvelope(success, executionTimeNanos))
+ .sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
+ .onFailureCallback(t -> {
+ LOG.warn("Error slicing response {}", success, t);
+ }).build()));
+ } else {
+ envelope.sendSuccess(success, executionTimeNanos);
+ }
+ }
+ } catch (RequestException e) {
+ LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
+ envelope.sendFailure(e, ticker().read() - now);
+ } catch (Exception e) {
+ LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+ ticker().read() - now);
+ }
+ }
+
+ private void commitTimeoutCheck() {
+ store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
+ commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ requestMessageAssembler.checkExpiredAssembledMessageState();
+ }
+
+ private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
+ final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
+ final LeaderFrontendState state = knownFrontends.get(frontend);
+ if (state == null) {
+ // Not tell-based protocol, do nothing
+ return Optional.absent();
+ }
+
+ if (isIsolatedLeader()) {
+ // We are isolated and no new request can come through until we emerge from it. We are still updating
+ // liveness of frontend when we see it attempting to communicate. Use the last access timer.
+ return Optional.of(state.getLastSeenTicks());
+ }
+
+ // If this frontend has freshly connected, give it some time to catch up before killing its transactions.
+ return Optional.of(state.getLastConnectTicks());
+ }
+
+ private void onMakeLeaderLocal() {
+ LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+ if (isLeader()) {
+ getSender().tell(new Status.Success(null), getSelf());
+ return;
+ }
+
+ final ActorSelection leader = getLeader();
+
+ if (leader == null) {
+ // Leader is not present. The cluster is most likely trying to
+ // elect a leader and we should let that run its normal course
+
+ // TODO we can wait for the election to complete and retry the
+ // request. We can also let the caller retry by sending a flag
+ // in the response indicating the request is "reTryable".
+ getSender().tell(new Failure(
+ new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
+ + "Currently there is no leader for " + persistenceId())),
+ getSelf());
+ return;
+ }
+
+ leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
+ }
+
+ // Acquire our frontend tracking handle and verify generation matches
+ @Nullable
+ private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
+ final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
+ if (existing != null) {
+ final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
+ if (cmp == 0) {
+ existing.touch();
+ return existing;
+ }
+ if (cmp > 0) {
+ LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
+ throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+ }
+
+ LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
+ existing.retire();
+ knownFrontends.remove(clientId.getFrontendId());
+ } else {
+ LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
+ }
+
+ return null;
+ }
+
+ private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+ final LeaderFrontendState ret = findFrontend(clientId);
+ if (ret != null) {
+ return ret;
+ }
+
+ // TODO: a dedicated exception would be better, but this is technically true, too
+ throw new OutOfSequenceEnvelopeException(0);
+ }
+
+ @Nonnull
+ private static ABIVersion selectVersion(final ConnectClientRequest message) {
+ final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
+ for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
+ if (clientRange.contains(v)) {
+ return v;
+ }
+ }
+
+ throw new IllegalArgumentException(String.format(
+ "No common version between backend versions %s and client versions %s", SUPPORTED_ABIVERSIONS,
+ clientRange));
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void handleConnectClient(final ConnectClientRequest message) {
+ try {
+ final ClientIdentifier clientId = message.getTarget();
+ final LeaderFrontendState existing = findFrontend(clientId);
+ if (existing != null) {
+ existing.touch();
+ }
+
+ if (!isLeader() || !isLeaderActive()) {
+ LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}.",
+ persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
+ throw new NotLeaderException(getSelf());
+ }
+
+ final ABIVersion selectedVersion = selectVersion(message);
+ final LeaderFrontendState frontend;
+ if (existing == null) {
+ frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+ knownFrontends.put(clientId.getFrontendId(), frontend);
+ LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);