BUG-8402: Record modification failures
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendReadWriteTransaction.java
index 9c4d4ddbca9f1cd1bdd81bb3654fc76d156b7fee..45edfc9333d9530b347e735756e48fc81c678b08 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import java.util.Collection;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
@@ -83,7 +84,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     // Sequence has already been checked
     @Override
-    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+    @Nullable TransactionSuccess<?> doHandleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
             final long now) throws RequestException {
         if (request instanceof ModifyTransactionRequest) {
             return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
@@ -101,18 +102,17 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionAbortRequest) {
-            handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+            handleTransactionAbort(request.getSequence(), envelope, now);
+            return null;
+        } else if (request instanceof AbortLocalTransactionRequest) {
+            handleLocalTransactionAbort(request.getSequence(), envelope, now);
             return null;
         } else {
+            LOG.warn("Rejecting unsupported request {}", request);
             throw new UnsupportedRequestException(request);
         }
     }
 
-    @Override
-    void purge(final Runnable callback) {
-        openTransaction.purge(callback);
-    }
-
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@@ -146,11 +146,17 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         });
     }
 
-    private void handleTransactionAbort(final TransactionAbortRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
+    private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
+        Preconditions.checkState(readyCohort == null, "Transaction {} encountered local abort with commit underway",
+                getIdentifier());
+        openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+            sequence)));
+    }
+
+    private void handleTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
         if (readyCohort == null) {
-            openTransaction.abort(() -> recordAndSendSuccess(envelope, now,
-                new TransactionAbortSuccess(getIdentifier(), request.getSequence())));
+            openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+                sequence)));
             return;
         }
 
@@ -158,8 +164,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
             @Override
             public void onSuccess(final Void result) {
                 readyCohort = null;
-                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
-                    request.getSequence()));
+                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
                 LOG.debug("Transaction {} aborted", getIdentifier());
             }
 
@@ -203,7 +208,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         });
     }
 
-    private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
+    void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
@@ -218,9 +223,8 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         });
     }
 
-    private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
+    void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
         readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
             @Override
             public void onSuccess(final UnsignedLong result) {
                 successfulCommit(envelope, startTime);
@@ -234,7 +238,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         });
     }
 
-    private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+    void successfulCommit(final RequestEnvelope envelope, final long startTime) {
         recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
             envelope.getMessage().getSequence()));
         readyCohort = null;
@@ -242,16 +246,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
-        if (sealedModification.equals(request.getModification())) {
+        if (!sealedModification.equals(request.getModification())) {
+            LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
+            throw new UnsupportedRequestException(request);
+        }
+
+        final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
+        if (optFailure.isPresent()) {
+            readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get());
+        } else {
             readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+        }
 
-            if (request.isCoordinated()) {
-                coordinatedCommit(envelope, now);
-            } else {
-                directCommit(envelope, now);
-            }
+        if (request.isCoordinated()) {
+            coordinatedCommit(envelope, now);
         } else {
-            throw new UnsupportedRequestException(request);
+            directCommit(envelope, now);
         }
     }
 
@@ -287,7 +297,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 } else if (m instanceof TransactionMerge) {
                     modification.merge(m.getPath(), ((TransactionMerge) m).getData());
                 } else {
-                    LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+                    LOG.warn("{}: ignoring unhandled modification {}", persistenceId(), m);
                 }
             }
         }
@@ -314,6 +324,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 coordinatedCommit(envelope, now);
                 return null;
             default:
+                LOG.warn("{}: rejecting unsupported protocol {}", persistenceId(), maybeProto.get());
                 throw new UnsupportedRequestException(request);
         }
     }
@@ -323,7 +334,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         // only once.
         if (readyCohort == null) {
             readyCohort = openTransaction.ready();
-            LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier());
+            LOG.debug("{}: transitioned {} to ready", persistenceId(), openTransaction.getIdentifier());
             openTransaction = null;
         }
     }