import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
-import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
// Histories which have not been purged
- private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories = new HashMap<>();
+ private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
// RangeSet performs automatic merging, hence we keep minimal state tracking information
- private final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
+ private final RangeSet<UnsignedLong> purgedHistories;
// Used for all standalone transactions
private final AbstractFrontendHistory standaloneHistory;
private long expectedTxSequence;
private Long lastSeenHistory = null;
-
// TODO: explicit failover notification
// Record the ActorRef for the originating actor and when we switch to being a leader send a notification
// to the frontend client -- that way it can immediately start sending requests
// - per-RequestException throw counters
LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+ this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId,
+ clientId, tree), new HashMap<>());
+ }
+
+ LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
+ final RangeSet<UnsignedLong> purgedHistories, final AbstractFrontendHistory standaloneHistory,
+ final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
this.persistenceId = Preconditions.checkNotNull(persistenceId);
this.clientId = Preconditions.checkNotNull(clientId);
this.tree = Preconditions.checkNotNull(tree);
- standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree);
+ this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
+ this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
+ this.localHistories = Preconditions.checkNotNull(localHistories);
}
@Override
return clientId;
}
- private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException {
+ private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
if (expectedTxSequence != envelope.getTxSequence()) {
- throw new OutOfOrderRequestException(expectedTxSequence);
+ throw new OutOfSequenceEnvelopeException(expectedTxSequence);
}
}
}
@Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
checkRequestSequence(envelope);
try {
if (request instanceof CreateLocalHistoryRequest) {
- return handleCreateHistory((CreateLocalHistoryRequest) request);
+ return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
} else if (request instanceof DestroyLocalHistoryRequest) {
- return handleDestroyHistory((DestroyLocalHistoryRequest) request);
+ return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
} else if (request instanceof PurgeLocalHistoryRequest) {
- return handlePurgeHistory((PurgeLocalHistoryRequest)request);
+ return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
} else {
+ LOG.warn("{}: rejecting unsupported request {}", persistenceId, request);
throw new UnsupportedRequestException(request);
}
} finally {
}
}
- private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
- final LocalHistoryIdentifier id = request.getTarget();
- final AbstractFrontendHistory existing = localHistories.get(id);
+ private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
+ final LocalHistoryIdentifier historyId = request.getTarget();
+ final AbstractFrontendHistory existing = localHistories.get(historyId);
if (existing != null) {
// History already exists: report success
- LOG.debug("{}: history {} already exists", persistenceId, id);
- return new LocalHistorySuccess(id, request.getSequence());
+ LOG.debug("{}: history {} already exists", persistenceId, historyId);
+ return new LocalHistorySuccess(historyId, request.getSequence());
}
// We have not found the history. Before we create it we need to check history ID sequencing so that we do not
// end up resurrecting a purged history.
- if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+ if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) {
LOG.debug("{}: rejecting purged request {}", persistenceId, request);
- throw new DeadHistoryException(lastSeenHistory.longValue());
+ throw new DeadHistoryException(purgedHistories);
}
// Update last history we have seen
- if (lastSeenHistory != null && Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
- lastSeenHistory = id.getHistoryId();
+ if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+ lastSeenHistory = historyId.getHistoryId();
}
- localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id)));
- LOG.debug("{}: created history {}", persistenceId, id);
- return new LocalHistorySuccess(id, request.getSequence());
+ // We have to send the response only after persistence has completed
+ final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
+ LOG.debug("{}: persisted history {}", persistenceId, historyId);
+ envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
+ });
+
+ localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
+ LOG.debug("{}: created history {}", persistenceId, historyId);
+ return null;
}
- private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException {
+ private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
+ final RequestEnvelope envelope, final long now)
+ throws RequestException {
final LocalHistoryIdentifier id = request.getTarget();
final LocalFrontendHistory existing = localHistories.get(id);
if (existing == null) {
return new LocalHistorySuccess(id, request.getSequence());
}
- return existing.destroy(request.getSequence());
+ existing.destroy(request.getSequence(), envelope, now);
+ return null;
}
- private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException {
+ private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
+ final RequestEnvelope envelope, final long now) throws RequestException {
final LocalHistoryIdentifier id = request.getTarget();
final LocalFrontendHistory existing = localHistories.remove(id);
- if (existing != null) {
- purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
-
- if (!existing.isDestroyed()) {
- LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
- existing.destroy(request.getSequence());
- }
-
- // FIXME: record a PURGE tombstone in the journal
-
- LOG.debug("{}: purged history {}", persistenceId, id);
- } else {
+ if (existing == null) {
LOG.debug("{}: history {} has already been purged", persistenceId, id);
+ return new LocalHistorySuccess(id, request.getSequence());
}
- return new LocalHistorySuccess(id, request.getSequence());
+ LOG.debug("{}: purging history {}", persistenceId, id);
+ final UnsignedLong ul = UnsignedLong.fromLongBits(id.getHistoryId());
+ purgedHistories.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
+ existing.purge(request.getSequence(), envelope, now);
+ return null;
}
@Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
checkRequestSequence(envelope);
try {
if (lhId.getHistoryId() != 0) {
history = localHistories.get(lhId);
if (history == null) {
- LOG.debug("{}: rejecting unknown history request {}", persistenceId, request);
+ if (purgedHistories.contains(UnsignedLong.fromLongBits(lhId.getHistoryId()))) {
+ LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
+ throw new DeadHistoryException(purgedHistories);
+ }
+
+ LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
throw new UnknownHistoryException(lastSeenHistory);
}
} else {
history = standaloneHistory;
}
- return history.handleTransactionRequest(request, envelope);
+ return history.handleTransactionRequest(request, envelope, now);
} finally {
expectNextRequest();
}