BUG-8402: Record modification failures 76/57376/1
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 17 May 2017 14:56:57 +0000 (16:56 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 18 May 2017 15:33:57 +0000 (17:33 +0200)
When a modification fails to apply, we must record the resulting
failure, as we have partially applied the state and hence should
never attempt to try to do it again even if the client retransmits
the request.

Furthermore we should stop responding to any subsequent requests
including reads, as our responses are not accurate anyway (and the
requests may have been enqueued before the client saw the failure).

Enqueue the failure and respond to all subsequent requests with it,
forcing the transaction to fail the canCommit() phase.

Change-Id: I1d25f1b3a688e02f8a69f54f22a5d6d2dd43339c
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 63e6ab3f36e57954baf391855541cf3d42d38a0f)

opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java

index 413f4fbe534d99b2ce9735b6030c201d0e23e957..d71142201c2c25807e73fcd1e22128d16f5e7ca3 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.access.commands;
 
 import com.google.common.annotations.Beta;
 package org.opendaylight.controller.cluster.access.commands;
 
 import com.google.common.annotations.Beta;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeDataInput;
@@ -38,6 +39,11 @@ public abstract class TransactionModification {
         return path;
     }
 
         return path;
     }
 
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("path", path).toString();
+    }
+
     abstract byte getType();
 
     void writeTo(final NormalizedNodeDataOutput out) throws IOException {
     abstract byte getType();
 
     void writeTo(final NormalizedNodeDataOutput out) throws IOException {
index e5680c500c669453cefd99e5a5857d8411de2fe4..284acbab0b1c2ddfd27b371c093202a5bcb23fbc 100644 (file)
@@ -51,7 +51,7 @@ final class FrontendReadOnlyTransaction extends FrontendTransaction {
 
     // Sequence has already been checked
     @Override
 
     // Sequence has already been checked
     @Override
-    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+    @Nullable TransactionSuccess<?> doHandleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
             final long now) throws RequestException {
         if (request instanceof ExistsTransactionRequest) {
             return handleExistsTransaction((ExistsTransactionRequest) request);
             final long now) throws RequestException {
         if (request instanceof ExistsTransactionRequest) {
             return handleExistsTransaction((ExistsTransactionRequest) request);
index 9f1df4c8ab31b45fdea9d47bf7c11ddc20c25331..45edfc9333d9530b347e735756e48fc81c678b08 100644 (file)
@@ -84,7 +84,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     // Sequence has already been checked
     @Override
 
     // Sequence has already been checked
     @Override
-    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+    @Nullable TransactionSuccess<?> doHandleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
             final long now) throws RequestException {
         if (request instanceof ModifyTransactionRequest) {
             return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
             final long now) throws RequestException {
         if (request instanceof ModifyTransactionRequest) {
             return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
@@ -297,7 +297,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 } else if (m instanceof TransactionMerge) {
                     modification.merge(m.getPath(), ((TransactionMerge) m).getData());
                 } else {
                 } else if (m instanceof TransactionMerge) {
                     modification.merge(m.getPath(), ((TransactionMerge) m).getData());
                 } else {
-                    LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+                    LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m);
                 }
             }
         }
                 }
             }
         }
@@ -324,7 +324,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 coordinatedCommit(envelope, now);
                 return null;
             default:
                 coordinatedCommit(envelope, now);
                 return null;
             default:
-                LOG.warn("{}: rejecting unsupported protocol {}", history().persistenceId(), maybeProto.get());
+                LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get());
                 throw new UnsupportedRequestException(request);
         }
     }
                 throw new UnsupportedRequestException(request);
         }
     }
@@ -334,7 +334,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         // only once.
         if (readyCohort == null) {
             readyCohort = openTransaction.ready();
         // only once.
         if (readyCohort == null) {
             readyCohort = openTransaction.ready();
-            LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier());
+            LOG.debug("{}: transitioned {} to ready", persistenceId(), openTransaction.getIdentifier());
             openTransaction = null;
         }
     }
             openTransaction = null;
         }
     }
index 8846467b59d2d924f783014215d947551a498f21..c0249fd00089d1bd48abe9793ddb8564fb937a94 100644 (file)
@@ -23,6 +23,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Frontend common transaction state as observed by the shard leader.
 
 /**
  * Frontend common transaction state as observed by the shard leader.
@@ -31,6 +33,8 @@ import org.opendaylight.yangtools.concepts.Identifiable;
  */
 @NotThreadSafe
 abstract class FrontendTransaction implements Identifiable<TransactionIdentifier> {
  */
 @NotThreadSafe
 abstract class FrontendTransaction implements Identifiable<TransactionIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class);
+
     private final AbstractFrontendHistory history;
     private final TransactionIdentifier id;
 
     private final AbstractFrontendHistory history;
     private final TransactionIdentifier id;
 
@@ -44,6 +48,8 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
     private Long lastPurgedSequence;
     private long expectedSequence;
 
     private Long lastPurgedSequence;
     private long expectedSequence;
 
+    private RequestException previousFailure;
+
     FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id) {
         this.history = Preconditions.checkNotNull(history);
         this.id = Preconditions.checkNotNull(id);
     FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id) {
         this.history = Preconditions.checkNotNull(history);
         this.id = Preconditions.checkNotNull(id);
@@ -58,6 +64,10 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
         return history;
     }
 
         return history;
     }
 
+    final String persistenceId() {
+        return history().persistenceId();
+    }
+
     final java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
         // Fast path check: if the requested sequence is the next request, bail early
         if (expectedSequence == sequence) {
     final java.util.Optional<TransactionSuccess<?>> replaySequence(final long sequence) throws RequestException {
         // Fast path check: if the requested sequence is the next request, bail early
         if (expectedSequence == sequence) {
@@ -108,9 +118,30 @@ abstract class FrontendTransaction implements Identifiable<TransactionIdentifier
         lastPurgedSequence = sequence;
     }
 
         lastPurgedSequence = sequence;
     }
 
-    // Sequence has already been checked
-    abstract @Nullable TransactionSuccess<?> handleRequest(TransactionRequest<?> request,
-            RequestEnvelope envelope, long now) throws RequestException;
+    // Request order has already been checked by caller and replaySequence()
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    final @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        if (previousFailure != null) {
+            LOG.debug("{}: Rejecting request {} due to previous failure", persistenceId(), request, previousFailure);
+            throw previousFailure;
+        }
+
+        try {
+            return doHandleRequest(request, envelope, now);
+        } catch (RuntimeException e) {
+            /*
+             * The request failed to process, we should not attempt to ever apply it again. Furthermore we cannot
+             * accept any further requests from this connection, simply because the transaction state is undefined.
+             */
+            LOG.debug("{}: Request {} failed to process", persistenceId(), request, e);
+            previousFailure = new RuntimeRequestException("Request " + request + " failed to process", e);
+            throw previousFailure;
+        }
+    }
+
+    abstract @Nullable TransactionSuccess<?> doHandleRequest(TransactionRequest<?> request, RequestEnvelope envelope,
+            long now) throws RequestException;
 
     private void recordResponse(final long sequence, final Object response) {
         if (replayQueue.isEmpty()) {
 
     private void recordResponse(final long sequence, final Object response) {
         if (replayQueue.isEmpty()) {