void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender,
final Shard shard) {
log.debug("{}: Readying transaction {}, client version {}", name,
- ready.getTransactionID(), ready.getTxnClientVersion());
+ ready.getTransactionId(), ready.getTxnClientVersion());
final ShardDataTreeCohort cohort = ready.getTransaction().ready();
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
- cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
+ cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
if (ready.isDoImmediateCommit()) {
cohortEntry.setDoImmediateCommit(true);
* @param sender the sender of the message
*/
void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
- CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
+ CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
if (cohortEntry == null) {
- cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionID()),
+ cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
batched.getVersion());
- cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
+ cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
}
if (log.isDebugEnabled()) {
log.debug("{}: Applying {} batched modifications for Tx {}", name,
- batched.getModifications().size(), batched.getTransactionID());
+ batched.getModifications().size(), batched.getTransactionId());
}
cohortEntry.applyModifications(batched.getModifications());
if (batched.isReady()) {
if (cohortEntry.getLastBatchedModificationsException() != null) {
- cohortCache.remove(cohortEntry.getTransactionID());
+ cohortCache.remove(cohortEntry.getTransactionId());
throw cohortEntry.getLastBatchedModificationsException();
}
if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
- cohortCache.remove(cohortEntry.getTransactionID());
+ cohortCache.remove(cohortEntry.getTransactionId());
throw new IllegalStateException(String.format(
"The total number of batched messages received %d does not match the number sent %d",
cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
if (log.isDebugEnabled()) {
log.debug("{}: Readying Tx {}, client version {}", name,
- batched.getTransactionID(), batched.getVersion());
+ batched.getTransactionId(), batched.getVersion());
}
cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
* @param shard the transaction's shard actor
*/
void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
- final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionID(),
- message.getModification());
+ final TransactionIdentifier txId = message.getTransactionId();
+ final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
- cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
+ cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
- log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
+ log.debug("{}: Applying local modifications for Tx {}", name, txId);
if (message.isDoCommitOnReady()) {
cohortEntry.setReplySender(sender);
Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
final int maxModificationsPerBatch) {
- CohortEntry cohortEntry = cohortCache.remove(from.getTransactionID());
+ CohortEntry cohortEntry = cohortCache.remove(from.getTransactionId());
if (cohortEntry == null || cohortEntry.getTransaction() == null) {
return Collections.singletonList(from);
}
protected BatchedModifications getModifications() {
if (newModifications.isEmpty()
|| newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
- newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
+ newModifications.add(new BatchedModifications(from.getTransactionId(), from.getVersion()));
}
return newModifications.getLast();
cohortEntry.canCommit(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionID());
+ log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId());
if (cohortEntry.isDoImmediateCommit()) {
doCommit(cohortEntry);
@Override
public void onFailure(final Throwable failure) {
log.debug("{}: An exception occurred during canCommit for {}: {}", name,
- cohortEntry.getTransactionID(), failure);
+ cohortEntry.getTransactionId(), failure);
- cohortCache.remove(cohortEntry.getTransactionID());
+ cohortCache.remove(cohortEntry.getTransactionId());
cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
}
});
handleCanCommit(cohortEntry);
}
- private void doCommit(final CohortEntry cohortEntry) {
- log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
+ void doCommit(final CohortEntry cohortEntry) {
+ log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());
// We perform the preCommit phase here atomically with the commit phase. This is an
// optimization to eliminate the overhead of an extra preCommit message. We lose front-end
@Override
public void onFailure(final Throwable failure) {
log.error("{} An exception occurred while preCommitting transaction {}", name,
- cohortEntry.getTransactionID(), failure);
+ cohortEntry.getTransactionId(), failure);
- cohortCache.remove(cohortEntry.getTransactionID());
+ cohortCache.remove(cohortEntry.getTransactionId());
cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
}
});
}
- private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
- log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
+ void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+ log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
cohortEntry.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
- final TransactionIdentifier txId = cohortEntry.getTransactionID();
+ final TransactionIdentifier txId = cohortEntry.getTransactionId();
log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
sender);
- cohortCache.remove(cohortEntry.getTransactionID());
+ cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
cohortEntry.getShard().self());
}
@Override
public void onFailure(final Throwable failure) {
log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- cohortEntry.getTransactionID(), failure);
+ cohortEntry.getTransactionId(), failure);
- cohortCache.remove(cohortEntry.getTransactionID());
+ cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(new Failure(failure), cohortEntry.getShard().self());
}
});
log.debug("{}: Aborting transaction {}", name, transactionID);
final ActorRef self = shard.getSelf();
- try {
- cohortEntry.abort();
+ cohortEntry.abort(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ if (sender != null) {
+ sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+ }
+ }
- shard.getShardMBean().incrementAbortTransactionsCount();
+ @Override
+ public void onFailure(final Throwable failure) {
+ log.error("{}: An exception happened during abort", name, failure);
- if (sender != null) {
- sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+ if (sender != null) {
+ sender.tell(new Failure(failure), self);
+ }
}
- } catch (Exception e) {
- log.error("{}: An exception happened during abort", name, e);
+ });
- if (sender != null) {
- sender.tell(new Failure(e), self);
- }
- }
+ shard.getShardMBean().incrementAbortTransactionsCount();
}
void checkForExpiredTransactions(final long timeout, final Shard shard) {
}
// Allocate a new message
- final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionID(),
+ final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionId(),
cohortEntry.getClientVersion());
newMessages.add(ret);
return ret;
switch (cohort.getState()) {
case CAN_COMMIT_COMPLETE:
case CAN_COMMIT_PENDING:
- messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+ messages.add(new CanCommitTransaction(cohortEntry.getTransactionId(),
cohortEntry.getClientVersion()));
break;
case PRE_COMMIT_COMPLETE:
case PRE_COMMIT_PENDING:
- messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+ messages.add(new CommitTransaction(cohortEntry.getTransactionId(),
cohortEntry.getClientVersion()));
break;
default: