*/
void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
- if (cohortEntry == null) {
+ if (cohortEntry == null || cohortEntry.isSealed()) {
cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
batched.getVersion());
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
@Override
public void onFailure(final Throwable failure) {
- log.debug("{}: An exception occurred during canCommit for {}: {}", name,
- cohortEntry.getTransactionId(), failure);
+ log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(),
+ failure);
cohortCache.remove(cohortEntry.getTransactionId());
cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
// between canCommit and ready and the entry was expired from the cache or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
- log.error(ex.getMessage());
+ log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex);
sender.tell(new Failure(ex), shard.self());
return;
}
final TransactionIdentifier txId = cohortEntry.getTransactionId();
log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
sender);
- cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
public void onFailure(final Throwable failure) {
final TransactionIdentifier txId = cohortEntry.getTransactionId();
log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
- cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(new Failure(failure), cohortEntry.getShard().self());
// or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
- log.error(ex.getMessage());
+ log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex);
sender.tell(new Failure(ex), shard.self());
return;
}
cohortEntry.abort(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
-
if (sender != null) {
sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
}
@Override
public void onFailure(final Throwable failure) {
log.error("{}: An exception happened during abort", name, failure);
- shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
if (sender != null) {
sender.tell(new Failure(failure), self);