BUG-8507: Fix replayed directCommit() on reconnect
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendReadWriteTransaction.java
index 45edfc9..6cacb32 100644 (file)
@@ -54,22 +54,106 @@ import org.slf4j.LoggerFactory;
  */
 @NotThreadSafe
 final class FrontendReadWriteTransaction extends FrontendTransaction {
+    private enum CommitStage {
+        READY,
+        CAN_COMMIT_PENDING,
+        CAN_COMMIT_COMPLETE,
+        PRE_COMMIT_PENDING,
+        PRE_COMMIT_COMPLETE,
+        COMMIT_PENDING,
+    }
+
+    private abstract static class State {
+        @Override
+        public abstract String toString();
+    }
+
+    private static final class Failed extends State {
+        final RequestException cause;
+
+        Failed(final RequestException cause) {
+            this.cause = Preconditions.checkNotNull(cause);
+        }
+
+        @Override
+        public String toString() {
+            return "FAILED (" + cause.getMessage() + ")";
+        }
+    }
+
+    private static final class Open extends State {
+        final ReadWriteShardDataTreeTransaction openTransaction;
+
+        Open(final ReadWriteShardDataTreeTransaction openTransaction) {
+            this.openTransaction = Preconditions.checkNotNull(openTransaction);
+        }
+
+        @Override
+        public String toString() {
+            return "OPEN";
+        }
+    }
+
+    private static final class Ready extends State {
+        final ShardDataTreeCohort readyCohort;
+        CommitStage stage;
+
+        Ready(final ShardDataTreeCohort readyCohort) {
+            this.readyCohort = Preconditions.checkNotNull(readyCohort);
+            this.stage = CommitStage.READY;
+        }
+
+        @Override
+        public String toString() {
+            return "READY (" + stage + ")";
+        }
+    }
+
+    private static final class Sealed extends State {
+        final DataTreeModification sealedModification;
+
+        Sealed(final DataTreeModification sealedModification) {
+            this.sealedModification = Preconditions.checkNotNull(sealedModification);
+        }
+
+        @Override
+        public String toString() {
+            return "SEALED";
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
+    private static final State ABORTED = new State() {
+        @Override
+        public String toString() {
+            return "ABORTED";
+        }
+    };
+    private static final State ABORTING = new State() {
+        @Override
+        public String toString() {
+            return "ABORTING";
+        }
+    };
+    private static final State COMMITTED = new State() {
+        @Override
+        public String toString() {
+            return "COMMITTED";
+        }
+    };
 
-    private ReadWriteShardDataTreeTransaction openTransaction;
-    private DataTreeModification sealedModification;
-    private ShardDataTreeCohort readyCohort;
+    private State state;
 
     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
             final ReadWriteShardDataTreeTransaction transaction) {
         super(history, id);
-        this.openTransaction = Preconditions.checkNotNull(transaction);
+        this.state = new Open(transaction);
     }
 
     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
             final DataTreeModification mod) {
         super(history, id);
-        this.sealedModification = Preconditions.checkNotNull(mod);
+        this.state = new Sealed(mod);
     }
 
     static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
@@ -102,8 +186,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionAbortRequest) {
-            handleTransactionAbort(request.getSequence(), envelope, now);
-            return null;
+            return handleTransactionAbort(request.getSequence(), envelope, now);
         } else if (request instanceof AbortLocalTransactionRequest) {
             handleLocalTransactionAbort(request.getSequence(), envelope, now);
             return null;
@@ -115,101 +198,211 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
-        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
-            @Override
-            public void onSuccess(final DataTreeCandidate result) {
-                recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
-                    request.getSequence()));
-            }
+        throwIfFailed();
+
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case PRE_COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} is already preCommitting", persistenceId(), getIdentifier());
+                break;
+            case CAN_COMMIT_COMPLETE:
+                ready.stage = CommitStage.PRE_COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating preCommit", persistenceId(), getIdentifier());
+                ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+                    @Override
+                    public void onSuccess(final DataTreeCandidate result) {
+                        LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
+                        recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(getIdentifier(),
+                            request.getSequence()));
+                        ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("Precommit failed", failure));
+                    }
+                });
+                break;
+            case CAN_COMMIT_PENDING:
+            case COMMIT_PENDING:
+            case PRE_COMMIT_COMPLETE:
+            case READY:
+                throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage);
+            default:
+                throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+        }
+    }
 
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
-                readyCohort = null;
-            }
-        });
+    private void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
+        recordAndSendFailure(envelope, now, cause);
+        state = new Failed(cause);
+        LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
     }
 
     private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
             final long now) throws RequestException {
-        readyCohort.commit(new FutureCallback<UnsignedLong>() {
-            @Override
-            public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope, now);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
-                readyCohort = null;
-            }
-        });
+        throwIfFailed();
+
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} is already committing", persistenceId(), getIdentifier());
+                break;
+            case PRE_COMMIT_COMPLETE:
+                ready.stage = CommitStage.COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating commit", persistenceId(), getIdentifier());
+                ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
+                    @Override
+                    public void onSuccess(final UnsignedLong result) {
+                        successfulCommit(envelope, now);
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("Commit failed", failure));
+                    }
+                });
+                break;
+            case CAN_COMMIT_COMPLETE:
+            case CAN_COMMIT_PENDING:
+            case PRE_COMMIT_PENDING:
+            case READY:
+                throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage);
+            default:
+                throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+        }
     }
 
     private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
