import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
+import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
-import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongSet;
+import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing
* in the frontend/backend conversation. This class is NOT thread-safe.
- *
- * @author Robert Varga
*/
-abstract class LeaderFrontendState implements Identifiable<ClientIdentifier> {
+abstract sealed class LeaderFrontendState implements Identifiable<ClientIdentifier> {
static final class Disabled extends LeaderFrontendState {
Disabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
super(persistenceId, clientId, tree);
// Histories which have not been purged
private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
- // RangeSet performs automatic merging, hence we keep minimal state tracking information
- private final UnsignedLongSet purgedHistories;
+ // UnsignedLongSet performs automatic merging, hence we keep minimal state tracking information
+ private final MutableUnsignedLongSet purgedHistories;
// Used for all standalone transactions
private final AbstractFrontendHistory standaloneHistory;
private Long lastSeenHistory = null;
Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
- this(persistenceId, clientId, tree, UnsignedLongSet.of(),
+ this(persistenceId, clientId, tree, MutableUnsignedLongSet.of(),
StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
}
Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
- final UnsignedLongSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
+ final MutableUnsignedLongSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
super(persistenceId, clientId, tree);
this.purgedHistories = requireNonNull(purgedHistories);
checkRequestSequence(envelope);
try {
- if (request instanceof CreateLocalHistoryRequest) {
- return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
- } else if (request instanceof DestroyLocalHistoryRequest) {
- return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
- } else if (request instanceof PurgeLocalHistoryRequest) {
- return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
+ if (request instanceof CreateLocalHistoryRequest req) {
+ return handleCreateHistory(req, envelope, now);
+ } else if (request instanceof DestroyLocalHistoryRequest req) {
+ return handleDestroyHistory(req, envelope, now);
+ } else if (request instanceof PurgeLocalHistoryRequest req) {
+ return handlePurgeHistory(req, envelope, now);
} else {
LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);
checkRequestSequence(envelope);
try {
- final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
+ final var lhId = request.getTarget().getHistoryId();
final AbstractFrontendHistory history;
if (lhId.getHistoryId() != 0) {
private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
- final LocalHistoryIdentifier historyId = request.getTarget();
- final AbstractFrontendHistory existing = localHistories.get(historyId);
+ final var historyId = request.getTarget();
+ final var existing = localHistories.get(historyId);
if (existing != null) {
// History already exists: report success
LOG.debug("{}: history {} already exists", persistenceId(), historyId);
}
// We have to send the response only after persistence has completed
- final ShardDataTreeTransactionChain chain = tree().ensureTransactionChain(historyId, () -> {
+ final var chain = tree().ensureTransactionChain(historyId, () -> {
LOG.debug("{}: persisted history {}", persistenceId(), historyId);
envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()),
tree().readTime() - now);
private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
final RequestEnvelope envelope, final long now) {
- final LocalHistoryIdentifier id = request.getTarget();
- final LocalFrontendHistory existing = localHistories.get(id);
+ final var id = request.getTarget();
+ final var existing = localHistories.get(id);
if (existing == null) {
// History does not exist: report success
LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId(), id);
private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
final RequestEnvelope envelope, final long now) {
- final LocalHistoryIdentifier id = request.getTarget();
- final LocalFrontendHistory existing = localHistories.remove(id);
+ final var id = request.getTarget();
+ final var existing = localHistories.remove(id);
if (existing == null) {
LOG.debug("{}: history {} has already been purged", persistenceId(), id);
return new LocalHistorySuccess(id, request.getSequence());
private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
- private final ShardDataTree tree;
- private final ClientIdentifier clientId;
- private final String persistenceId;
+ private final @NonNull ClientIdentifier clientId;
+ private final @NonNull String persistenceId;
+ private final @NonNull ShardDataTree tree;
private long lastConnectTicks;
private long lastSeenTicks;
void retire() {
// Hunt down any transactions associated with this frontend
- final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
+ final var it = tree.cohortIterator();
while (it.hasNext()) {
- final SimpleShardDataTreeCohort cohort = it.next();
- if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
+ final var cohort = it.next();
+ final var transactionId = cohort.transactionId();
+ if (clientId.equals(transactionId.getHistoryId().getClientId())) {
if (cohort.getState() != State.COMMIT_PENDING) {
- LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
+ LOG.debug("{}: Retiring transaction {}", persistenceId, transactionId);
it.remove();
} else {
- LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
- cohort.getIdentifier());
+ LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId, transactionId);
}
}
}