+ private void onMakeLeaderLocal() {
+ LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+ if (isLeader()) {
+ getSender().tell(new Status.Success(null), getSelf());
+ return;
+ }
+
+ final ActorSelection leader = getLeader();
+
+ if (leader == null) {
+ // Leader is not present. The cluster is most likely trying to
+ // elect a leader and we should let that run its normal course
+
+ // TODO we can wait for the election to complete and retry the
+ // request. We can also let the caller retry by sending a flag
+ // in the response indicating the request is "reTryable".
+ getSender().tell(new Failure(
+ new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
+ + "Currently there is no leader for " + persistenceId())),
+ getSelf());
+ return;
+ }
+
+ leader.tell(new RequestLeadership(getId(), getSender()), getSelf());
+ }
+
+ // Acquire our frontend tracking handle and verify generation matches
+ private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+ final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
+ if (existing != null) {
+ final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
+ if (cmp == 0) {
+ return existing;
+ }
+ if (cmp > 0) {
+ LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
+ throw new RetiredGenerationException(existing.getIdentifier().getGeneration());
+ }
+
+ LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
+ existing.retire();
+ knownFrontends.remove(clientId.getFrontendId());
+ } else {
+ LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
+ }
+
+ final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store);
+ knownFrontends.put(clientId.getFrontendId(), ret);
+ LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId);
+ return ret;
+ }
+
+ private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
+ final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
+ for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
+ if (clientRange.contains(v)) {
+ return v;
+ }
+ }
+
+ throw new IllegalArgumentException(String.format(
+ "No common version between backend versions %s and client versions %s", SUPPORTED_ABIVERSIONS,
+ clientRange));
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void handleConnectClient(final ConnectClientRequest message) {
+ try {
+ if (!isLeader() || !isLeaderActive()) {
+ LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message);
+ throw new NotLeaderException(getSelf());
+ }
+
+ final ABIVersion selectedVersion = selectVersion(message);
+ final LeaderFrontendState frontend = getFrontend(message.getTarget());
+ frontend.reconnect();
+ message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
+ ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion),
+ ActorRef.noSender());
+ } catch (RequestException | RuntimeException e) {
+ message.getReplyTo().tell(new Failure(e), ActorRef.noSender());
+ }
+ }
+
+ private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ throws RequestException {
+ // We are not the leader, hence we want to fail-fast.
+ if (!isLeader() || !isLeaderActive()) {
+ LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
+ throw new NotLeaderException(getSelf());
+ }
+
+ final Request<?, ?> request = envelope.getMessage();
+ if (request instanceof TransactionRequest) {
+ final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
+ final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
+ return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
+ } else if (request instanceof LocalHistoryRequest) {
+ final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
+ final ClientIdentifier clientId = lhReq.getTarget().getClientId();
+ return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
+ } else {
+ LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
+ throw new UnsupportedRequestException(request);
+ }
+ }
+