}
commitCoordinator = new ShardCommitCoordinator(store,
- TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+ datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
setTransactionCommitTimeout();
private void setTransactionCommitTimeout() {
transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
- datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
}
public static Props props(final ShardIdentifier name,
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
- long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
- if(elapsed > transactionCommitTimeout) {
+ if(cohortEntry.isExpired(transactionCommitTimeout)) {
LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
}
}
+
+ commitCoordinator.cleanupExpiredCohortEntries();
}
private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
// or if cohortEntry has no modifications
// we can apply modification to the state immediately
if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
- applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
+ applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
} else {
- Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+ Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
DataTreeCandidatePayload.create(candidate));
}
}