X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FLocalReadWriteProxyTransaction.java;h=a41bef9c9e992518c672c418da82769ea6cfdfc9;hp=db24e3c73c867020ab46d210ce52d1b633cf2715;hb=b8e58ad33db2a15162d548ba13f241409ffaadd9;hpb=d6ed0a044d591d65847714451d97d80345154089 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java index db24e3c73c..a41bef9c9e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -10,9 +10,9 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import java.util.function.Consumer; +import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; @@ -57,9 +57,27 @@ import org.slf4j.LoggerFactory; final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalReadWriteProxyTransaction.class); - private CursorAwareDataTreeModification modification; + /** + * This field needs to be accessed via {@link #getModification()}, which performs state checking to ensure + * the modification can actually be accessed. + */ + private final CursorAwareDataTreeModification modification; + + private Supplier extends RuntimeException> closedException; + private CursorAwareDataTreeModification sealedModification; + /** + * Recorded failure from previous operations. Normally we would want to propagate the error directly to the + * offending call site, but that exposes inconsistency in behavior during initial connection, when we go through + * {@link RemoteProxyTransaction}, which detects this sort of issues at canCommit/directCommit time on the backend. + * + *
+ * We therefore do not report incurred exceptions directly, but report them once the user attempts to commit
+ * this transaction.
+ */
+ private Exception recordedFailure;
+
LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
final DataTreeSnapshot snapshot) {
super(parent, identifier);
@@ -73,22 +91,61 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
@Override
CursorAwareDataTreeSnapshot readOnlyView() {
- return modification;
+ return getModification();
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
void doDelete(final YangInstanceIdentifier path) {
- modification.delete(path);
+ final CursorAwareDataTreeModification mod = getModification();
+ if (recordedFailure != null) {
+ LOG.debug("Transaction {} recorded failure, ignoring delete of {}", getIdentifier(), path);
+ return;
+ }
+
+ try {
+ mod.delete(path);
+ } catch (Exception e) {
+ LOG.debug("Transaction {} delete on {} incurred failure, delaying it until commit", getIdentifier(), path,
+ e);
+ recordedFailure = e;
+ }
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
void doMerge(final YangInstanceIdentifier path, final NormalizedNode, ?> data) {
- modification.merge(path, data);
+ final CursorAwareDataTreeModification mod = getModification();
+ if (recordedFailure != null) {
+ LOG.debug("Transaction {} recorded failure, ignoring merge to {}", getIdentifier(), path);
+ return;
+ }
+
+ try {
+ mod.merge(path, data);
+ } catch (Exception e) {
+ LOG.debug("Transaction {} merge to {} incurred failure, delaying it until commit", getIdentifier(), path,
+ e);
+ recordedFailure = e;
+ }
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
void doWrite(final YangInstanceIdentifier path, final NormalizedNode, ?> data) {
- modification.write(path, data);
+ final CursorAwareDataTreeModification mod = getModification();
+ if (recordedFailure != null) {
+ LOG.debug("Transaction {} recorded failure, ignoring write to {}", getIdentifier(), path);
+ return;
+ }
+
+ try {
+ mod.write(path, data);
+ } catch (Exception e) {
+ LOG.debug("Transaction {} write to {} incurred failure, delaying it until commit", getIdentifier(), path,
+ e);
+ recordedFailure = e;
+ }
}
private RuntimeException abortedException() {
@@ -101,16 +158,19 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
@Override
CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
+ final CursorAwareDataTreeModification mod = getModification();
final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(),
- localActor(), modification, coordinated);
- modification = new FailedDataTreeModification(this::submittedException);
+ localActor(), mod, recordedFailure, coordinated);
+ closedException = this::submittedException;
return ret;
}
@Override
void doSeal() {
- modification.ready();
- sealedModification = modification;
+ Preconditions.checkState(sealedModification == null, "Transaction %s is already sealed", getIdentifier());
+ final CursorAwareDataTreeModification mod = getModification();
+ mod.ready();
+ sealedModification = mod;
}
@Override
@@ -141,13 +201,13 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
@Override
void applyModifyTransactionRequest(final ModifyTransactionRequest request,
final @Nullable Consumer