import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
private final ActorContext actorContext;
private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
private final ActorContext actorContext;
private final List<Future<ActorSelection>> cohortFutures;
private volatile List<ActorSelection> cohorts;
private final String transactionId;
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
List<Future<ActorSelection>> cohortFutures, String transactionId) {
final Object message = new CanCommitTransaction(transactionId).toSerializable();
final Iterator<ActorSelection> iterator = cohorts.iterator();
final Object message = new CanCommitTransaction(transactionId).toSerializable();
final Iterator<ActorSelection> iterator = cohorts.iterator();
LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
LOG.debug("Tx {}: a canCommit cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
boolean result = true;
if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
CanCommitTransactionReply reply =
boolean result = true;
if (response.getClass().equals(CanCommitTransactionReply.SERIALIZABLE_CLASS)) {
CanCommitTransactionReply reply =
public ListenableFuture<Void> preCommit() {
// We don't need to do anything here - preCommit is done atomically with the commit phase
// by the shard.
public ListenableFuture<Void> preCommit() {
// We don't need to do anything here - preCommit is done atomically with the commit phase
// by the shard.
- OperationCallback operationCallback = (cohortFutures.size() == 0) ? NO_OP_CALLBACK :
- new TransactionRateLimitingCallback(actorContext);
+ OperationCallback operationCallback = commitOperationCallback != null ? commitOperationCallback :
+ OperationCallback.NO_OP_CALLBACK;
return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
return voidOperation("commit", new CommitTransaction(transactionId).toSerializable(),
CommitTransactionReply.SERIALIZABLE_CLASS, true, operationCallback);
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
final Class<?> expectedResponseClass, final boolean propagateException) {
private ListenableFuture<Void> voidOperation(final String operationName, final Object message,
final Class<?> expectedResponseClass, final boolean propagateException) {
- return voidOperation(operationName, message, expectedResponseClass, propagateException, NO_OP_CALLBACK);
+ return voidOperation(operationName, message, expectedResponseClass, propagateException,
+ OperationCallback.NO_OP_CALLBACK);
List<Future<ActorSelection>> getCohortFutures() {
return Collections.unmodifiableList(cohortFutures);
}
List<Future<ActorSelection>> getCohortFutures() {
return Collections.unmodifiableList(cohortFutures);
}