+ private void handleRequestAssemblerMessage(final Object message) {
+ dispatchers.getDispatcher(DispatcherType.Serialization).execute(() -> {
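+ // Akka's JavaSerializer resolves the actor system through a thread-local DynamicVariable; since we are
+ // deserializing on a dispatcher thread rather than inside the actor, it has to be set explicitly first.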
+ JavaSerializer.currentSystem().value_$eq((ExtendedActorSystem) context().system());
+ requestMessageAssembler.handleMessage(message, self());
+ });
+ }
+
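+ // Executes a single client request, measuring its execution time and replying with either a success
+ // envelope (sliced when the response is large) or a failure envelope.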
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void handleRequestEnvelope(final RequestEnvelope envelope) {
+ final long now = ticker().read();
+ try {
+ final RequestSuccess<?, ?> success = handleRequest(envelope, now);
+ if (success != null) {
+ final long executionTimeNanos = ticker().read() - now;
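+ // Responses implementing SliceableMessage are chunked via the message slicer on the
+ // serialization dispatcher; all other responses are sent back directly.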
+ if (success instanceof SliceableMessage) {
+ dispatchers.getDispatcher(DispatcherType.Serialization).execute(() ->
+ responseMessageSlicer.slice(SliceOptions.builder().identifier(success.getTarget())
+ .message(envelope.newSuccessEnvelope(success, executionTimeNanos))
+ .sendTo(envelope.getMessage().getReplyTo()).replyTo(self())
+ .onFailureCallback(t -> LOG.warn("Error slicing response {}", success, t)).build()));
+ } else {
+ envelope.sendSuccess(success, executionTimeNanos);
+ }
+ }
+ } catch (RequestException e) {
+ LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
+ envelope.sendFailure(e, ticker().read() - now);
+ } catch (Exception e) {
+ LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
+ envelope.sendFailure(new RuntimeRequestException("Request processing failed", e),
+ ticker().read() - now);
+ }
+ }
+
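+ // Periodic sweep: expires transactions which have exceeded the commit timeout in both the data tree
+ // and the commit coordinator, and drops timed-out partially-assembled sliced requests.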
+ private void commitTimeoutCheck() {
+ store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
+ commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ requestMessageAssembler.checkExpiredAssembledMessageState();
+ }
+
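+ // Resolves the tick at which the frontend owning the given cohort was last considered live, allowing
+ // the expiration check to account for frontend liveness. Returns empty for clients which are not using
+ // the tell-based protocol.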
+ private OptionalLong updateAccess(final SimpleShardDataTreeCohort cohort) {
+ final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
+ final LeaderFrontendState state = knownFrontends.get(frontend);
+ if (state == null) {
+ // Client is not using the tell-based protocol, nothing to do
+ return OptionalLong.empty();
+ }
+
+ if (isIsolatedLeader()) {
+ // We are isolated and no new requests can come through until we emerge from isolation. We still
+ // update the frontend's liveness whenever we see it attempting to communicate, hence use the
+ // last-access timer.
+ return OptionalLong.of(state.getLastSeenTicks());
+ }
+
+ // If this frontend has freshly connected, give it some time to catch up before killing its transactions.
+ return OptionalLong.of(state.getLastConnectTicks());
+ }
+
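+ // Applies a DisableTrackingPayload: stops tracking the client's metadata and, on the leader, swaps any
+ // in-memory frontend state for a disabled placeholder.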
+ private void disableTracking(final DisableTrackingPayload payload) {
+ final ClientIdentifier clientId = payload.getIdentifier();
+ LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId);
+ frontendMetadata.disableTracking(clientId);
+
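+ // Followers only record the metadata change; the leader must also update its in-memory state.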
+ if (isLeader()) {
+ final FrontendIdentifier frontendId = clientId.getFrontendId();
+ final LeaderFrontendState frontend = knownFrontends.get(frontendId);
+ if (frontend != null) {
+ if (clientId.equals(frontend.getIdentifier())) {
+ if (!(frontend instanceof LeaderFrontendState.Disabled)) {
+ verify(knownFrontends.replace(frontendId, frontend,
+ new LeaderFrontendState.Disabled(persistenceId(), clientId, store)));
+ LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId);
+ } else {
+ LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend);
+ }
+ } else {
+ LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId);
+ }
+ } else {
+ LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+ knownFrontends.put(frontendId, new LeaderFrontendState.Disabled(persistenceId(), clientId,
+ getDataStore()));
+ }
+ }
+ }
+
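+ // Attempts to make this member the shard leader: succeeds immediately if we already lead, fails if
+ // there is no current leader to transfer from, and otherwise asks the leader to hand over leadership.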
+ private void onMakeLeaderLocal() {
+ LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+ if (isLeader()) {
+ getSender().tell(new Status.Success(null), getSelf());
+ return;
+ }
+
+ final ActorSelection leader = getLeader();
+
+ if (leader == null) {
+ // No leader is present. The cluster is most likely in the middle of electing one, and we should
+ // let that election run its normal course.
+
+ // TODO: we could wait for the election to complete and retry the request. Alternatively, we could
+ // let the caller retry by including a flag in the response indicating that the request is
+ // retriable.
+ getSender().tell(new Failure(
+ new LeadershipTransferFailedException("Cannot initiate leadership transfer to the local node. "
+ + "Currently there is no leader for " + persistenceId())),
+ getSelf());
+ return;
+ }
+
+ leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
+ }
+