BUG-8507: Fix replayed directCommit() on reconnect 71/57571/1
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 18 May 2017 21:24:15 +0000 (23:24 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Fri, 19 May 2017 18:17:09 +0000 (20:17 +0200)
After remote shard reconnect of a brief isolation, we have observed
a NPE indicating that we encounter a NPE when faced with a direct
commit.

Assuming state engine correctness, this can happen during the time
when we have completed preCommit and before we have recorded the
request result (i.e. after commit completes).

At any rate, this flushes out the need for transaction transitions
to be idempotent, which is something ShardDataTreeTransaction and
ShardDataTreeCohort do not provide.

Encapsulate FrontendReadWriteTransaction state into distinct state
objects. This allows us to accurately track the internal transaction
state and detect when a canCommit, directCommit, preCommit and
doCommit are no-ops because the request is being already handled.

Change-Id: Ib533ec9a4882f51f7914c5b11865ac093c6d6ad0
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 59fab9a9bc6dbf9ad538b3df4460eff146c63ce2)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransactionTest.java [new file with mode: 0644]

index 45edfc9333d9530b347e735756e48fc81c678b08..6cacb325a03fc441f99496d07552cd58cfe867df 100644 (file)
@@ -54,22 +54,106 @@ import org.slf4j.LoggerFactory;
  */
 @NotThreadSafe
 final class FrontendReadWriteTransaction extends FrontendTransaction {
  */
 @NotThreadSafe
 final class FrontendReadWriteTransaction extends FrontendTransaction {
+    private enum CommitStage {
+        READY,
+        CAN_COMMIT_PENDING,
+        CAN_COMMIT_COMPLETE,
+        PRE_COMMIT_PENDING,
+        PRE_COMMIT_COMPLETE,
+        COMMIT_PENDING,
+    }
+
+    private abstract static class State {
+        @Override
+        public abstract String toString();
+    }
+
+    private static final class Failed extends State {
+        final RequestException cause;
+
+        Failed(final RequestException cause) {
+            this.cause = Preconditions.checkNotNull(cause);
+        }
+
+        @Override
+        public String toString() {
+            return "FAILED (" + cause.getMessage() + ")";
+        }
+    }
+
+    private static final class Open extends State {
+        final ReadWriteShardDataTreeTransaction openTransaction;
+
+        Open(final ReadWriteShardDataTreeTransaction openTransaction) {
+            this.openTransaction = Preconditions.checkNotNull(openTransaction);
+        }
+
+        @Override
+        public String toString() {
+            return "OPEN";
+        }
+    }
+
+    private static final class Ready extends State {
+        final ShardDataTreeCohort readyCohort;
+        CommitStage stage;
+
+        Ready(final ShardDataTreeCohort readyCohort) {
+            this.readyCohort = Preconditions.checkNotNull(readyCohort);
+            this.stage = CommitStage.READY;
+        }
+
+        @Override
+        public String toString() {
+            return "READY (" + stage + ")";
+        }
+    }
+
+    private static final class Sealed extends State {
+        final DataTreeModification sealedModification;
+
+        Sealed(final DataTreeModification sealedModification) {
+            this.sealedModification = Preconditions.checkNotNull(sealedModification);
+        }
+
+        @Override
+        public String toString() {
+            return "SEALED";
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
     private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class);
+    private static final State ABORTED = new State() {
+        @Override
+        public String toString() {
+            return "ABORTED";
+        }
+    };
+    private static final State ABORTING = new State() {
+        @Override
+        public String toString() {
+            return "ABORTING";
+        }
+    };
+    private static final State COMMITTED = new State() {
+        @Override
+        public String toString() {
+            return "COMMITTED";
+        }
+    };
 
 
-    private ReadWriteShardDataTreeTransaction openTransaction;
-    private DataTreeModification sealedModification;
-    private ShardDataTreeCohort readyCohort;
+    private State state;
 
     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
             final ReadWriteShardDataTreeTransaction transaction) {
         super(history, id);
 
     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
             final ReadWriteShardDataTreeTransaction transaction) {
         super(history, id);
-        this.openTransaction = Preconditions.checkNotNull(transaction);
+        this.state = new Open(transaction);
     }
 
     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
             final DataTreeModification mod) {
         super(history, id);
     }
 
     private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id,
             final DataTreeModification mod) {
         super(history, id);
-        this.sealedModification = Preconditions.checkNotNull(mod);
+        this.state = new Sealed(mod);
     }
 
     static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
     }
 
     static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history,
