import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Range;
-import com.google.common.collect.RangeSet;
-import com.google.common.collect.TreeRangeSet;
-import com.google.common.primitives.UnsignedLong;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
// RangeSet performs automatic merging, hence we keep minimal state tracking information
- private final RangeSet<UnsignedLong> purgedHistories;
+ private final UnsignedLongRangeSet purgedHistories;
// Used for all standalone transactions
private final AbstractFrontendHistory standaloneHistory;
private final ClientIdentifier clientId;
private final String persistenceId;
+ // tree.readTime() tick recorded at the most recent reconnect(); exposed via getLastConnectTicks()
+ private long lastConnectTicks;
+ // tree.readTime() tick of the last observed activity from this frontend; set at construction,
+ // refreshed by touch(), exposed via getLastSeenTicks() and reported as "nanosAgo" in toString()
+ private long lastSeenTicks;
private long expectedTxSequence;
private Long lastSeenHistory = null;
// - per-RequestException throw counters
+ // Convenience constructor: starts with an empty purged-history tracker and a fresh
+ // standalone history for the given client.
LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
- this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId,
- clientId, tree), new HashMap<>());
+ this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
+ StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
}
LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
- final RangeSet<UnsignedLong> purgedHistories, final AbstractFrontendHistory standaloneHistory,
+ final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
this.persistenceId = Preconditions.checkNotNull(persistenceId);
this.clientId = Preconditions.checkNotNull(clientId);
this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
this.localHistories = Preconditions.checkNotNull(localHistories);
+ // Treat construction time as the first sighting of this frontend
+ this.lastSeenTicks = tree.readTime();
}
@Override
// We have not found the history. Before we create it we need to check history ID sequencing so that we do not
// end up resurrecting a purged history.
- if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) {
+ if (purgedHistories.contains(historyId.getHistoryId())) {
LOG.debug("{}: rejecting purged request {}", persistenceId, request);
- throw new DeadHistoryException(purgedHistories);
+ throw new DeadHistoryException(purgedHistories.toImmutable());
}
// Update last history we have seen
}
LOG.debug("{}: purging history {}", persistenceId, id);
- final UnsignedLong ul = UnsignedLong.fromLongBits(id.getHistoryId());
- purgedHistories.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
+ purgedHistories.add(id.getHistoryId());
existing.purge(request.getSequence(), envelope, now);
return null;
}
if (lhId.getHistoryId() != 0) {
history = localHistories.get(lhId);
if (history == null) {
- if (purgedHistories.contains(UnsignedLong.fromLongBits(lhId.getHistoryId()))) {
+ if (purgedHistories.contains(lhId.getHistoryId())) {
LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
- throw new DeadHistoryException(purgedHistories);
+ throw new DeadHistoryException(purgedHistories.toImmutable());
}
LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
+ // Resets per-connection transaction sequencing and records when the frontend reconnected.
void reconnect() {
expectedTxSequence = 0;
+ // Remember when this connection was (re-)established
+ lastConnectTicks = tree.readTime();
}
+ // Retires this frontend: removes its still-pending transaction cohorts and retires all of
+ // its histories. Replaces the previous FIXME placeholder with real cleanup.
void retire() {
- // FIXME: flush all state
+ // Hunt down any transactions associated with this frontend
+ final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
+ while (it.hasNext()) {
+ final SimpleShardDataTreeCohort cohort = it.next();
+ if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
+ if (cohort.getState() != State.COMMIT_PENDING) {
+ LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
+ // Iterator.remove() is the only safe way to drop entries mid-iteration
+ it.remove();
+ } else {
+ // NOTE(review): a cohort already in COMMIT_PENDING is left alone, presumably so an
+ // in-flight commit can complete — confirm against ShardDataTree's commit pipeline
+ LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
+ cohort.getIdentifier());
+ }
+ }
+ }
+
+ // Clear out all transaction chains
+ localHistories.values().forEach(AbstractFrontendHistory::retire);
+ localHistories.clear();
+ standaloneHistory.retire();
+ }
+
+ // Returns the tree.readTime() tick recorded at the most recent reconnect()
+ long getLastConnectTicks() {
+ return lastConnectTicks;
+ }
+
+ // Returns the tree.readTime() tick at which this frontend was last seen (see touch())
+ long getLastSeenTicks() {
+ return lastSeenTicks;
+ }
+
+ // Records the current shard time as the last moment this frontend was seen.
+ // NOTE(review): presumably invoked on each request from the frontend — confirm with callers.
+ void touch() {
+ this.lastSeenTicks = tree.readTime();
+ }
@Override
public String toString() {
- return MoreObjects.toStringHelper(LeaderFrontendState.class).add("clientId", clientId)
- .add("purgedHistories", purgedHistories).toString();
+ return MoreObjects.toStringHelper(LeaderFrontendState.class)
+ .add("clientId", clientId)
+ // "nanosAgo": elapsed readTime() ticks since this frontend was last seen
+ .add("nanosAgo", tree.readTime() - lastSeenTicks)
+ .add("purgedHistories", purgedHistories)
+ .toString();
}
}