private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
+ private volatile OperationCallback commitOperationCallback;
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
}
return null;
}
- }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
+ }, TransactionReadyReplyMapper.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
}
@Override
return;
}
- final Object message = new CanCommitTransaction(transactionId).toSerializable();
+ commitOperationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
+ new TransactionRateLimitingCallback(actorContext);
+
+ commitOperationCallback.run();
+
+ final Object message = new CanCommitTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable();
final Iterator<ActorSelection> iterator = cohorts.iterator();
LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
+ commitOperationCallback.failure();
return;
}
+ // Only the first call to pause takes effect - subsequent calls before resume are no-ops. So
+ // this means we'll only time the first transaction canCommit which should be fine.
+ commitOperationCallback.pause();
+
boolean result = true;
- if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
+ if (CanCommitTransactionReply.isSerializedType(response)) {
CanCommitTransactionReply reply =
CanCommitTransactionReply.fromSerializable(response);
if (!reply.getCanCommit()) {
// exception then that exception will supersede and suppress the original exception. But
// it's the original exception that is the root cause and of more interest to the client.
- return voidOperation("abort", new AbortTransaction(transactionId).toSerializable(),
- AbortTransactionReply.SERIALIZABLE_CLASS, false);
+ return voidOperation("abort", new AbortTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(),
+ AbortTransactionReply.class, false);
}
@Override
public ListenableFuture<Void> commit() {
- OperationCallback operationCallback = cohortFutures.isEmpty() ? OperationCallback.NO_OP_CALLBACK :
- new TransactionRateLimitingCallback(actorContext);
+ OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
+ OperationCallback.NO_OP_CALLBACK;
- return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
- CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
+ return voidOperation("commit", new CommitTransaction(transactionId, DataStoreVersions.CURRENT_VERSION).toSerializable(),
+ CommitTransactionReply.class, true, operationCallback);
}
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
LOG.debug("Tx {} finish {}", transactionId, operationName);
}
- callback.run();
+ callback.resume();
Future<Iterable<Object>> combinedFuture = invokeCohorts(message);