Relax visibility on FrontendReadWriteTransaction methods
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendReadWriteTransaction.java
index 79d555c85695d7b437bf4831036ca6a8b9070f9b..d80488ca2207ec1e2342cb44029ec90e1e9eaab6 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
+import java.util.Collection;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
@@ -100,7 +101,8 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionAbortRequest) {
-            return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+            handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+            return null;
         } else {
             throw new UnsupportedRequestException(request);
         }
@@ -139,11 +141,12 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         });
     }
 
-    private TransactionSuccess<?> handleTransactionAbort(final TransactionAbortRequest request,
+    private void handleTransactionAbort(final TransactionAbortRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
         if (readyCohort == null) {
-            openTransaction.abort();
-            return new TransactionAbortSuccess(getIdentifier(), request.getSequence());
+            openTransaction.abort(() -> recordAndSendSuccess(envelope, now,
+                new TransactionAbortSuccess(getIdentifier(), request.getSequence())));
+            return;
         }
 
         readyCohort.abort(new FutureCallback<Void>() {
@@ -162,7 +165,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
             }
         });
-        return null;
     }
 
     private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
@@ -194,10 +196,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 readyCohort = null;
             }
         });
-
     }
 
-    private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
+    void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
@@ -212,9 +213,8 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         });
     }
 
-    private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
+    void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
         readyCohort.commit(new FutureCallback<UnsignedLong>() {
-
             @Override
             public void onSuccess(final UnsignedLong result) {
                 successfulCommit(envelope, startTime);
@@ -228,7 +228,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         });
     }
 
-    private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+    void successfulCommit(final RequestEnvelope envelope, final long startTime) {
         recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
             envelope.getMessage().getSequence()));
         readyCohort = null;
@@ -236,16 +236,22 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
-        if (sealedModification.equals(request.getModification())) {
+        if (!sealedModification.equals(request.getModification())) {
+            LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
+            throw new UnsupportedRequestException(request);
+        }
+
+        final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
+        if (optFailure.isPresent()) {
+            readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get());
+        } else {
             readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+        }
 
-            if (request.isCoordinated()) {
-                coordinatedCommit(envelope, now);
-            } else {
-                directCommit(envelope, now);
-            }
+        if (request.isCoordinated()) {
+            coordinatedCommit(envelope, now);
         } else {
-            throw new UnsupportedRequestException(request);
+            directCommit(envelope, now);
         }
     }
 
@@ -270,16 +276,19 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
     private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
 
-        final DataTreeModification modification = openTransaction.getSnapshot();
-        for (TransactionModification m : request.getModifications()) {
-            if (m instanceof TransactionDelete) {
-                modification.delete(m.getPath());
-            } else if (m instanceof TransactionWrite) {
-                modification.write(m.getPath(), ((TransactionWrite) m).getData());
-            } else if (m instanceof TransactionMerge) {
-                modification.merge(m.getPath(), ((TransactionMerge) m).getData());
-            } else {
-                LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+        final Collection<TransactionModification> mods = request.getModifications();
+        if (!mods.isEmpty()) {
+            final DataTreeModification modification = openTransaction.getSnapshot();
+            for (TransactionModification m : mods) {
+                if (m instanceof TransactionDelete) {
+                    modification.delete(m.getPath());
+                } else if (m instanceof TransactionWrite) {
+                    modification.write(m.getPath(), ((TransactionWrite) m).getData());
+                } else if (m instanceof TransactionMerge) {
+                    modification.merge(m.getPath(), ((TransactionMerge) m).getData());
+                } else {
+                    LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+                }
             }
         }
 
@@ -290,21 +299,32 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
         switch (maybeProto.get()) {
             case ABORT:
-                openTransaction.abort();
+                openTransaction.abort(() -> replyModifySuccess(request.getSequence()));
                 openTransaction = null;
+                return null;
+            case READY:
+                ensureReady();
                 return replyModifySuccess(request.getSequence());
             case SIMPLE:
-                readyCohort = openTransaction.ready();
-                openTransaction = null;
+                ensureReady();
                 directCommit(envelope, now);
                 return null;
             case THREE_PHASE:
-                readyCohort = openTransaction.ready();
-                openTransaction = null;
+                ensureReady();
                 coordinatedCommit(envelope, now);
                 return null;
             default:
                 throw new UnsupportedRequestException(request);
         }
     }
+
+    private void ensureReady() {
+        // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
+        // only once.
+        if (readyCohort == null) {
+            readyCohort = openTransaction.ready();
+            LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier());
+            openTransaction = null;
+        }
+    }
 }