import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.NotLeaderException;
+import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
PeerAddressResolved resolved = (PeerAddressResolved) message;
setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
} else if (TX_COMMIT_TIMEOUT_CHECK_MESSAGE.equals(message)) {
- store.checkForExpiredTransactions(transactionCommitTimeout);
- commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ commitTimeoutCheck();
} else if (message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
} else if (message instanceof RegisterRoleChangeListener) {
}
}
+ private void commitTimeoutCheck() {
+ store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
+ commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
+ }
+
+ private Optional<Long> updateAccess(final SimpleShardDataTreeCohort cohort) {
+ final FrontendIdentifier frontend = cohort.getIdentifier().getHistoryId().getClientId().getFrontendId();
+ // If this frontend has freshly connected, give it some time to catch up before killing its transactions.
+ final LeaderFrontendState state = knownFrontends.get(frontend);
+ return state == null ? Optional.absent() : Optional.of(state.getLastConnectTicks());
+ }
+
private void onMakeLeaderLocal() {
LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
if (isLeader()) {
}
// Acquire our frontend tracking handle and verify generation matches
- private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+ @Nullable
+ private LeaderFrontendState findFrontend(final ClientIdentifier clientId) throws RequestException {
final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId());
if (existing != null) {
final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration());
if (cmp == 0) {
+ existing.touch();
return existing;
}
if (cmp > 0) {
LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
}
- final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store);
- knownFrontends.put(clientId.getFrontendId(), ret);
- LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId);
- return ret;
+ return null;
+ }
+
+ private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException {
+ final LeaderFrontendState ret = findFrontend(clientId);
+ if (ret != null) {
+ return ret;
+ }
+
+ // TODO: a dedicated exception would be better, but this is technically true, too
+ throw new OutOfSequenceEnvelopeException(0);
}
private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) {
@SuppressWarnings("checkstyle:IllegalCatch")
private void handleConnectClient(final ConnectClientRequest message) {
try {
+ final ClientIdentifier clientId = message.getTarget();
+ final LeaderFrontendState existing = findFrontend(clientId);
+ if (existing != null) {
+ existing.touch();
+ }
+
if (!isLeader() || !isLeaderActive()) {
LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
+ "isLeadershipTransferInProgress: {}.",
}
final ABIVersion selectedVersion = selectVersion(message);
- final LeaderFrontendState frontend = getFrontend(message.getTarget());
+ final LeaderFrontendState frontend;
+ if (existing == null) {
+ frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+ knownFrontends.put(clientId.getFrontendId(), frontend);
+ LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
+ } else {
+ frontend = existing;
+ }
+
frontend.reconnect();
message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(),
ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion),