import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.Optional;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+ "implements writeReplace to delegate serialization to a Proxy class and thus instances of this class "
+ "aren't serialized. FindBugs does not recognize this.")
private final DataTreeModification mod;
+ private final Exception delayedFailure;
private final boolean coordinated;
public CommitLocalTransactionRequest(@Nonnull final TransactionIdentifier identifier, final long sequence,
- @Nonnull final ActorRef replyTo, @Nonnull final DataTreeModification mod, final boolean coordinated) {
+ @Nonnull final ActorRef replyTo, @Nonnull final DataTreeModification mod,
+ @Nullable final Exception delayedFailure, final boolean coordinated) {
super(identifier, sequence, replyTo);
this.mod = Preconditions.checkNotNull(mod);
+ this.delayedFailure = delayedFailure;
this.coordinated = coordinated;
}
+ /**
+ * Return the delayed error detected on the frontend. If this error is present, it will be reported as the result
+ * of the first step of the commit process.
+ *
+ * @return Delayed failure, if present.
+ */
+ public Optional<Exception> getDelayedFailure() {
+ return Optional.ofNullable(delayedFailure);
+ }
+
public DataTreeModification getModification() {
return mod;
}
@Override
protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
- return super.addToStringAttributes(toStringHelper).add("coordinated", coordinated);
+ return super.addToStringAttributes(toStringHelper).add("coordinated", coordinated)
+ .add("delayedError", delayedFailure);
}
}
private static final boolean COORDINATED = true;
private static final CommitLocalTransactionRequest OBJECT = new CommitLocalTransactionRequest(
- TRANSACTION, 0, ACTOR_REF, MODIFICATION, COORDINATED);
+ TRANSACTION, 0, ACTOR_REF, MODIFICATION, null, COORDINATED);
@Override
protected CommitLocalTransactionRequest object() {
return;
}
- mod.delete(path);
+ try {
+ mod.delete(path);
+ } catch (Exception e) {
+ LOG.debug("Transaction {} delete on {} incurred failure, delaying it until commit", getIdentifier(), path,
+ e);
+ recordedFailure = e;
+ }
}
@Override
return;
}
- mod.merge(path, data);
+ try {
+ mod.merge(path, data);
+ } catch (Exception e) {
+ LOG.debug("Transaction {} merge to {} incurred failure, delaying it until commit", getIdentifier(), path,
+ e);
+ recordedFailure = e;
+ }
}
@Override
return;
}
- mod.write(path, data);
+ try {
+ mod.write(path, data);
+ } catch (Exception e) {
+ LOG.debug("Transaction {} write to {} incurred failure, delaying it until commit", getIdentifier(), path,
+ e);
+ recordedFailure = e;
+ }
}
private RuntimeException abortedException() {
CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
final CursorAwareDataTreeModification mod = getModification();
final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(),
- localActor(), mod, coordinated);
+ localActor(), mod, recordedFailure, coordinated);
closedException = this::submittedException;
return ret;
}
final @Nullable Consumer<Response<?, ?>> callback) {
for (final TransactionModification mod : request.getModifications()) {
if (mod instanceof TransactionWrite) {
- getModification().write(mod.getPath(), ((TransactionWrite)mod).getData());
+ write(mod.getPath(), ((TransactionWrite)mod).getData());
} else if (mod instanceof TransactionMerge) {
- getModification().merge(mod.getPath(), ((TransactionMerge)mod).getData());
+ merge(mod.getPath(), ((TransactionMerge)mod).getData());
} else if (mod instanceof TransactionDelete) {
- getModification().delete(mod.getPath());
+ delete(mod.getPath());
} else {
throw new IllegalArgumentException("Unsupported modification " + mod);
}
abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod)
throws RequestException;
+ abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier id, DataTreeModification mod,
+ Exception failure);
+
abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
@Override
public State getState() {
return delegate.getState();
}
-
}
private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
@Override
public void onSuccess(final UnsignedLong result) {
successfulCommit(envelope, startTime);
private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
final RequestEnvelope envelope, final long now) throws RequestException {
- if (sealedModification.equals(request.getModification())) {
+ if (!sealedModification.equals(request.getModification())) {
+ LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
+ throw new UnsupportedRequestException(request);
+ }
+
+ final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
+ if (optFailure.isPresent()) {
+ readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get());
+ } else {
readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+ }
- if (request.isCoordinated()) {
- coordinatedCommit(envelope, now);
- } else {
- directCommit(envelope, now);
- }
+ if (request.isCoordinated()) {
+ coordinatedCommit(envelope, now);
} else {
- throw new UnsupportedRequestException(request);
+ directCommit(envelope, now);
}
}
return FrontendReadWriteTransaction.createReady(this, id, mod);
}
+ @Override
+ ShardDataTreeCohort createFailedCohort(final TransactionIdentifier id, final DataTreeModification mod,
+ final Exception failure) {
+ return chain.createFailedCohort(id, mod, failure);
+ }
+
@Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
return chain.createReadyCohort(id, mod);
LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
Exception cause;
try {
+ cohort.throwCanCommitFailure();
+
tip.validate(modification);
LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
cohort.successfulCanCommit();
cohortRegistry.process(sender, message);
}
+
+ @Override
+ ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final Exception failure) {
+ SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.DeadOnArrival(this, mod, txId, failure);
+ pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
+ return cohort;
+ }
+
@Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId,
- final DataTreeModification modification) {
- SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId,
+ final DataTreeModification mod) {
+ SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort.Normal(this, mod, txId,
cohortRegistry.createCohort(schemaContext, txId, COMMIT_STEP_TIMEOUT));
pendingTransactions.add(new CommitEntry(cohort, ticker().read()));
return cohort;
}
@Override
- ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification modification) {
- return dataTree.createReadyCohort(txId, modification);
+ ShardDataTreeCohort createFailedCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+ final Exception failure) {
+ return dataTree.createFailedCohort(txId, mod, failure);
+ }
+
+ @Override
+ ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+ return dataTree.createReadyCohort(txId, mod);
}
}
abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
- abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
+ abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier txId, DataTreeModification mod);
+
+ abstract ShardDataTreeCohort createFailedCohort(TransactionIdentifier txId, DataTreeModification mod,
+ Exception failure);
}
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
-final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+abstract class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+ static final class DeadOnArrival extends SimpleShardDataTreeCohort {
+ private final Exception failure;
+
+ DeadOnArrival(final ShardDataTree dataTree, final DataTreeModification transaction,
+ final TransactionIdentifier transactionId, final Exception failure) {
+ super(dataTree, transaction, transactionId, null);
+ this.failure = Preconditions.checkNotNull(failure);
+ }
+
+ @Override
+ void throwCanCommitFailure() throws Exception {
+ throw failure;
+ }
+ }
+
+ static final class Normal extends SimpleShardDataTreeCohort {
+ Normal(final ShardDataTree dataTree, final DataTreeModification transaction,
+ final TransactionIdentifier transactionId, final CompositeDataTreeCohort userCohorts) {
+ super(dataTree, transaction, transactionId, Preconditions.checkNotNull(userCohorts));
+ }
+
+ @Override
+ void throwCanCommitFailure() {
+ // No-op
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
private final DataTreeModification transaction;
this.dataTree = Preconditions.checkNotNull(dataTree);
this.transaction = Preconditions.checkNotNull(transaction);
this.transactionId = Preconditions.checkNotNull(transactionId);
- this.userCohorts = Preconditions.checkNotNull(userCohorts);
+ this.userCohorts = userCohorts;
}
@Override
return ret;
}
- void setNewCandidate(DataTreeCandidateTip dataTreeCandidate) {
+ void setNewCandidate(final DataTreeCandidateTip dataTreeCandidate) {
checkState(State.PRE_COMMIT_COMPLETE);
this.candidate = Verify.verifyNotNull(dataTreeCandidate);
}
this.nextFailure = Preconditions.checkNotNull(cause);
}
+ /**
+ * If there is an initial failure, throw it so the caller can process it.
+ *
+ * @throws Exception reported failure.
+ */
+ abstract void throwCanCommitFailure() throws Exception;
+
@Override
public boolean isFailed() {
return state == State.FAILED || nextFailure != null;
return FrontendReadWriteTransaction.createReady(this, id, mod);
}
+ @Override
+ ShardDataTreeCohort createFailedCohort(final TransactionIdentifier id, final DataTreeModification mod,
+ final Exception failure) {
+ return tree.createFailedCohort(id, mod, failure);
+ }
+
@Override
ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) {
return tree.createReadyCohort(id, mod);
final TestProbe probe = createProbe();
final CursorAwareDataTreeModification modification = mock(CursorAwareDataTreeModification.class);
final CommitLocalTransactionRequest request =
- new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), modification, true);
+ new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), modification, null, true);
doAnswer(this::applyToCursorAnswer).when(modification).applyToCursor(any());
final ModifyTransactionRequest modifyRequest = testForwardToRemote(request, ModifyTransactionRequest.class);
verify(modification).applyToCursor(any());
final TestProbe probe = createProbe();
final DataTreeModification mod = mock(DataTreeModification.class);
final TransactionRequest<?> request =
- new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), mod, false);
+ new CommitLocalTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), mod, null, false);
testForwardToLocal(request, CommitLocalTransactionRequest.class);
}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
-import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@Test
public void testChainedTransactionFailureWithSingleShard() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
-
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
try (final AbstractDataStore dataStore = setupAbstractDataStore(
@Test
public void testChainedTransactionFailureWithMultipleShards() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
-
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
{
try (final AbstractDataStore dataStore = setupAbstractDataStore(
doNothing().when(mockUserCohorts).commit();
doReturn(Optional.empty()).when(mockUserCohorts).abort();
- cohort = new SimpleShardDataTreeCohort(mockShardDataTree, mockModification, nextTransactionId(),
+ cohort = new SimpleShardDataTreeCohort.Normal(mockShardDataTree, mockModification, nextTransactionId(),
mockUserCohorts);
}