BUG-2006: Rework state tracking 17/11417/5
authorRobert Varga <rovarga@cisco.com>
Thu, 18 Sep 2014 13:53:24 +0000 (15:53 +0200)
committerTony Tkacik <ttkacik@cisco.com>
Thu, 16 Oct 2014 13:01:26 +0000 (13:01 +0000)
Separate out the two embedded classes to allow easier analysis of
interactions. Introduce a state-tracking class within
DOMStoreTransactionChainImpl with three distinct states: Idle, Allocated
and Shutdown. The allocated state has an internal sub-state when the
transaction has been readied. Perform atomic state transitions using
compare-and-swap.

Change-Id: I4b852d42455d7e07076c56af3626f804ba572770
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java

diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java
new file mode 100644 (file)
index 0000000..5b0f739
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+final class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
+    private final SnapshotBackedWriteTransaction transaction;
+    private final DOMStoreThreePhaseCommitCohort delegate;
+    private final DOMStoreTransactionChainImpl txChain;
+
+    protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
+            final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
+        this.transaction = Preconditions.checkNotNull(transaction);
+        this.delegate = Preconditions.checkNotNull(delegate);
+        this.txChain = Preconditions.checkNotNull(txChain);
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        return delegate.canCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        return delegate.preCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        return delegate.abort();
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        ListenableFuture<Void> commitFuture = delegate.commit();
+        Futures.addCallback(commitFuture, new FutureCallback<Void>() {
+            @Override
+            public void onFailure(final Throwable t) {
+                txChain.onTransactionFailed(transaction, t);
+            }
+
+            @Override
+            public void onSuccess(final Void result) {
+                txChain.onTransactionCommited(transaction);
+            }
+        });
+        return commitFuture;
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java
new file mode 100644 (file)
index 0000000..3f731cf
--- /dev/null
@@ -0,0 +1,224 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
+    private static abstract class State {
+        /**
+         * Allocate a new snapshot.
+         *
+         * @return A new snapshot
+         */
+        protected abstract DataTreeSnapshot getSnapshot();
+    }
+
+    private static final class Idle extends State {
+        private final InMemoryDOMDataStore store;
+
+        Idle(final InMemoryDOMDataStore store) {
+            this.store = Preconditions.checkNotNull(store);
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            return store.takeSnapshot();
+        }
+    }
+
+    /**
+     * We have a transaction out there.
+     */
+    private static final class Allocated extends State {
+        private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
+        private final DOMStoreWriteTransaction transaction;
+        private volatile DataTreeSnapshot snapshot;
+
+        Allocated(final DOMStoreWriteTransaction transaction) {
+            this.transaction = Preconditions.checkNotNull(transaction);
+        }
+
+        public DOMStoreWriteTransaction getTransaction() {
+            return transaction;
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            final DataTreeSnapshot ret = snapshot;
+            Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
+            return ret;
+        }
+
+        void setSnapshot(final DataTreeSnapshot snapshot) {
+            final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
+            Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
+        }
+    }
+
+    /**
+     * Chain is logically shut down, no further allocation allowed.
+     */
+    private static final class Shutdown extends State {
+        private final String message;
+
+        Shutdown(final String message) {
+            this.message = Preconditions.checkNotNull(message);
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            throw new IllegalStateException(message);
+        }
+    }
+
+    private static final AtomicReferenceFieldUpdater<DOMStoreTransactionChainImpl, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(DOMStoreTransactionChainImpl.class, State.class, "state");
+    private static final Logger LOG = LoggerFactory.getLogger(DOMStoreTransactionChainImpl.class);
+    private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
+    private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+    private final InMemoryDOMDataStore store;
+    private final Idle idleState;
+    private volatile State state;
+
+    DOMStoreTransactionChainImpl(final InMemoryDOMDataStore store) {
+        this.store = Preconditions.checkNotNull(store);
+        idleState = new Idle(store);
+        state = idleState;
+    }
+
+    private Entry<State, DataTreeSnapshot> getSnapshot() {
+        final State localState = state;
+        return new SimpleEntry<>(localState, localState.getSnapshot());
+    }
+
+    private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
+        final State state = new Allocated(transaction);
+        return STATE_UPDATER.compareAndSet(this, expected, state);
+    }
+
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction() {
+        final Entry<State, DataTreeSnapshot> entry = getSnapshot();
+        return new SnapshotBackedReadTransaction(store.nextIdentifier(), store.getDebugTransactions(), entry.getValue());
+    }
+
+    @Override
+    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        Entry<State, DataTreeSnapshot> entry;
+        DOMStoreReadWriteTransaction ret;
+
+        do {
+            entry = getSnapshot();
+            ret = new SnapshotBackedReadWriteTransaction(store.nextIdentifier(),
+                store.getDebugTransactions(), entry.getValue(), this);
+        } while (!recordTransaction(entry.getKey(), ret));
+
+        return ret;
+    }
+
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        Entry<State, DataTreeSnapshot> entry;
+        DOMStoreWriteTransaction ret;
+
+        do {
+            entry = getSnapshot();
+            ret = new SnapshotBackedWriteTransaction(store.nextIdentifier(),
+                store.getDebugTransactions(), entry.getValue(), this);
+        } while (!recordTransaction(entry.getKey(), ret));
+
+        return ret;
+    }
+
+    @Override
+    protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+        final State localState = state;
+        if (localState instanceof Allocated) {
+            final Allocated allocated = (Allocated)localState;
+            if (allocated.getTransaction().equals(tx)) {
+                final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
+                if (!success) {
+                    LOG.info("State already transitioned from {} to {}", localState, state);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
+        final State localState = state;
+
+        if (localState instanceof Allocated) {
+            final Allocated allocated = (Allocated)localState;
+            final DOMStoreWriteTransaction transaction = allocated.getTransaction();
+            Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
+            allocated.setSnapshot(tree);
+        } else {
+            LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
+        }
+
+        return new ChainedTransactionCommitImpl(tx, store.transactionReady(tx, tree), this);
+    }
+
+    @Override
+    public void close() {
+        final State localState = state;
+
+        do {
+            Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
+
+            if (FAILED.equals(localState)) {
+                LOG.debug("Ignoring user close in failed state");
+                return;
+            }
+        } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
+    }
+
+    void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, final Throwable t) {
+        LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, t);
+        state = FAILED;
+    }
+
+    void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
+        // If the committed transaction was the one we allocated last,
+        // we clear it and the ready snapshot, so the next transaction
+        // allocated refers to the data tree directly.
+        final State localState = state;
+
+        if (!(localState instanceof Allocated)) {
+            LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
+            return;
+        }
+
+        final Allocated allocated = (Allocated)localState;
+        final DOMStoreWriteTransaction tx = allocated.getTransaction();
+        if (!tx.equals(transaction)) {
+            LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
+            return;
+        }
+
+        if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
+            LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
+        }
+    }
+}
\ No newline at end of file
index 74fa73afb92f869f7cb2e945a625d489b71e71c2..213f60e951cc41794864fcb1fd813a1958469728 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.controller.md.sal.dom.store.impl;
 import static com.google.common.base.Preconditions.checkState;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -18,7 +17,6 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
