Backend state tracking relies on the transaction log to propagate
transaction state from the leader to followers. This includes purging
of transactions, i.e. the information that the frontend will not need
the state (and the final resolution of the transaction).
Tell-based protocol handles this on the frontend, ask-based needs to
do this on the backend (as it has no notion of transaction continuation).
Change-Id: I49e787b38998ef67b4a9ef504a70822263e1a340
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
// Write data to member-3's oper datastore and read/verify via member-2
writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
// Write data to member-3's oper datastore and read/verify via member-2
writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
- // Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
- // 2 ServerConfigurationPayload entries and the transaction payload entry.
+ // Verify all data has been replicated. We expect 4 log entries and thus last applied index of 3 -
+ // 2 ServerConfigurationPayload entries, the transaction payload entry plus a purge payload.
RaftStateVerifier verifier = raftState -> {
RaftStateVerifier verifier = raftState -> {
- assertEquals("Commit index", 2, raftState.getCommitIndex());
- assertEquals("Last applied index", 2, raftState.getLastApplied());
+ assertEquals("Commit index", 3, raftState.getCommitIndex());
+ assertEquals("Last applied index", 3, raftState.getLastApplied());
};
verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
};
verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
final TransactionIdentifier txId = cohortEntry.getTransactionId();
log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
sender);
final TransactionIdentifier txId = cohortEntry.getTransactionId();
log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
sender);
+ cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
@Override
public void onFailure(final Throwable failure) {
@Override
public void onFailure(final Throwable failure) {
- log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- cohortEntry.getTransactionId(), failure);
+ final TransactionIdentifier txId = cohortEntry.getTransactionId();
+ log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
+ cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(new Failure(failure), cohortEntry.getShard().self());
cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(new Failure(failure), cohortEntry.getShard().self());
cohortEntry.abort(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
cohortEntry.abort(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
+ shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
+
if (sender != null) {
sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
}
if (sender != null) {
sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
}
@Override
public void onFailure(final Throwable failure) {
log.error("{}: An exception happened during abort", name, failure);
@Override
public void onFailure(final Throwable failure) {
log.error("{}: An exception happened during abort", name, failure);
+ shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
if (sender != null) {
sender.tell(new Failure(failure), self);
if (sender != null) {
sender.tell(new Failure(failure), self);
verifyOuterListEntry(shard, 1);
verifyOuterListEntry(shard, 1);
- verifyLastApplied(shard, 2);
+ verifyLastApplied(shard, 5);
// Commit index should advance as we do not have an empty
// modification
// Commit index should advance as we do not have an empty
// modification
- assertEquals(0, shardStats.getCommitIndex());
+ assertEquals(1, shardStats.getCommitIndex());