@@ -102,8 +186,7 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionAbortRequest) {
             handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionAbortRequest) {
-            handleTransactionAbort(request.getSequence(), envelope, now);
-            return null;
+            return handleTransactionAbort(request.getSequence(), envelope, now);
         } else if (request instanceof AbortLocalTransactionRequest) {
             handleLocalTransactionAbort(request.getSequence(), envelope, now);
             return null;
         } else if (request instanceof AbortLocalTransactionRequest) {
             handleLocalTransactionAbort(request.getSequence(), envelope, now);
             return null;
@@ -115,101 +198,211 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
 
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
-        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
-            @Override
-            public void onSuccess(final DataTreeCandidate result) {
-                recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
-                    request.getSequence()));
-            }
+        throwIfFailed();
+
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case PRE_COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} is already preCommitting", persistenceId(), getIdentifier());
+                break;
+            case CAN_COMMIT_COMPLETE:
+                ready.stage = CommitStage.PRE_COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating preCommit", persistenceId(), getIdentifier());
+                ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+                    @Override
+                    public void onSuccess(final DataTreeCandidate result) {
+                        LOG.debug("{}: Transaction {} completed preCommit", persistenceId(), getIdentifier());
+                        recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(getIdentifier(),
+                            request.getSequence()));
+                        ready.stage = CommitStage.PRE_COMMIT_COMPLETE;
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("Precommit failed", failure));
+                    }
+                });
+                break;
+            case CAN_COMMIT_PENDING:
+            case COMMIT_PENDING:
+            case PRE_COMMIT_COMPLETE:
+            case READY:
+                throw new IllegalStateException("Attempted to preCommit in stage " + ready.stage);
+            default:
+                throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+        }
+    }
 
 
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
-                readyCohort = null;
-            }
-        });
+    private void failTransaction(final RequestEnvelope envelope, final long now, final RuntimeRequestException cause) {
+        recordAndSendFailure(envelope, now, cause);
+        state = new Failed(cause);
+        LOG.debug("{}: Transaction {} failed", persistenceId(), getIdentifier(), cause);
     }
 
     private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
             final long now) throws RequestException {
     }
 
     private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
             final long now) throws RequestException {
-        readyCohort.commit(new FutureCallback<UnsignedLong>() {
-            @Override
-            public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope, now);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
-                readyCohort = null;
-            }
-        });
+        throwIfFailed();
+
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} is already committing", persistenceId(), getIdentifier());
+                break;
+            case PRE_COMMIT_COMPLETE:
+                ready.stage = CommitStage.COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating commit", persistenceId(), getIdentifier());
+                ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
+                    @Override
+                    public void onSuccess(final UnsignedLong result) {
+                        successfulCommit(envelope, now);
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("Commit failed", failure));
+                    }
+                });
+                break;
+            case CAN_COMMIT_COMPLETE:
+            case CAN_COMMIT_PENDING:
+            case PRE_COMMIT_PENDING:
+            case READY:
+                throw new IllegalStateException("Attempted to doCommit in stage " + ready.stage);
+            default:
+                throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+        }
     }
 
     private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
     }
 
     private void handleLocalTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
-        Preconditions.checkState(readyCohort == null, "Transaction {} encountered local abort with commit underway",
-                getIdentifier());
-        openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+        checkOpen().abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
             sequence)));
     }
 
             sequence)));
     }
 
