import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableRangeSet;
-import com.google.common.collect.RangeSet;
-import com.google.common.primitives.UnsignedLong;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
-import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
-import org.opendaylight.yangtools.concepts.Builder;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
+import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is NOT thread-safe.
*/
-abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMetadata>,
- Identifiable<ClientIdentifier> {
+// FIXME: sealed when we have JDK17+
+abstract class FrontendClientMetadataBuilder implements Identifiable<ClientIdentifier> {
static final class Disabled extends FrontendClientMetadataBuilder {
Disabled(final String shardName, final ClientIdentifier identifier) {
super(shardName, identifier);
}
@Override
- public FrontendClientMetadata build() {
- return new FrontendClientMetadata(getIdentifier(), ImmutableRangeSet.of(), ImmutableList.of());
+ FrontendClientMetadata build() {
+ return new FrontendClientMetadata(getIdentifier(), ImmutableUnsignedLongSet.of(), ImmutableList.of());
}
@Override
// No-op
}
+ @Override
+ void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
+ // No-op
+ }
+
@Override
LeaderFrontendState toLeaderState(final Shard shard) {
return new LeaderFrontendState.Disabled(shard.persistenceId(), getIdentifier(), shard.getDataStore());
}
static final class Enabled extends FrontendClientMetadataBuilder {
-
private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
- private final UnsignedLongRangeSet purgedHistories;
+ private final MutableUnsignedLongSet purgedHistories;
private final LocalHistoryIdentifier standaloneId;
Enabled(final String shardName, final ClientIdentifier identifier) {
super(shardName, identifier);
- purgedHistories = UnsignedLongRangeSet.create();
+ purgedHistories = MutableUnsignedLongSet.of();
// History for stand-alone transactions is always present
standaloneId = standaloneHistoryId();
Enabled(final String shardName, final FrontendClientMetadata meta) {
super(shardName, meta.getIdentifier());
- purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories());
+ purgedHistories = meta.getPurgedHistories().mutableCopy();
for (FrontendHistoryMetadata h : meta.getCurrentHistories()) {
final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(getIdentifier(), h);
currentHistories.put(b.getIdentifier(), b);
}
@Override
- public FrontendClientMetadata build() {
- return new FrontendClientMetadata(getIdentifier(), purgedHistories.toImmutable(),
+ FrontendClientMetadata build() {
+ return new FrontendClientMetadata(getIdentifier(), purgedHistories.immutableCopy(),
Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build));
}
}
}
+ @Override
+ void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
+ final FrontendHistoryMetadataBuilder history = getHistory(historyId);
+ if (history != null) {
+ history.onTransactionsSkipped(txIds);
+ LOG.debug("{}: History {} skipped transactions {}", shardName(), historyId, txIds);
+ } else {
+ LOG.warn("{}: Unknown history {} for skipped transactions, ignoring", shardName(), historyId);
+ }
+ }
+
@Override
LeaderFrontendState toLeaderState(final Shard shard) {
// Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
}
return new LeaderFrontendState.Enabled(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
- purgedHistories.copy(), singleHistory, histories);
+ purgedHistories.mutableCopy(), singleHistory, histories);
}
@Override
}
private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
- LocalHistoryIdentifier historyId = txId.getHistoryId();
+ return getHistory(txId.getHistoryId());
+ }
+
+ private FrontendHistoryMetadataBuilder getHistory(final LocalHistoryIdentifier historyId) {
+ final LocalHistoryIdentifier local;
if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
// We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
// needs to account for that.
LOG.debug("{}: looking up {} instead of {}", shardName(), standaloneId, historyId);
- historyId = standaloneId;
+ local = standaloneId;
+ } else {
+ local = historyId;
}
- return currentHistories.get(historyId);
+ return currentHistories.get(local);
}
private LocalHistoryIdentifier standaloneHistoryId() {
}
static FrontendClientMetadataBuilder of(final String shardName, final FrontendClientMetadata meta) {
- final Collection<FrontendHistoryMetadata> current = meta.getCurrentHistories();
- final RangeSet<UnsignedLong> purged = meta.getPurgedHistories();
-
// Completely empty histories imply disabled state, as otherwise we'd have a record of the single history --
// either purged or active
- return current.isEmpty() && purged.isEmpty() ? new Disabled(shardName, meta.getIdentifier())
- : new Enabled(shardName, meta);
+ return meta.getCurrentHistories().isEmpty() && meta.getPurgedHistories().isEmpty()
+ ? new Disabled(shardName, meta.getIdentifier()) : new Enabled(shardName, meta);
}
@Override
return shardName;
}
+ abstract FrontendClientMetadata build();
+
abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
abstract void onTransactionPurged(TransactionIdentifier txId);
+ abstract void onTransactionsSkipped(LocalHistoryIdentifier historyId, ImmutableUnsignedLongSet txIds);
+
/**
* Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
*