package org.opendaylight.controller.cluster.datastore;
import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
-final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+ static final class DeadOnArrival extends SimpleShardDataTreeCohort {
+ private final Exception failure;
+
+ DeadOnArrival(final ShardDataTree dataTree, final DataTreeModification transaction,
+ final TransactionIdentifier transactionId, final Exception failure) {
+ super(dataTree, transaction, transactionId, null);
+ this.failure = Preconditions.checkNotNull(failure);
+ }
+
+ @Override
+ void throwCanCommitFailure() throws Exception {
+ throw failure;
+ }
+ }
+
+ static final class Normal extends SimpleShardDataTreeCohort {
+ Normal(final ShardDataTree dataTree, final DataTreeModification transaction,
+ final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
+ super(dataTree, transaction, transactionId, Preconditions.checkNotNull(userCohorts));
+ }
+
+ @Override
+ void throwCanCommitFailure() {
+ // No-op
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
private final DataTreeModification transaction;
this.dataTree = Preconditions.checkNotNull(dataTree);
this.transaction = Preconditions.checkNotNull(transaction);
this.transactionId = Preconditions.checkNotNull(transactionId);
- this.userCohorts = Preconditions.checkNotNull(userCohorts);
+ this.userCohorts = userCohorts;
}
@Override
candidate = null;
state = State.ABORTED;
- final Optional<Future<Iterable<Object>>> maybeAborts = userCohorts.abort();
+ final Optional<List<Future<Object>>> maybeAborts = userCohorts.abort();
if (!maybeAborts.isPresent()) {
abortCallback.onSuccess(null);
return;
}
- final Future<Iterable<Object>> aborts = maybeAborts.get();
+ final Future<Iterable<Object>> aborts = Futures.sequence(maybeAborts.get(), ExecutionContexts.global());
if (aborts.isCompleted()) {
abortCallback.onSuccess(null);
return;
return ret;
}
- void setNewCandidate(DataTreeCandidateTip dataTreeCandidate) {
+ void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) {
checkState(State.PRE_COMMIT_COMPLETE);
this.candidate = Verify.verifyNotNull(dataTreeCandidate);
}
this.nextFailure = Preconditions.checkNotNull(cause);
}
+ /**
+ * If there is an initial failure, throw it so the caller can process it.
+ *
+ * @throws Exception reported failure.
+ */
+ abstract void throwCanCommitFailure() throws Exception;
+
@Override
public boolean isFailed() {
return state == State.FAILED || nextFailure != null;