-    private void handleTransactionAbort(final long sequence, final RequestEnvelope envelope, final long now) {
-        if (readyCohort == null) {
-            openTransaction.abort(() -> recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
-                sequence)));
-            return;
+    private void startAbort() {
+        state = ABORTING;
+        LOG.debug("{}: Transaction {} aborting", persistenceId(), getIdentifier());
+    }
+
+    private void finishAbort() {
+        state = ABORTED;
+        LOG.debug("{}: Transaction {} aborted", persistenceId(), getIdentifier());
+    }
+
+    private TransactionAbortSuccess handleTransactionAbort(final long sequence, final RequestEnvelope envelope,
+            final long now) {
+        if (state instanceof Open) {
+            final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
+            startAbort();
+            openTransaction.abort(() -> {
+                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(),
+                    sequence));
+                finishAbort();
+            });
+            return null;
+        }
+        if (ABORTING.equals(state)) {
+            LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
+            return null;
+        }
+        if (ABORTED.equals(state)) {
+            // We should have recorded the reply
+            LOG.warn("{}: Transaction {} already aborted", persistenceId(), getIdentifier());
+            return new TransactionAbortSuccess(getIdentifier(), sequence);
         }
 
         }
 
-        readyCohort.abort(new FutureCallback<Void>() {
+        final Ready ready = checkReady();
+        startAbort();
+        ready.readyCohort.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
             @Override
             public void onSuccess(final Void result) {
-                readyCohort = null;
                 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
                 recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), sequence));
-                LOG.debug("Transaction {} aborted", getIdentifier());
+                finishAbort();
             }
 
             @Override
             public void onFailure(final Throwable failure) {
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                readyCohort = null;
-                LOG.warn("Transaction {} abort failed", getIdentifier(), failure);
                 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
                 recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
+                LOG.warn("{}: Transaction {} abort failed", persistenceId(), getIdentifier(), failure);
+                finishAbort();
             }
         });
             }
         });
+        return null;
     }
 
     }
 
-    private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
-        readyCohort.canCommit(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
-                    envelope.getMessage().getSequence()));
-            }
+    private void coordinatedCommit(final RequestEnvelope envelope, final long now) throws RequestException {
+        throwIfFailed();
 
 
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
-                readyCohort = null;
-            }
-        });
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case CAN_COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} is already canCommitting", persistenceId(), getIdentifier());
+                break;
+            case READY:
+                ready.stage = CommitStage.CAN_COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating canCommit", persistenceId(), getIdentifier());
+                checkReady().readyCohort.canCommit(new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(final Void result) {
+                        recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(getIdentifier(),
+                            envelope.getMessage().getSequence()));
+                        ready.stage = CommitStage.CAN_COMMIT_COMPLETE;
+                        LOG.debug("{}: Transaction {} completed canCommit", persistenceId(), getIdentifier());
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+                    }
+                });
+                break;
+            case CAN_COMMIT_COMPLETE:
+            case COMMIT_PENDING:
+            case PRE_COMMIT_COMPLETE:
+            case PRE_COMMIT_PENDING:
+                throw new IllegalStateException("Attempted to canCommit in stage " + ready.stage);
+            default:
+                throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+        }
     }
 
     }
 
-    private void directCommit(final RequestEnvelope envelope, final long now) {
-        readyCohort.canCommit(new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(final Void result) {
-                successfulDirectCanCommit(envelope, now);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
-                readyCohort = null;
-            }
-        });
+    private void directCommit(final RequestEnvelope envelope, final long now) throws RequestException {
+        throwIfFailed();
+
+        final Ready ready = checkReady();
+        switch (ready.stage) {
+            case CAN_COMMIT_COMPLETE:
+            case CAN_COMMIT_PENDING:
+            case COMMIT_PENDING:
+            case PRE_COMMIT_COMPLETE:
+            case PRE_COMMIT_PENDING:
+                LOG.debug("{}: Transaction {} in state {}, not initiating direct commit for {}", persistenceId(),
+                    getIdentifier(), state, envelope);
+                break;
+            case READY:
+                ready.stage = CommitStage.CAN_COMMIT_PENDING;
+                LOG.debug("{}: Transaction {} initiating direct canCommit", persistenceId(), getIdentifier());
+                ready.readyCohort.canCommit(new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(final Void result) {
+                        successfulDirectCanCommit(envelope, now);
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable failure) {
+                        failTransaction(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
+                    }
+                });
+                break;
+            default:
+                throw new IllegalStateException("Unhandled commit stage " + ready.stage);
+        }
     }
 
     void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
     }
 
     void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
