import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
* @param shard the transaction's shard actor
*/
void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
- final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionId(),
- message.getModification());
+ final TransactionIdentifier txId = message.getTransactionId();
+ final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
- log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionId());
+ log.debug("{}: Applying local modifications for Tx {}", name, txId);
if (message.isDoCommitOnReady()) {
cohortEntry.setReplySender(sender);
handleCanCommit(cohortEntry);
}
- private void doCommit(final CohortEntry cohortEntry) {
+ void doCommit(final CohortEntry cohortEntry) {
log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());
// We perform the preCommit phase here atomically with the commit phase. This is an
});
}
- private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+ void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
cohortEntry.commit(new FutureCallback<UnsignedLong>() {
log.debug("{}: Aborting transaction {}", name, transactionID);
final ActorRef self = shard.getSelf();
- try {
- cohortEntry.abort();
+ cohortEntry.abort(new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ if (sender != null) {
+ sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+ }
+ }
- shard.getShardMBean().incrementAbortTransactionsCount();
+ @Override
+ public void onFailure(final Throwable failure) {
+ log.error("{}: An exception happened during abort", name, failure);
- if (sender != null) {
- sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+ if (sender != null) {
+ sender.tell(new Failure(failure), self);
+ }
}
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.error("{}: An exception happened during abort", name, e);
+ });
- if (sender != null) {
- sender.tell(new Failure(e), self);
- }
- }
+ shard.getShardMBean().incrementAbortTransactionsCount();
}
void checkForExpiredTransactions(final long timeout, final Shard shard) {