-        Preconditions.checkState(readyCohort == null, "Transaction {} encountered local abort with commit underway",
-                getIdentifier());
-        openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+        checkOpen().abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
             sequence)));
     }
 
-    private void handleTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
-        if (readyCohort == null) {
-            openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
-                sequence)));
-            return;
+    private void startAbort() {
+        state = ABORTING;
+        LOG.debug("{}: Transaction {} aborting", persistenceId(), getIdentifier());
+    }
+
+    private void finishAbort() {
+        state = ABORTED;
+        LOG.debug("{}: Transaction {} aborted", persistenceId(), getIdentifier());
+    }
+
+    private TransactionAbortSuccess handleTransactionAbort(final long sequence, final RequestEnvelope envelope,
+            final long now) {
+        if (state instanceof Open) {
+            final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
+            startAbort();
+            openTransaction.abort(() -> {
+                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+                    sequence));
+                finishAbort();
+            });
+            return null;
+        }
+        if (ABORTING.equals(state)) {
+            LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
+            return null;
+        }
+        if (ABORTED.equals(state)) {
+            // We should have recorded the reply
+            LOG.warn("{}: Transaction {} already aborted", persistenceId(), getIdentifier());
+            return new TransactionAbortSuccess(getIdentifier(), sequence);
         }
 
-        readyCohort.abort(new FutureCallback<Void>() {
+        final Ready ready = checkReady();
+        startAbort();
+        ready.readyCohort.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                readyCohort = null;
                 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
-                LOG.debug("Transaction {} aborted", getIdentifier());
+                finishAbort();
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                readyCohort = null;
-                LOG.warn("Transaction {} abort failed", getIdentifier(), failure);
                 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
+                LOG.warn("{}: Transaction {} abort failed", persistenceId(), getIdentifier(), failure);
+                finishAbort();
             }
         });
+        return null;
     }
 
-    private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
-        readyCohort.canCommit(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
-                    envelope.getMessage().getSequence()));
-            }
+    private void coordinatedCommit(final RequestEnvelope envelope, final long now) throws RequestException {
+        throwIfFailed();
 
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
-                readyCohort = null;
-            }
-        });
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case CAN_COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} is already canCommitting", persistenceId(), getIdentifier());
+                break;
+            case READY:
+                ready.stage = CommitStage.CAN_COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating canCommit", persistenceId(), getIdentifier());
+                checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(final Void result) {
+                        recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(getIdentifier(),
+                            envelope.getMessage().getSequence()));
+                        ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
+                        LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+                    }
+                });
+                break;
+            case CAN_COMMIT_COMPLETE:
+            case COMMIT_PENDING:
+            case PRE_COMMIT_COMPLETE:
+            case PRE_COMMIT_PENDING:
+                throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage);
+            default:
+                throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+        }
     }
 
-    private void directCommit(final RequestEnvelope envelope, final long now) {
-        readyCohort.canCommit(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                successfulDirectCanCommit(envelope, now);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
-                readyCohort = null;
-            }
-        });
+    private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
+        throwIfFailed();
+
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case CAN_COMMIT_COMPLETE:
+            case CAN_COMMIT_PENDING:
+            case COMMIT_PENDING:
+            case PRE_COMMIT_COMPLETE:
+            case PRE_COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} in state {}, not initiating direct commit for {}", persistenceId(),
+                    getIdentifier(), state, envelope);
+                break;
+            case READY:
+                ready.stage = CommitStage.CAN_COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating direct canCommit", persistenceId(), getIdentifier());
+                ready.readyCohort.canCommit(new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(final Void result) {
+                        successfulDirectCanCommit(envelope, now);
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+                    }
+                });
+                break;
+            default:
+                throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+        }
     }
 
     void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
-        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+        final Ready ready = checkReady();
+        ready.stage = CommitStage.PRE_COMMIT_PENDING;
+        LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
+        ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
                 successfulDirectPreCommit(envelope, startTime);