@@ -142,7 +140,7 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new DOMStoreTransactionChainImpl();
+        return new DOMStoreTransactionChainImpl(this);
     }
 
     @Override
@@ -164,10 +162,14 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
         }
     }
 
-    boolean getDebugTransactions() {
+    public final boolean getDebugTransactions() {
         return debugTransactions;
     }
 
+    final DataTreeSnapshot takeSnapshot() {
+        return dataTree.takeSnapshot();
+    }
+
     @Override
     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
             final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
@@ -219,156 +221,11 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
         return new ThreePhaseCommitImpl(tx, tree);
     }
 
-    private Object nextIdentifier() {
+    Object nextIdentifier() {
         return name + "-" + txCounter.getAndIncrement();
     }
 
-    private class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
-        @GuardedBy("this")
-        private SnapshotBackedWriteTransaction allocatedTransaction;
-        @GuardedBy("this")
-        private DataTreeSnapshot readySnapshot;
-        @GuardedBy("this")
-        private boolean chainFailed = false;
-
-        @GuardedBy("this")
-        private void checkFailed() {
-            Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
-        }
-
-        @GuardedBy("this")
-        private DataTreeSnapshot getSnapshot() {
-            checkFailed();
-
-            if (allocatedTransaction != null) {
-                Preconditions.checkState(readySnapshot != null, "Previous transaction %s is not ready yet", allocatedTransaction.getIdentifier());
-                return readySnapshot;
-            } else {
-                return dataTree.takeSnapshot();
-            }
-        }
-
-        @GuardedBy("this")
-        private <T extends SnapshotBackedWriteTransaction> T recordTransaction(final T transaction) {
-            allocatedTransaction = transaction;
-            readySnapshot = null;
-            return transaction;
-        }
-
-        @Override
-        public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
-            final DataTreeSnapshot snapshot = getSnapshot();
-            return new SnapshotBackedReadTransaction(nextIdentifier(), getDebugTransactions(), snapshot);
-        }
-
-        @Override
-        public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
-            final DataTreeSnapshot snapshot = getSnapshot();
-            return recordTransaction(new SnapshotBackedReadWriteTransaction(nextIdentifier(),
-                    getDebugTransactions(), snapshot, this));
-        }
-
-        @Override
-        public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
-            final DataTreeSnapshot snapshot = getSnapshot();
-            return recordTransaction(new SnapshotBackedWriteTransaction(nextIdentifier(),
-                    getDebugTransactions(), snapshot, this));
-        }
-
-        @Override
-        protected synchronized void transactionAborted(final SnapshotBackedWriteTransaction tx) {
-            if (tx.equals(allocatedTransaction)) {
-                Preconditions.checkState(readySnapshot == null, "Unexpected abort of transaction %s with ready snapshot %s", tx, readySnapshot);
-                allocatedTransaction = null;
-            }
-        }
-
-        @Override
-        protected synchronized DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
-            Preconditions.checkState(tx.equals(allocatedTransaction), "Mis-ordered ready transaction %s last allocated was %s", tx, allocatedTransaction);
-            if (readySnapshot != null) {
-                // The snapshot should have been cleared
-                LOG.warn("Uncleared snapshot {} encountered, overwritten with transaction {} snapshot {}", readySnapshot, tx, tree);
-            }
-
-            final DOMStoreThreePhaseCommitCohort cohort = InMemoryDOMDataStore.this.transactionReady(tx, tree);
-            readySnapshot = tree;
-            return new ChainedTransactionCommitImpl(tx, cohort, this);
-        }
-
-        @Override
-        public void close() {
-            // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
-            // by the outer class.
-            //listeningExecutor.shutdownNow();
-        }
-
-        protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
-                final Throwable t) {
-            chainFailed = true;
-        }
-
-        public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
-            // If the committed transaction was the one we allocated last,
-            // we clear it and the ready snapshot, so the next transaction
-            // allocated refers to the data tree directly.
-            if (transaction.equals(allocatedTransaction)) {
-                if (readySnapshot == null) {
-                    LOG.warn("Transaction {} committed while no ready snapshot present", transaction);
-                }
-
-                allocatedTransaction = null;
-                readySnapshot = null;
-            }
-        }
-    }
-
-    private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
-        private final SnapshotBackedWriteTransaction transaction;
-        private final DOMStoreThreePhaseCommitCohort delegate;
-        private final DOMStoreTransactionChainImpl txChain;
-
-        protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
-                final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
-            this.transaction = transaction;
-            this.delegate = delegate;
-            this.txChain = txChain;
-        }
-
-        @Override
-        public ListenableFuture<Boolean> canCommit() {
-            return delegate.canCommit();
-        }
-
-        @Override
-        public ListenableFuture<Void> preCommit() {
-            return delegate.preCommit();
-        }
-
-        @Override
-        public ListenableFuture<Void> abort() {
-            return delegate.abort();
-        }
-
-        @Override
-        public ListenableFuture<Void> commit() {
-            ListenableFuture<Void> commitFuture = delegate.commit();
-            Futures.addCallback(commitFuture, new FutureCallback<Void>() {
-                @Override
-                public void onFailure(final Throwable t) {
-                    txChain.onTransactionFailed(transaction, t);
-                }
-
-                @Override
-                public void onSuccess(final Void result) {
-                    txChain.onTransactionCommited(transaction);
-                }
-            });
-            return commitFuture;
-        }
-    }
-
-    private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
+    private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
         private final SnapshotBackedWriteTransaction transaction;
         private final DataTreeModification modification;