+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void handleConnectClient(final ConnectClientRequest message) {
+ try {
+ final ClientIdentifier clientId = message.getTarget();
+ final LeaderFrontendState existing = findFrontend(clientId);
+ if (existing != null) {
+ existing.touch();
+ }
+
+ if (!isLeader() || !isLeaderActive()) {
+ LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ + "isLeadershipTransferInProgress: {}.",
+ persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
+ throw new NotLeaderException(getSelf());
+ }
+
+ final ABIVersion selectedVersion = selectVersion(message);
+ final LeaderFrontendState frontend;
+ if (existing == null) {
+ frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+ knownFrontends.put(clientId.getFrontendId(), frontend);
+ LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
+ } else {
+ frontend = existing;
+ }
+
+ frontend.reconnect();
+ message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
+ ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion),
+ ActorRef.noSender());
+ } catch (RequestException | RuntimeException e) {
+ message.getReplyTo().tell(new Failure(e), ActorRef.noSender());
+ }