txCommitTimeoutCheckSchedule.cancel();
}
+ commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
+
shardMBean.unregisterMBean();
}
setPeerAddress(resolved.getPeerId().toString(),
resolved.getPeerAddress());
} else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
- handleTransactionCommitTimeoutCheck();
+ commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
} else if(message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
} else if(message instanceof RegisterRoleChangeListener){
updateConfigParams(datastoreContext.getShardRaftConfig());
}
- private void handleTransactionCommitTimeoutCheck() {
- CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
- if(cohortEntry != null) {
- if(cohortEntry.isExpired(transactionCommitTimeout)) {
- LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
- persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
-
- doAbortTransaction(cohortEntry.getTransactionID(), null);
- }
- }
-
- commitCoordinator.cleanupExpiredCohortEntries();
- }
-
private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
}
}
store.closeAllTransactionChains();
+
+ commitCoordinator.abortPendingTransactions(
+ "The transacton was aborted due to inflight leadership change.", this);
}
if(hasLeader && !isIsolatedLeader()) {