+
+ LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
+ existing.retire();
+ knownFrontends.remove(clientId.getFrontendId());
+ } else {
+ LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
+ }
+
+ final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store);
+ knownFrontends.put(clientId.getFrontendId(), ret);
+ LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId);
+ return ret;
+ }
+
+ private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
+ final Range<ABIVersion> clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion());
+ for (ABIVersion v : SUPPORTED_ABIVERSIONS) {
+ if (clientRange.contains(v)) {
+ return v;
+ }
+ }
+
+ throw new IllegalArgumentException(String.format(
+ "No common version between backend versions %s and client versions %s", SUPPORTED_ABIVERSIONS,
+ clientRange));
+ }
+
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void handleConnectClient(final ConnectClientRequest message) {
+ try {
+ if (!isLeader() || !isLeaderActive()) {
+ LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}.",
+ persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
+ throw new NotLeaderException(getSelf());
+ }
+
+ final ABIVersion selectedVersion = selectVersion(message);
+ final LeaderFrontendState frontend = getFrontend(message.getTarget());
+ frontend.reconnect();
+ message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
+ ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion),
+ ActorRef.noSender());
+ } catch (RequestException | RuntimeException e) {
+ message.getReplyTo().tell(new Failure(e), ActorRef.noSender());
+ }
+ }
+
+ private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ throws RequestException {
+ // We are not the leader, hence we want to fail-fast.
+ if (!isLeader() || !isLeaderActive()) {
+ LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}.",
+ persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
+ throw new NotLeaderException(getSelf());
+ }
+
+ final Request<?, ?> request = envelope.getMessage();
+ if (request instanceof TransactionRequest) {
+ final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
+ final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
+ return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
+ } else if (request instanceof LocalHistoryRequest) {
+ final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
+ final ClientIdentifier clientId = lhReq.getTarget().getClientId();
+ return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
+ } else {
+ LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
+ throw new UnsupportedRequestException(request);