@@ -217,14 +410,16 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
-                readyCohort = null;
+                failTransaction(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
             }
         });
     }
 
     void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
-        readyCohort.commit(new FutureCallback<UnsignedLong>() {
+        final Ready ready = checkReady();
+        ready.stage = CommitStage.COMMIT_PENDING;
+        LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
+        ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
             @Override
             public void onSuccess(final UnsignedLong result) {
                 successfulCommit(envelope, startTime);
@@ -232,20 +427,20 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
-                readyCohort = null;
+                failTransaction(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
             }
         });
     }
 
     void successfulCommit(final RequestEnvelope envelope, final long startTime) {
-        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
             envelope.getMessage().getSequence()));
-        readyCohort = null;
+        state = COMMITTED;
     }
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
+        final DataTreeModification sealedModification = checkSealed();
         if (!sealedModification.equals(request.getModification())) {
             LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
             throw new UnsupportedRequestException(request);
@@ -253,9 +448,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
         final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
         if (optFailure.isPresent()) {
-            readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get());
+            state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()));
         } else {
-            readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+            state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification));
         }
 
         if (request.isCoordinated()) {
@@ -267,14 +462,14 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
             throws RequestException {
-        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
         return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
             data.isPresent()));
     }
 
     private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
             throws RequestException {
-        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
         return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
             data));
     }
@@ -283,13 +478,10 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
     }
 
-    private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-
-        final Collection<TransactionModification> mods = request.getModifications();
-        if (!mods.isEmpty()) {
-            final DataTreeModification modification = openTransaction.getSnapshot();
-            for (TransactionModification m : mods) {
+    private void applyModifications(final Collection<TransactionModification> modifications) {
+        if (!modifications.isEmpty()) {
+            final DataTreeModification modification = checkOpen().getSnapshot();
+            for (TransactionModification m : modifications) {
                 if (m instanceof TransactionDelete) {
                     modification.delete(m.getPath());
                 } else if (m instanceof TransactionWrite) {
@@ -301,26 +493,41 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 }
             }
         }
+    }
 
+    private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        // We need to examine the persistence protocol first to see if this is an idempotent request. If there is no
+        // protocol, there is nothing for us to do.
         final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
         if (!maybeProto.isPresent()) {
+            applyModifications(request.getModifications());
             return replyModifySuccess(request.getSequence());
         }
 
         switch (maybeProto.get()) {
             case ABORT:
-                openTransaction.abort(() -> replyModifySuccess(request.getSequence()));
-                openTransaction = null;
+                if (ABORTING.equals(state)) {
+                    LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
+                    return null;
+                }
+                final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
+                startAbort();
+                openTransaction.abort(() -> {
+                    recordAndSendSuccess(envelope, now, new ModifyTransactionSuccess(getIdentifier(),
+                        request.getSequence()));
+                    finishAbort();
+                });
                 return null;
             case READY:
-                ensureReady();
+                ensureReady(request.getModifications());
                 return replyModifySuccess(request.getSequence());
             case SIMPLE:
-                ensureReady();
+                ensureReady(request.getModifications());
                 directCommit(envelope, now);
                 return null;
             case THREE_PHASE:
-                ensureReady();
+                ensureReady(request.getModifications());
                 coordinatedCommit(envelope, now);
                 return null;
             default:
@@ -329,13 +536,41 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
     }
 
-    private void ensureReady() {
+    private void ensureReady(final Collection<TransactionModification> modifications) {
         // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
         // only once.
-        if (readyCohort == null) {
-            readyCohort = openTransaction.ready();
-            LOG.debug("{}: transitioned {} to ready", persistenceId(), openTransaction.getIdentifier());
-            openTransaction = null;
+        if (state instanceof Ready) {
+            LOG.debug("{}: {} is already in state {}", persistenceId(), getIdentifier(), state);
+            return;
+        }
+
+        applyModifications(modifications);
+        state = new Ready(checkOpen().ready());
+        LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier());
+    }
+
+    private void throwIfFailed() throws RequestException {
+        if (state instanceof Failed) {
+            LOG.debug("{}: {} has failed, rejecting request", persistenceId(), getIdentifier());
+            throw ((Failed) state).cause;
         }
     }
+
+    private ReadWriteShardDataTreeTransaction checkOpen() {
+        Preconditions.checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(),
+            state);
+        return ((Open) state).openTransaction;
+    }
+
+    private Ready checkReady() {
+        Preconditions.checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(),
+            state);
+        return (Ready) state;
+    }
+
+    private DataTreeModification checkSealed() {
+        Preconditions.checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(),
+            state);
+        return ((Sealed) state).sealedModification;
+    }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.