-        readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
+        final Ready ready = checkReady();
+        ready.stage = CommitStage.PRE_COMMIT_PENDING;
+        LOG.debug("{}: Transaction {} initiating direct preCommit", persistenceId(), getIdentifier());
+        ready.readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
                 successfulDirectPreCommit(envelope, startTime);
             @Override
             public void onSuccess(final DataTreeCandidate result) {
                 successfulDirectPreCommit(envelope, startTime);
@@ -217,14 +410,16 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
             @Override
             public void onFailure(final Throwable failure) {
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
-                readyCohort = null;
+                failTransaction(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
             }
         });
     }
 
     void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
             }
         });
     }
 
     void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
-        readyCohort.commit(new FutureCallback<UnsignedLong>() {
+        final Ready ready = checkReady();
+        ready.stage = CommitStage.COMMIT_PENDING;
+        LOG.debug("{}: Transaction {} initiating direct commit", persistenceId(), getIdentifier());
+        ready.readyCohort.commit(new FutureCallback<UnsignedLong>() {
             @Override
             public void onSuccess(final UnsignedLong result) {
                 successfulCommit(envelope, startTime);
             @Override
             public void onSuccess(final UnsignedLong result) {
                 successfulCommit(envelope, startTime);
@@ -232,20 +427,20 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
             @Override
             public void onFailure(final Throwable failure) {
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
-                readyCohort = null;
+                failTransaction(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
             }
         });
     }
 
     void successfulCommit(final RequestEnvelope envelope, final long startTime) {
             }
         });
     }
 
     void successfulCommit(final RequestEnvelope envelope, final long startTime) {
-        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(getIdentifier(),
             envelope.getMessage().getSequence()));
             envelope.getMessage().getSequence()));
-        readyCohort = null;
+        state = COMMITTED;
     }
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
     }
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
+        final DataTreeModification sealedModification = checkSealed();
         if (!sealedModification.equals(request.getModification())) {
             LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
             throw new UnsupportedRequestException(request);
         if (!sealedModification.equals(request.getModification())) {
             LOG.warn("Expecting modification {}, commit request has {}", sealedModification, request.getModification());
             throw new UnsupportedRequestException(request);
@@ -253,9 +448,9 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
         final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
         if (optFailure.isPresent()) {
 
         final java.util.Optional<Exception> optFailure = request.getDelayedFailure();
         if (optFailure.isPresent()) {
-            readyCohort = history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get());
+            state = new Ready(history().createFailedCohort(getIdentifier(), sealedModification, optFailure.get()));
         } else {
         } else {
-            readyCohort = history().createReadyCohort(getIdentifier(), sealedModification);
+            state = new Ready(history().createReadyCohort(getIdentifier(), sealedModification));
         }
 
         if (request.isCoordinated()) {
         }
 
         if (request.isCoordinated()) {
@@ -267,14 +462,14 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
 
     private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
             throws RequestException {
 
     private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request)
             throws RequestException {
-        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
         return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
             data.isPresent()));
     }
 
     private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
             throws RequestException {
         return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(),
             data.isPresent()));
     }
 
     private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
             throws RequestException {
-        final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
+        final Optional<NormalizedNode<?, ?>> data = checkOpen().getSnapshot().readNode(request.getPath());
         return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
             data));
     }
         return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(),
             data));
     }
@@ -283,13 +478,10 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
     }
 
         return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence));
     }
 
-    private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-
-        final Collection<TransactionModification> mods = request.getModifications();
-        if (!mods.isEmpty()) {
-            final DataTreeModification modification = openTransaction.getSnapshot();
-            for (TransactionModification m : mods) {
+    private void applyModifications(final Collection<TransactionModification> modifications) {
+        if (!modifications.isEmpty()) {
+            final DataTreeModification modification = checkOpen().getSnapshot();
+            for (TransactionModification m : modifications) {
                 if (m instanceof TransactionDelete) {
                     modification.delete(m.getPath());
                 } else if (m instanceof TransactionWrite) {
                 if (m instanceof TransactionDelete) {
                     modification.delete(m.getPath());
                 } else if (m instanceof TransactionWrite) {
@@ -301,26 +493,41 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 }
             }
         }
                 }
             }
         }
+    }
 
 
+    private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        // We need to examine the persistence protocol first to see if this is an idempotent request. If there is no
+        // protocol, there is nothing for us to do.
         final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
         if (!maybeProto.isPresent()) {
         final java.util.Optional<PersistenceProtocol> maybeProto = request.getPersistenceProtocol();
         if (!maybeProto.isPresent()) {
+            applyModifications(request.getModifications());
             return replyModifySuccess(request.getSequence());
         }
 
         switch (maybeProto.get()) {
             case ABORT:
             return replyModifySuccess(request.getSequence());
         }
 
         switch (maybeProto.get()) {
             case ABORT:
-                openTransaction.abort(() -> replyModifySuccess(request.getSequence()));
-                openTransaction = null;
+                if (ABORTING.equals(state)) {
+                    LOG.debug("{}: Transaction {} already aborting", persistenceId(), getIdentifier());
+                    return null;
+                }
+                final ReadWriteShardDataTreeTransaction openTransaction = checkOpen();
+                startAbort();
+                openTransaction.abort(() -> {
+                    recordAndSendSuccess(envelope, now, new ModifyTransactionSuccess(getIdentifier(),
+                        request.getSequence()));
+                    finishAbort();
+                });
                 return null;
             case READY:
                 return null;
             case READY:
-                ensureReady();
+                ensureReady(request.getModifications());
                 return replyModifySuccess(request.getSequence());
             case SIMPLE:
                 return replyModifySuccess(request.getSequence());
             case SIMPLE:
-                ensureReady();
+                ensureReady(request.getModifications());
                 directCommit(envelope, now);
                 return null;
             case THREE_PHASE:
                 directCommit(envelope, now);
                 return null;
             case THREE_PHASE:
-                ensureReady();
+                ensureReady(request.getModifications());
                 coordinatedCommit(envelope, now);
                 return null;
             default:
                 coordinatedCommit(envelope, now);
                 return null;
             default:
@@ -329,13 +536,41 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
         }
     }
 
         }
     }
 
-    private void ensureReady() {
+    private void ensureReady(final Collection<TransactionModification> modifications) {
         // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
         // only once.
         // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
         // only once.
-        if (readyCohort == null) {
-            readyCohort = openTransaction.ready();
-            LOG.debug("{}: transitioned {} to ready", persistenceId(), openTransaction.getIdentifier());
-            openTransaction = null;
+        if (state instanceof Ready) {
+            LOG.debug("{}: {} is already in state {}", persistenceId(), getIdentifier(), state);
+            return;
+        }
+
+        applyModifications(modifications);
+        state = new Ready(checkOpen().ready());
+        LOG.debug("{}: transitioned {} to ready", persistenceId(), getIdentifier());
+    }
+
+    private void throwIfFailed() throws RequestException {
+        if (state instanceof Failed) {
+            LOG.debug("{}: {} has failed, rejecting request", persistenceId(), getIdentifier());
+            throw ((Failed) state).cause;
         }
     }
         }
     }
+
+    private ReadWriteShardDataTreeTransaction checkOpen() {
+        Preconditions.checkState(state instanceof Open, "%s expect to be open, is in state %s", getIdentifier(),
+            state);
+        return ((Open) state).openTransaction;
+    }
+
+    private Ready checkReady() {
+        Preconditions.checkState(state instanceof Ready, "%s expect to be ready, is in state %s", getIdentifier(),
+            state);
+        return (Ready) state;
+    }
+
+    private DataTreeModification checkSealed() {
+        Preconditions.checkState(state instanceof Sealed, "%s expect to be sealed, is in state %s", getIdentifier(),
+            state);
+        return ((Sealed) state).sealedModification;
+    }
 }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransactionTest.java
new file mode 100644 (file)
index 0000000..c3c6206
--- /dev/null
@@ -0,0 +1,163 @@
+/*
+ * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import akka.actor.ActorRef;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
+public class FrontendReadWriteTransactionTest {
+
+    private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FrontendIdentifier.create(
+        MemberName.forName("mock"), FrontendType.forName("mock")), 0);
+    private static final LocalHistoryIdentifier HISTORY_ID = new LocalHistoryIdentifier(CLIENT_ID, 0);
+    private static final TransactionIdentifier TX_ID = new TransactionIdentifier(HISTORY_ID, 0);
+
+    private AbstractFrontendHistory mockHistory;
+    private ReadWriteShardDataTreeTransaction shardTransaction;
+    private DataTreeModification mockModification;
+    private ShardDataTreeTransactionParent mockParent;
+    private FrontendReadWriteTransaction openTx;
+    private ShardDataTreeCohort mockCohort;
+
+    @Before
+    public void setup() {
+        mockHistory = mock(AbstractFrontendHistory.class);
+        mockParent = mock(ShardDataTreeTransactionParent.class);
+        mockModification = mock(DataTreeModification.class);
+        mockCohort = mock(ShardDataTreeCohort.class);
+
+        shardTransaction = new ReadWriteShardDataTreeTransaction(mockParent, TX_ID, mockModification);
+        openTx = FrontendReadWriteTransaction.createOpen(mockHistory, shardTransaction);
+
+        when(mockParent.finishTransaction(same(shardTransaction))).thenReturn(mockCohort);
+    }
+
+    private TransactionSuccess<?> handleRequest(final TransactionRequest<?> request) throws RequestException {
+        return openTx.doHandleRequest(request, new RequestEnvelope(request, 0, 0), 0);
+    }
+
+    @Test
+    public void testDuplicateModifyAbort() throws RequestException {
+        final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+        b.setSequence(0);
+        b.setAbort();
+        final TransactionRequest<?> abortReq = b.build();
+        assertNull(handleRequest(abortReq));
+        verify(mockParent).abortTransaction(same(shardTransaction), any(Runnable.class));
+
+        assertNull(handleRequest(abortReq));
+        verifyNoMoreInteractions(mockParent);
+    }
+
+    @Test
+    public void testDuplicateReady() throws RequestException {
+        final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+        b.setSequence(0);
+        b.setReady();
+        final TransactionRequest<?> readyReq = b.build();
+
+        assertNotNull(handleRequest(readyReq));
+        verify(mockParent).finishTransaction(same(shardTransaction));
+
+        assertNotNull(handleRequest(readyReq));
+        verifyNoMoreInteractions(mockParent);
+    }
+
+    @Test
+    public void testDuplicateDirect() throws RequestException {
+        final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+        b.setSequence(0);
+        b.setCommit(false);
+        final TransactionRequest<?> readyReq = b.build();
+
+        assertNull(handleRequest(readyReq));
+        verify(mockParent).finishTransaction(same(shardTransaction));
+
+        assertNull(handleRequest(readyReq));
+        verifyNoMoreInteractions(mockParent);
+    }
+
+    @Test
+    public void testDuplicateCoordinated() throws RequestException {
+        final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+        b.setSequence(0);
+        b.setCommit(true);
+        final TransactionRequest<?> readyReq = b.build();
+
+        assertNull(handleRequest(readyReq));
+        verify(mockParent).finishTransaction(same(shardTransaction));
+
+        assertNull(handleRequest(readyReq));
+        verifyNoMoreInteractions(mockParent);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testReadAfterReady() throws RequestException {
+        final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+        b.setSequence(0);
+        b.setReady();
+        final TransactionRequest<?> readyReq = b.build();
+
+        assertNotNull(handleRequest(readyReq));
+        verify(mockParent).finishTransaction(same(shardTransaction));
+
+        handleRequest(new ReadTransactionRequest(TX_ID, 0, mock(ActorRef.class), YangInstanceIdentifier.EMPTY, true));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testModifyAfterReady() throws RequestException {
+        final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+        b.setSequence(0);
+        b.setReady();
+        final TransactionRequest<?> readyReq = b.build();
+
+        assertNotNull(handleRequest(readyReq));
+        verify(mockParent).finishTransaction(same(shardTransaction));
+
+        b.setSequence(1);
+        b.addModification(mock(TransactionModification.class));
+        handleRequest(b.build());
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testReadAfterAbort() throws RequestException {
+        final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(TX_ID, mock(ActorRef.class));
+        b.setSequence(0);
+        b.setAbort();
+        final TransactionRequest<?> abortReq = b.build();
+        assertNull(handleRequest(abortReq));
+        verify(mockParent).abortTransaction(same(shardTransaction), any(Runnable.class));
+
+        handleRequest(new ReadTransactionRequest(TX_ID, 0, mock(ActorRef.class), YangInstanceIdentifier.EMPTY, true));
+    }
+}