In order to deal with IsolatedLeader state and transaction timeouts,
we need to maintain an accurate view of when we have seen the frontend
even if we are not accepting messages from it.
Add correspoding field and maintain it whenever we interact with
LeaderFrontend state. Also record last connect ticks for the same use.
Change-Id: I8e49037507fcd01470a03be8c0d611efca55dabf
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
7633a2a50144dad7cf987b29959dc06509575c05)
private final ClientIdentifier clientId;
private final String persistenceId;
private final ClientIdentifier clientId;
private final String persistenceId;
+ private long lastConnectTicks;
+ private long lastSeenTicks;
private long expectedTxSequence;
private Long lastSeenHistory = null;
private long expectedTxSequence;
private Long lastSeenHistory = null;
this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
this.localHistories = Preconditions.checkNotNull(localHistories);
this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
this.localHistories = Preconditions.checkNotNull(localHistories);
+ this.lastSeenTicks = tree.readTime();
void reconnect() {
expectedTxSequence = 0;
void reconnect() {
expectedTxSequence = 0;
+ lastConnectTicks = tree.readTime();
standaloneHistory.retire();
}
standaloneHistory.retire();
}
+ long getLastConnectTicks() {
+ return lastConnectTicks;
+ }
+
+ long getLastSeenTicks() {
+ return lastSeenTicks;
+ }
+
+ void touch() {
+ this.lastSeenTicks = tree.readTime();
+ }
+
@Override
public String toString() {
@Override
public String toString() {
- return MoreObjects.toStringHelper(LeaderFrontendState.class).add("clientId", clientId)
- .add("purgedHistories", purgedHistories).toString();
+ return MoreObjects.toStringHelper(LeaderFrontendState.class)
+ .add("clientId", clientId)
+ .add("nanosAgo", tree.readTime() - lastSeenTicks)
+ .add("purgedHistories", purgedHistories)
+ .toString();
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
+import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
}
// Acquire our frontend tracking handle and verify generation matches
}
// Acquire our frontend tracking handle and verify generation matches
- private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+ @Nullable
+ private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
if (existing != null) {
final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
if (cmp == 0) {
final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
if (existing != null) {
final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
if (cmp == 0) {
return existing;
}
if (cmp > 0) {
return existing;
}
if (cmp > 0) {
LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
}
LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
}
- final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store);
- knownFrontends.put(clientId.getFrontendId(), ret);
- LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId);
- return ret;
+ return null;
+ }
+
+ private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+ final LeaderFrontendState ret = findFrontend(clientId);
+ if (ret != null) {
+ return ret;
+ }
+
+ // TODO: a dedicated exception would be better, but this is technically true, too
+ throw new OutOfSequenceEnvelopeException(0);
}
private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
}
private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleConnectClient(final ConnectClientRequest message) {
try {
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleConnectClient(final ConnectClientRequest message) {
try {
+ final ClientIdentifier clientId = message.getTarget();
+ final LeaderFrontendState existing = findFrontend(clientId);
+ if (existing != null) {
+ existing.touch();
+ }
+
if (!isLeader() || !isLeaderActive()) {
LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ "isLeadershipTransferInProgress: {}.",
if (!isLeader() || !isLeaderActive()) {
LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ "isLeadershipTransferInProgress: {}.",
}
final ABIVersion selectedVersion = selectVersion(message);
}
final ABIVersion selectedVersion = selectVersion(message);
- final LeaderFrontendState frontend = getFrontend(message.getTarget());
+ final LeaderFrontendState frontend;
+ if (existing == null) {
+ frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+ knownFrontends.put(clientId.getFrontendId(), frontend);
+ LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
+ } else {
+ frontend = existing;
+ }
+
frontend.reconnect();
message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion),
frontend.reconnect();
message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion),