Merge "Replaced Helium Notification Broker with new Notifcation Broker."
authorTom Pantelis <tpanteli@brocade.com>
Fri, 24 Apr 2015 02:51:38 +0000 (02:51 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 24 Apr 2015 02:51:38 +0000 (02:51 +0000)
64 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeInputStreamReader.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/cluster/datastore/node/utils/stream/NormalizedNodeOutputStreamWriter.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractDOMStoreTransaction.java
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedReadTransaction.java [moved from opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadTransaction.java with 80% similarity]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedReadWriteTransaction.java [moved from opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedReadWriteTransaction.java with 78% similarity]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedTransactions.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedWriteTransaction.java [moved from opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/SnapshotBackedWriteTransaction.java with 81% similarity]
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/ChainedTransactionCommitImpl.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/DOMStoreTransactionChainImpl.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMDataStore.java
opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-inmemory-datastore/src/test/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDataStoreTest.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonNormalizedNodeBodyReader.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/XmlNormalizedNodeBodyReader.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.java
opendaylight/netconf/netconf-testtool/src/main/java/org/opendaylight/controller/netconf/test/tool/NetconfDeviceSimulator.java

index 847954816ccd33f1c916ead7641b57200e2303e2..9a916625c9331413685d6263bfe053930b6795bf 100644 (file)
@@ -201,14 +201,16 @@ public class SnapshotManager implements SnapshotState {
 
             LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
 
+            SnapshotManager.this.currentState = CREATING;
+
             try {
                 createSnapshotProcedure.apply(null);
             } catch (Exception e) {
+                SnapshotManager.this.currentState = IDLE;
                 LOG.error("Error creating snapshot", e);
                 return false;
             }
 
-            SnapshotManager.this.currentState = CREATING;
             return true;
         }
 
index 59cc03625ae2f450373c712a9d10f84b62a0e68a..2246b51a3e1428ddca70220c1608a368f9174956 100644 (file)
@@ -337,7 +337,7 @@ public class NormalizedNodeInputStreamReader implements NormalizedNodeStreamRead
         return children;
     }
 
-    private PathArgument readPathArgument() throws IOException {
+    public PathArgument readPathArgument() throws IOException {
         // read Type
         int type = input.readByte();
 
index 055ccfe0ceeeed6ee101b60f4dcda6d0a2a83a9d..1ea94e9862a455bf7b4b5d2fa976d2c065525aa8 100644 (file)
@@ -203,6 +203,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
 
     @Override
     public void close() throws IOException {
+        flush();
     }
 
     @Override
@@ -278,7 +279,7 @@ public class NormalizedNodeOutputStreamWriter implements NormalizedNodeStreamWri
         }
     }
 
-    private void writePathArgument(YangInstanceIdentifier.PathArgument pathArgument) throws IOException {
+    public void writePathArgument(YangInstanceIdentifier.PathArgument pathArgument) throws IOException {
 
         byte type = PathArgumentTypes.getSerializablePathArgumentType(pathArgument);
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java
new file mode 100644 (file)
index 0000000..c3940e5
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+
+/**
+ * Abstract base class for our internal implementation of {@link DataTreeCandidateNode},
+ * which we instantiate from a serialized stream. We do not retain the before-image and
+ * do not implement {@link #getModifiedChild(PathArgument)}, as that method is only
+ * useful for end users. Instances based on this class should never be leaked outside of
+ * this component.
+ */
+abstract class AbstractDataTreeCandidateNode implements DataTreeCandidateNode {
+    private final ModificationType type;
+
+    protected AbstractDataTreeCandidateNode(final ModificationType type) {
+        this.type = Preconditions.checkNotNull(type);
+    }
+
+    @Override
+    public final DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public final ModificationType getModificationType() {
+        return type;
+    }
+
+    @Override
+    public final Optional<NormalizedNode<?, ?>> getDataBefore() {
+        throw new UnsupportedOperationException("Before-image not available after serialization");
+    }
+
+    static DataTreeCandidateNode createUnmodified() {
+        return new AbstractDataTreeCandidateNode(ModificationType.UNMODIFIED) {
+            @Override
+            public PathArgument getIdentifier() {
+                throw new UnsupportedOperationException("Root node does not have an identifier");
+            }
+
+            @Override
+            public Optional<NormalizedNode<?, ?>> getDataAfter() {
+                throw new UnsupportedOperationException("After-image not available after serialization");
+            }
+
+            @Override
+            public Collection<DataTreeCandidateNode> getChildNodes() {
+                throw new UnsupportedOperationException("Children not available after serialization");
+            }
+        };
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java
new file mode 100644 (file)
index 0000000..dd8ec0b
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+/**
+ * Abstract base for transactions running on SharrdDataTree.
+ *
+ * @param <T> Backing transaction type.
+ */
+@NotThreadSafe
+abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot> {
+    private final T snapshot;
+    private final String id;
+    private boolean closed;
+
+    protected AbstractShardDataTreeTransaction(final String id, final T snapshot) {
+        this.snapshot = Preconditions.checkNotNull(snapshot);
+        this.id = Preconditions.checkNotNull(id);
+    }
+
+    final T getSnapshot() {
+        return snapshot;
+    }
+
+    final boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * Close this transaction and mark it as closed, allowing idempotent invocations.
+     *
+     * @return True if the transaction got closed by this method invocation.
+     */
+    protected final boolean close() {
+        if (closed) {
+            return false;
+        }
+
+        closed = true;
+        return true;
+    }
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot).toString();
+    }
+
+    abstract void abort();
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java
new file mode 100644 (file)
index 0000000..4b471cf
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ChainedCommitCohort extends ShardDataTreeCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(ChainedCommitCohort.class);
+    private final ReadWriteShardDataTreeTransaction transaction;
+    private final ShardDataTreeTransactionChain chain;
+    private final ShardDataTreeCohort delegate;
+
+    ChainedCommitCohort(final ShardDataTreeTransactionChain chain, final ReadWriteShardDataTreeTransaction transaction, final ShardDataTreeCohort delegate) {
+        this.transaction = Preconditions.checkNotNull(transaction);
+        this.delegate = Preconditions.checkNotNull(delegate);
+        this.chain = Preconditions.checkNotNull(chain);
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        final ListenableFuture<Void> ret = delegate.commit();
+
+        Futures.addCallback(ret, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess(Void result) {
+                chain.clearTransaction(transaction);
+                LOG.debug("Committed transaction {}", transaction);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOG.error("Transaction {} commit failed, cannot recover", transaction, t);
+            }
+        });
+
+        return ret;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        return delegate.canCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        return delegate.preCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        return delegate.abort();
+    }
+
+    @Override
+    DataTreeCandidateTip getCandidate() {
+        return delegate.getCandidate();
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java
deleted file mode 100644 (file)
index f243620..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
-import org.slf4j.Logger;
-
-/**
- * A factory for creating DOM transactions, either normal or chained.
- *
- * @author Thomas Pantelis
- */
-public class DOMTransactionFactory {
-
-    private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
-    private final InMemoryDOMDataStore store;
-    private final ShardStats shardMBean;
-    private final Logger log;
-    private final String name;
-
-    public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) {
-        this.store = store;
-        this.shardMBean = shardMBean;
-        this.log = log;
-        this.name = name;
-    }
-
-    @SuppressWarnings("unchecked")
-    public <T extends DOMStoreTransaction> T newTransaction(TransactionProxy.TransactionType type,
-            String transactionID, String transactionChainID) {
-
-        DOMStoreTransactionFactory factory = store;
-
-        if(!transactionChainID.isEmpty()) {
-            factory = transactionChains.get(transactionChainID);
-            if(factory == null) {
-                if(log.isDebugEnabled()) {
-                    log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID,
-                            transactionChainID);
-                }
-
-                DOMStoreTransactionChain transactionChain = store.createTransactionChain();
-                transactionChains.put(transactionChainID, transactionChain);
-                factory = transactionChain;
-            }
-        } else {
-            log.debug("{}: Creating transaction with ID {}", name, transactionID);
-        }
-
-        T transaction = null;
-        switch(type) {
-            case READ_ONLY:
-                transaction = (T) factory.newReadOnlyTransaction();
-                shardMBean.incrementReadOnlyTransactionCount();
-                break;
-            case READ_WRITE:
-                transaction = (T) factory.newReadWriteTransaction();
-                shardMBean.incrementReadWriteTransactionCount();
-                break;
-            case WRITE_ONLY:
-                transaction = (T) factory.newWriteOnlyTransaction();
-                shardMBean.incrementWriteOnlyTransactionCount();
-                break;
-        }
-
-        return transaction;
-    }
-
-    public void closeTransactionChain(String transactionChainID) {
-        DOMStoreTransactionChain chain =
-                transactionChains.remove(transactionChainID);
-
-        if(chain != null) {
-            chain.close();
-        }
-    }
-
-    public void closeAllTransactionChains() {
-        for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
-            entry.getValue().close();
-        }
-
-        transactionChains.clear();
-    }
-}
index 939ddf8fad842ac947b427f28a09ee3810f8de42..e6f63d7154ba6d5f301ed7fd4b77f111d0ea8034 100644 (file)
@@ -7,21 +7,24 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.ArrayList;
-import java.util.List;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener, ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
+final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener, ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> {
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
     private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
     private final List<ActorSelection> dataChangeListeners =  new ArrayList<>();
@@ -39,7 +42,12 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
         if (isLeader) {
             for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
                 if(!reg.isClosed()) {
-                    reg.setDelegate(createDelegate(reg.getRegisterChangeListener()));
+                    final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+                            createDelegate(reg.getRegisterChangeListener());
+                    reg.setDelegate(res.getKey());
+                    if (res.getValue() != null) {
+                        reg.getInstance().onDataChanged(res.getValue());
+                    }
                 }
             }
 
@@ -52,16 +60,21 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
 
         LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
 
-        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                      NormalizedNode<?, ?>>> registration;
+        final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
         if (isLeader) {
-            registration = createDelegate(message);
+            final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+                    createDelegate(message);
+            registration = res.getKey();
+            event = res.getValue();
         } else {
             LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
             DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
             delayedListenerRegistrations.add(delayedReg);
             registration = delayedReg;
+            event = null;
         }
 
         ActorRef listenerRegistration = createActor(DataChangeListenerRegistration.props(registration));
@@ -70,10 +83,13 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
                 persistenceId(), listenerRegistration.path());
 
         tellSender(new RegisterChangeListenerReply(listenerRegistration));
+        if (event != null) {
+            registration.getInstance().onDataChanged(event);
+        }
     }
 
     @Override
-    ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> createDelegate(
+    Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> createDelegate(
             final RegisterChangeListener message) {
         ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java
new file mode 100644 (file)
index 0000000..54167b2
--- /dev/null
@@ -0,0 +1,222 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.GeneratedMessage.GeneratedExtension;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages.AppendEntries;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DataTreeCandidatePayload extends Payload implements Externalizable {
+    private static final Logger LOG = LoggerFactory.getLogger(DataTreeCandidatePayload.class);
+    private static final long serialVersionUID = 1L;
+    private static final byte DELETE = 0;
+    private static final byte SUBTREE_MODIFIED = 1;
+    private static final byte UNMODIFIED = 2;
+    private static final byte WRITE = 3;
+
+    private transient byte[] serialized;
+
+    public DataTreeCandidatePayload() {
+        // Required by Externalizable
+    }
+
+    private DataTreeCandidatePayload(final byte[] serialized) {
+        this.serialized = Preconditions.checkNotNull(serialized);
+    }
+
+    private static void writeChildren(final NormalizedNodeOutputStreamWriter writer, final DataOutput out,
+            final Collection<DataTreeCandidateNode> children) throws IOException {
+        out.writeInt(children.size());
+        for (DataTreeCandidateNode child : children) {
+            writeNode(writer, out, child);
+        }
+    }
+
+    private static void writeNode(final NormalizedNodeOutputStreamWriter writer, final DataOutput out,
+            final DataTreeCandidateNode node) throws IOException {
+        switch (node.getModificationType()) {
+        case DELETE:
+            out.writeByte(DELETE);
+            writer.writePathArgument(node.getIdentifier());
+            break;
+        case SUBTREE_MODIFIED:
+            out.writeByte(SUBTREE_MODIFIED);
+            writer.writePathArgument(node.getIdentifier());
+            writeChildren(writer, out, node.getChildNodes());
+            break;
+        case WRITE:
+            out.writeByte(WRITE);
+            writer.writeNormalizedNode(node.getDataAfter().get());
+            break;
+        case UNMODIFIED:
+            throw new IllegalArgumentException("Unmodified candidate should never be in the payload");
+        default:
+            throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
+        }
+    }
+
+    static DataTreeCandidatePayload create(DataTreeCandidate candidate) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        try (final NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(out)) {
+            writer.writeYangInstanceIdentifier(candidate.getRootPath());
+
+            final DataTreeCandidateNode node = candidate.getRootNode();
+            switch (node.getModificationType()) {
+            case DELETE:
+                out.writeByte(DELETE);
+                break;
+            case SUBTREE_MODIFIED:
+                out.writeByte(SUBTREE_MODIFIED);
+                writeChildren(writer, out, node.getChildNodes());
+                break;
+            case UNMODIFIED:
+                out.writeByte(UNMODIFIED);
+                break;
+            case WRITE:
+                out.writeByte(WRITE);
+                writer.writeNormalizedNode(node.getDataAfter().get());
+                break;
+            default:
+                throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
+            }
+
+            writer.close();
+        } catch (IOException e) {
+            throw new IllegalArgumentException(String.format("Failed to serialize candidate %s", candidate), e);
+        }
+
+        return new DataTreeCandidatePayload(out.toByteArray());
+    }
+
+    private static Collection<DataTreeCandidateNode> readChildren(final NormalizedNodeInputStreamReader reader,
+        final DataInput in) throws IOException {
+        final int size = in.readInt();
+        if (size != 0) {
+            final Collection<DataTreeCandidateNode> ret = new ArrayList<>(size);
+            for (int i = 0; i < size; ++i) {
+                final DataTreeCandidateNode child = readNode(reader, in);
+                if (child != null) {
+                    ret.add(child);
+                }
+            }
+            return ret;
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
+    private static DataTreeCandidateNode readNode(final NormalizedNodeInputStreamReader reader,
+            final DataInput in) throws IOException {
+        final byte type = in.readByte();
+        switch (type) {
+        case DELETE:
+            return DeletedDataTreeCandidateNode.create(reader.readPathArgument());
+        case SUBTREE_MODIFIED:
+            final PathArgument identifier = reader.readPathArgument();
+            final Collection<DataTreeCandidateNode> children = readChildren(reader, in);
+            if (children.isEmpty()) {
+                LOG.debug("Modified node {} does not have any children, not instantiating it", identifier);
+                return null;
+            } else {
+                return ModifiedDataTreeCandidateNode.create(identifier, children);
+            }
+        case UNMODIFIED:
+            return null;
+        case WRITE:
+            return DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode());
+        default:
+            throw new IllegalArgumentException("Unhandled node type " + type);
+        }
+    }
+
+    private static DataTreeCandidate parseCandidate(final ByteArrayDataInput in) throws IOException {
+        final NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(in);
+        final YangInstanceIdentifier rootPath = reader.readYangInstanceIdentifier();
+        final byte type = in.readByte();
+
+        final DataTreeCandidateNode rootNode;
+        switch (type) {
+        case DELETE:
+            rootNode = DeletedDataTreeCandidateNode.create();
+            break;
+        case SUBTREE_MODIFIED:
+            rootNode = ModifiedDataTreeCandidateNode.create(readChildren(reader, in));
+            break;
+        case WRITE:
+            rootNode = DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode());
+            break;
+        default:
+            throw new IllegalArgumentException("Unhandled node type " + type);
+        }
+
+        return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode);
+    }
+
+    DataTreeCandidate getCandidate() throws IOException {
+        return parseCandidate(ByteStreams.newDataInput(serialized));
+    }
+
+    @Override
+    @Deprecated
+    @SuppressWarnings("rawtypes")
+    public <T> Map<GeneratedExtension, T> encode() {
+        return null;
+    }
+
+    @Override
+    @Deprecated
+    public Payload decode(final AppendEntries.ReplicatedLogEntry.Payload payload) {
+        return null;
+    }
+
+    @Override
+    public int size() {
+        return serialized.length;
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeByte((byte)serialVersionUID);
+        out.writeInt(serialized.length);
+        out.write(serialized);
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        final long version = in.readByte();
+        Preconditions.checkArgument(version == serialVersionUID, "Unsupported serialization version %s", version);
+
+        final int length = in.readInt();
+        serialized = new byte[length];
+        in.readFully(serialized);
+    }
+}
index 3987c9af359a31ec7dcde5c2fa6e33911ed2a19d..db5eeb83e70eedd4101cceeabd5491a2c5b3a47e 100644 (file)
@@ -11,15 +11,18 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> {
+final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> {
     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
     private final ArrayList<DelayedDataTreeListenerRegistration> delayedRegistrations = new ArrayList<>();
     private final Collection<ActorSelection> actors = new ArrayList<>();
@@ -49,6 +52,7 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
         LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
 
         final ListenerRegistration<DOMDataTreeChangeListener> registration;
+        final DataTreeCandidate event;
         if (!isLeader) {
             LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
@@ -56,8 +60,11 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
                     new DelayedDataTreeListenerRegistration(registerTreeChangeListener);
             delayedRegistrations.add(delayedReg);
             registration = delayedReg;
+            event = null;
         } else {
-            registration = createDelegate(registerTreeChangeListener);
+            final Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> res = createDelegate(registerTreeChangeListener);
+            registration = res.getKey();
+            event = res.getValue();
         }
 
         ActorRef listenerRegistration = createActor(DataTreeChangeListenerRegistrationActor.props(registration));
@@ -66,10 +73,13 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
             persistenceId(), listenerRegistration.path());
 
         tellSender(new RegisterDataTreeChangeListenerReply(listenerRegistration));
+        if (event != null) {
+            registration.getInstance().onDataTreeChanged(Collections.singletonList(event));
+        }
     }
 
     @Override
-    ListenerRegistration<DOMDataTreeChangeListener> createDelegate(final RegisterDataTreeChangeListener message) {
+    Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> createDelegate(final RegisterDataTreeChangeListener message) {
         ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
index b3ae8a3ca2d6fcd0c9f68e2a3c3a65a4ac439b26..e8cd31097b872d222d6f886c173b362de26fb804 100644 (file)
@@ -8,10 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.Map.Entry;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Intermediate proxy registration returned to the user when we cannot
@@ -28,9 +31,13 @@ final class DelayedDataTreeListenerRegistration implements ListenerRegistration<
         this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener);
     }
 
-    synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> factory) {
+    synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> factory) {
         if (!closed) {
-            this.delegate = factory.createDelegate(registerTreeChangeListener);
+            final Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> res = factory.createDelegate(registerTreeChangeListener);
+            this.delegate = res.getKey();
+            if (res.getValue() != null) {
+                delegate.getInstance().onDataTreeChanged(Collections.singletonList(res.getValue()));
+            }
         }
     }
 
index e6702d90f1f22e0ee01e7e6624ee77011003ef16..3b9b7adc6b951731e8bc829b44032bebf0de7db7 100644 (file)
@@ -7,12 +7,15 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.Map.Entry;
+
 /**
  * Base class for factories instantiating delegates.
  *
  * <D> delegate type
  * <M> message type
+ * <I> initial state type
  */
-abstract class DelegateFactory<M, D> {
-    abstract D createDelegate(M message);
+abstract class DelegateFactory<M, D, I> {
+    abstract Entry<D, I> createDelegate(M message);
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java
new file mode 100644 (file)
index 0000000..2df380b
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+
+/**
+ * A deserialized {@link DataTreeCandidateNode} which represents a deletion.
+ */
+abstract class DeletedDataTreeCandidateNode extends AbstractDataTreeCandidateNode {
+    private DeletedDataTreeCandidateNode() {
+        super(ModificationType.DELETE);
+    }
+
+    static DataTreeCandidateNode create() {
+        return new DeletedDataTreeCandidateNode() {
+            @Override
+            public PathArgument getIdentifier() {
+                throw new UnsupportedOperationException("Root node does not have an identifier");
+            }
+        };
+    }
+
+    static DataTreeCandidateNode create(final PathArgument identifier) {
+        return new DeletedDataTreeCandidateNode() {
+            @Override
+            public final PathArgument getIdentifier() {
+                return identifier;
+            }
+        };
+    }
+
+    @Override
+    public final Optional<NormalizedNode<?, ?>> getDataAfter() {
+        return Optional.absent();
+    }
+
+    @Override
+    public final Collection<DataTreeCandidateNode> getChildNodes() {
+        // We would require the before-image to reconstruct the list of nodes which
+        // were deleted.
+        throw new UnsupportedOperationException("Children not available after serialization");
+    }
+}
index d33cebbebc2f7680dff2097f8b3ab3745f861c26..3f927736b5a8eaf5ee1be0e355a43f616fb2d9a1 100644 (file)
@@ -19,8 +19,9 @@ import com.google.common.base.Preconditions;
  *
  * <D> delegate type
  * <M> message type
+ * <I> initial state type
  */
-abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
+abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D, I> {
     private final Shard shard;
 
     protected LeaderLocalDelegateFactory(final Shard shard) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java
new file mode 100644 (file)
index 0000000..208ec33
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+
+/**
+ * A deserialized {@link DataTreeCandidateNode} which represents a modification in
+ * one of its children.
+ */
+abstract class ModifiedDataTreeCandidateNode extends AbstractDataTreeCandidateNode {
+    private final Collection<DataTreeCandidateNode> children;
+
+    private ModifiedDataTreeCandidateNode(final Collection<DataTreeCandidateNode> children) {
+        super(ModificationType.SUBTREE_MODIFIED);
+        this.children = Preconditions.checkNotNull(children);
+    }
+
+    static DataTreeCandidateNode create(final Collection<DataTreeCandidateNode> children) {
+        return new ModifiedDataTreeCandidateNode(children) {
+            @Override
+            public PathArgument getIdentifier() {
+                throw new UnsupportedOperationException("Root node does not have an identifier");
+            }
+        };
+    }
+
+    static DataTreeCandidateNode create(final PathArgument identifier, final Collection<DataTreeCandidateNode> children) {
+        return new ModifiedDataTreeCandidateNode(children) {
+            @Override
+            public final PathArgument getIdentifier() {
+                return identifier;
+            }
+        };
+    }
+
+    @Override
+    public final Optional<NormalizedNode<?, ?>> getDataAfter() {
+        throw new UnsupportedOperationException("After-image not available after serialization");
+    }
+
+    @Override
+    public final Collection<DataTreeCandidateNode> getChildNodes() {
+        return children;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java
new file mode 100644 (file)
index 0000000..5926568
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+final class ReadOnlyShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeSnapshot> {
+    ReadOnlyShardDataTreeTransaction(final String id, final DataTreeSnapshot snapshot) {
+        super(id, snapshot);
+    }
+
+    @Override
+    void abort() {
+        close();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java
new file mode 100644 (file)
index 0000000..cb17335
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
+final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
+    private final ShardDataTreeTransactionParent parent;
+
+    protected ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final String id, final DataTreeModification modification) {
+        super(id, modification);
+        this.parent = Preconditions.checkNotNull(parent);
+    }
+
+    @Override
+    void abort() {
+        Preconditions.checkState(close(), "Transaction is already closed");
+
+        parent.abortTransaction(this);
+    }
+
+    ShardDataTreeCohort ready() {
+        Preconditions.checkState(close(), "Transaction is already closed");
+
+        return parent.finishTransaction(this);
+    }
+}
index 91e072b076ef7c68116009c94127df859ef7540b..62d3259a714059f497ca3e97ad08166a3a1106bf 100644 (file)
@@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
@@ -64,9 +63,9 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -74,7 +73,7 @@ import scala.concurrent.duration.FiniteDuration;
 /**
  * A Shard represents a portion of the logical data tree <br/>
  * <p>
- * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ * Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
  * </p>
  */
 public class Shard extends RaftActor {
@@ -85,7 +84,7 @@ public class Shard extends RaftActor {
     static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
-    private final InMemoryDOMDataStore store;
+    private final ShardDataTree store;
 
     /// The name of this shard
     private final String name;
@@ -104,8 +103,6 @@ public class Shard extends RaftActor {
 
     private final MessageTracker appendEntriesReplyTracker;
 
-    private final DOMTransactionFactory domTransactionFactory;
-
     private final ShardTransactionActorFactory transactionActorFactory;
 
     private final ShardSnapshotCohort snapshotCohort;
@@ -124,25 +121,17 @@ public class Shard extends RaftActor {
 
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
-        store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
-                datastoreContext.getDataStoreProperties());
-
-        if (schemaContext != null) {
-            store.onGlobalContextUpdated(schemaContext);
-        }
+        store = new ShardDataTree(schemaContext);
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                 datastoreContext.getDataStoreMXBeanType());
-        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
         shardMBean.setShardActor(getSelf());
 
         if (isMetricsCaptureEnabled()) {
             getContext().become(new MeteringBehavior(this));
         }
 
-        domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
-
-        commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
+        commitCoordinator = new ShardCommitCoordinator(store,
                 TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
@@ -154,7 +143,7 @@ public class Shard extends RaftActor {
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
 
-        transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+        transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(
                         Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
 
@@ -305,15 +294,21 @@ public class Shard extends RaftActor {
         }
     }
 
+    private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
+        return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
+    }
+
     void continueCommit(final CohortEntry cohortEntry) throws Exception {
+        final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
+
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
-        if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
-            applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification());
+        if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
+            applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
         } else {
             Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
-                    new ModificationPayload(cohortEntry.getModification()));
+                DataTreeCandidatePayload.create(candidate));
         }
     }
 
@@ -323,12 +318,37 @@ public class Shard extends RaftActor {
         }
     }
 
+    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
+
+        try {
+            // We block on the future here so we don't have to worry about possibly accessing our
+            // state on a different thread outside of our dispatcher. Also, the data store
+            // currently uses a same thread executor anyway.
+            cohortEntry.getCohort().commit().get();
+
+            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+
+            shardMBean.incrementCommittedTransactionCount();
+            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+
+        } catch (Exception e) {
+            sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+            LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+                    transactionID, e);
+            shardMBean.incrementFailedTransactionsCount();
+        } finally {
+            commitCoordinator.currentTransactionComplete(transactionID, true);
+        }
+    }
+
     private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
         // With persistence enabled, this method is called via applyState by the leader strategy
         // after the commit has been replicated to a majority of the followers.
 
         CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
-        if(cohortEntry == null) {
+        if (cohortEntry == null) {
             // The transaction is no longer the current commit. This can happen if the transaction
             // was aborted prior, most likely due to timeout in the front-end. We need to finish
             // committing the transaction though since it was successfully persisted and replicated
@@ -337,7 +357,13 @@ public class Shard extends RaftActor {
             // transaction.
             cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
             if(cohortEntry != null) {
-                commitWithNewTransaction(cohortEntry.getModification());
+                try {
+                    store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+                } catch (DataValidationFailedException e) {
+                    shardMBean.incrementFailedTransactionsCount();
+                    LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
+                }
+
                 sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
             } else {
                 // This really shouldn't happen - it likely means that persistence or replication
@@ -348,31 +374,8 @@ public class Shard extends RaftActor {
                 LOG.error(ex.getMessage());
                 sender.tell(new akka.actor.Status.Failure(ex), getSelf());
             }
-
-            return;
-        }
-
-        LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
-
-        try {
-            // We block on the future here so we don't have to worry about possibly accessing our
-            // state on a different thread outside of our dispatcher. Also, the data store
-            // currently uses a same thread executor anyway.
-            cohortEntry.getCohort().commit().get();
-
-            sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
-
-            shardMBean.incrementCommittedTransactionCount();
-            shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-
-        } catch (Exception e) {
-            sender.tell(new akka.actor.Status.Failure(e), getSelf());
-
-            LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
-                    transactionID, e);
-            shardMBean.incrementFailedTransactionsCount();
-        } finally {
-            commitCoordinator.currentTransactionComplete(transactionID, true);
+        } else {
+            finishCommit(sender, transactionID, cohortEntry);
         }
     }
 
@@ -474,7 +477,7 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+        store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
@@ -515,13 +518,13 @@ public class Shard extends RaftActor {
     }
 
     private void commitWithNewTransaction(final Modification modification) {
-        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
-        modification.apply(tx);
+        ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+        modification.apply(tx.getSnapshot());
         try {
             snapshotCohort.syncCommitTransaction(tx);
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (Exception e) {
             shardMBean.incrementFailedTransactionsCount();
             LOG.error("{}: Failed to commit", persistenceId(), e);
         }
@@ -533,7 +536,7 @@ public class Shard extends RaftActor {
 
     @VisibleForTesting
     void updateSchemaContext(final SchemaContext schemaContext) {
-        store.onGlobalContextUpdated(schemaContext);
+        store.updateSchemaContext(schemaContext);
     }
 
     private boolean isMetricsCaptureEnabled() {
@@ -570,15 +573,25 @@ public class Shard extends RaftActor {
 
     @Override
     protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
-
-        if(data instanceof ModificationPayload) {
+        if (data instanceof DataTreeCandidatePayload) {
+            if (clientActor == null) {
+                // No clientActor indicates a replica coming from the leader
+                try {
+                    store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+                } catch (DataValidationFailedException | IOException e) {
+                    LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
+                }
+            } else {
+                // Replication consensus reached, proceed to commit
+                finishCommit(clientActor, identifier);
+            }
+        } else if (data instanceof ModificationPayload) {
             try {
                 applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
             } catch (ClassNotFoundException | IOException e) {
                 LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
             }
-        }
-        else if (data instanceof CompositeModificationPayload) {
+        } else if (data instanceof CompositeModificationPayload) {
             Object modification = ((CompositeModificationPayload) data).getModification();
 
             applyModificationToState(clientActor, identifier, modification);
@@ -622,7 +635,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
-            domTransactionFactory.closeAllTransactionChains();
+            store.closeAllTransactionChains();
         }
     }
 
@@ -666,7 +679,7 @@ public class Shard extends RaftActor {
     }
 
     @VisibleForTesting
-    public InMemoryDOMDataStore getDataStore() {
+    public ShardDataTree getDataStore() {
         return store;
     }
 
index 4ff9b5fd4353e5857ec6533c7a0cfc4ee6ec4ee2..30947fa6662b4a56d5b091cfe3133d019c9f9a24 100644 (file)
@@ -30,8 +30,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.slf4j.Logger;
 
 /**
@@ -43,14 +41,14 @@ public class ShardCommitCoordinator {
 
     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     public interface CohortDecorator {
-        DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
+        ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
     }
 
     private final Cache<String, CohortEntry> cohortCache;
 
     private CohortEntry currentCohortEntry;
 
-    private final DOMTransactionFactory transactionFactory;
+    private final ShardDataTree dataTree;
 
     private final Queue<CohortEntry> queuedCohortEntries;
 
@@ -75,13 +73,13 @@ public class ShardCommitCoordinator {
 
     private ReadyTransactionReply readyTransactionReply;
 
-    public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
+    public ShardCommitCoordinator(ShardDataTree dataTree,
             long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
 
         this.queueCapacity = queueCapacity;
         this.log = log;
         this.name = name;
-        this.transactionFactory = transactionFactory;
+        this.dataTree = Preconditions.checkNotNull(dataTree);
 
         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
                 removalListener(cacheRemovalListener).build();
@@ -162,8 +160,7 @@ public class ShardCommitCoordinator {
         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
-                    transactionFactory.<DOMStoreWriteTransaction>newTransaction(
-                        TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
+                    dataTree.newReadWriteTransaction(batched.getTransactionID(),
                         batched.getTransactionChainID()));
             cohortCache.put(batched.getTransactionID(), cohortEntry);
         }
@@ -415,25 +412,22 @@ public class ShardCommitCoordinator {
 
     static class CohortEntry {
         private final String transactionID;
-        private DOMStoreThreePhaseCommitCohort cohort;
-        private final MutableCompositeModification compositeModification;
-        private final DOMStoreWriteTransaction transaction;
+        private ShardDataTreeCohort cohort;
+        private final ReadWriteShardDataTreeTransaction transaction;
         private ActorRef replySender;
         private Shard shard;
         private long lastAccessTime;
         private boolean doImmediateCommit;
 
-        CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
-            this.compositeModification = new MutableCompositeModification();
-            this.transaction = transaction;
+        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
+            this.transaction = Preconditions.checkNotNull(transaction);
             this.transactionID = transactionID;
         }
 
-        CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
+        CohortEntry(String transactionID, ShardDataTreeCohort cohort,
                 MutableCompositeModification compositeModification) {
             this.transactionID = transactionID;
             this.cohort = cohort;
-            this.compositeModification = compositeModification;
             this.transaction = null;
         }
 
@@ -449,18 +443,13 @@ public class ShardCommitCoordinator {
             return transactionID;
         }
 
-        DOMStoreThreePhaseCommitCohort getCohort() {
+        ShardDataTreeCohort getCohort() {
             return cohort;
         }
 
-        MutableCompositeModification getModification() {
-            return compositeModification;
-        }
-
         void applyModifications(Iterable<Modification> modifications) {
-            for(Modification modification: modifications) {
-                compositeModification.addModification(modification);
-                modification.apply(transaction);
+            for (Modification modification : modifications) {
+                modification.apply(transaction.getSnapshot());
             }
         }
 
@@ -500,9 +489,5 @@ public class ShardCommitCoordinator {
         void setShard(Shard shard) {
             this.shard = shard;
         }
-
-        boolean hasModifications(){
-            return compositeModification.getModifications().size() > 0;
-        }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
new file mode 100644 (file)
index 0000000..fbe6992
--- /dev/null
@@ -0,0 +1,180 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
+ * e.g. it does not expose public interfaces and assumes it is only ever called from a
+ * single thread.
+ *
+ * This class is not part of the API contract and is subject to change at any time.
+ */
+@NotThreadSafe
+@VisibleForTesting
+public final class ShardDataTree extends ShardDataTreeTransactionParent {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
+    private static final ShardDataTreeNotificationManager MANAGER = new ShardDataTreeNotificationManager();
+    private final Map<String, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    private final ShardDataTreeChangePublisher treeChangePublisher = new ShardDataTreeChangePublisher();
+    private final ListenerTree listenerTree = ListenerTree.create();
+    private final TipProducingDataTree dataTree;
+
+    ShardDataTree(final SchemaContext schemaContext) {
+        dataTree = InMemoryDataTreeFactory.getInstance().create();
+        if (schemaContext != null) {
+            dataTree.setSchemaContext(schemaContext);
+        }
+    }
+
+    TipProducingDataTree getDataTree() {
+        return dataTree;
+    }
+
+    void updateSchemaContext(final SchemaContext schemaContext) {
+        dataTree.setSchemaContext(schemaContext);
+    }
+
+    private ShardDataTreeTransactionChain ensureTransactionChain(final String chainId) {
+        ShardDataTreeTransactionChain chain = transactionChains.get(chainId);
+        if (chain == null) {
+            chain = new ShardDataTreeTransactionChain(chainId, this);
+            transactionChains.put(chainId, chain);
+        }
+
+        return chain;
+    }
+
+    ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId, final String chainId) {
+        if (Strings.isNullOrEmpty(chainId)) {
+            return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
+        }
+
+        return ensureTransactionChain(chainId).newReadOnlyTransaction(txId);
+    }
+
+    ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId, final String chainId) {
+        if (Strings.isNullOrEmpty(chainId)) {
+            return new ReadWriteShardDataTreeTransaction(this, txId, dataTree.takeSnapshot().newModification());
+        }
+
+        return ensureTransactionChain(chainId).newReadWriteTransaction(txId);
+    }
+
+    void notifyListeners(final DataTreeCandidate candidate) {
+        LOG.debug("Notifying listeners on candidate {}", candidate);
+
+        // DataTreeChanges first, as they are more light-weight
+        treeChangePublisher.publishChanges(candidate);
+
+        // DataChanges second, as they are heavier
+        ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(MANAGER);
+    }
+
+    void closeAllTransactionChains() {
+        for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
+            chain.close();
+        }
+
+        transactionChains.clear();
+    }
+
+    void closeTransactionChain(final String transactionChainId) {
+        final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
+        if (chain != null) {
+            chain.close();
+        } else {
+            LOG.warn("Closing non-existent transaction chain {}", transactionChainId);
+        }
+    }
+
+    Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> registerChangeListener(
+            final YangInstanceIdentifier path,
+            final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, final DataChangeScope scope) {
+        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+                listenerTree.registerDataChangeListener(path, listener, scope);
+
+        final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
+        final DOMImmutableDataChangeEvent event;
+        if (currentState.isPresent()) {
+            final NormalizedNode<?, ?> data = currentState.get();
+            event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE).setAfter(data).addCreated(path, data).build();
+        } else {
+            event = null;
+        }
+
+        return new SimpleEntry<>(reg, event);
+    }
+
+    Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> registerTreeChangeListener(final YangInstanceIdentifier path,
+            final DOMDataTreeChangeListener listener) {
+        final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangePublisher.registerTreeChangeListener(path, listener);
+
+        final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
+        final DataTreeCandidate event;
+        if (currentState.isPresent()) {
+            event = DataTreeCandidates.fromNormalizedNode(path, currentState.get());
+        } else {
+            event = null;
+        }
+        return new SimpleEntry<>(reg, event);
+    }
+
+    void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
+        LOG.debug("Applying foreign transaction {}", identifier);
+
+        final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+        DataTreeCandidates.applyToModification(mod, foreign);
+        mod.ready();
+
+        LOG.trace("Applying foreign modification {}", mod);
+        dataTree.validate(mod);
+        final DataTreeCandidate candidate = dataTree.prepare(mod);
+        dataTree.commit(candidate);
+        notifyListeners(candidate);
+    }
+
+    @Override
+    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
+        // Intentional no-op
+    }
+
+    @Override
+    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+        final DataTreeModification snapshot = transaction.getSnapshot();
+        snapshot.ready();
+        return new SimpleShardDataTreeCohort(this, snapshot);
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java
new file mode 100644 (file)
index 0000000..5e24dcd
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collection;
+import java.util.Collections;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.spi.DefaultDataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@NotThreadSafe
+final class ShardDataTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeChangePublisher.class);
+
+    void publishChanges(final DataTreeCandidate candidate) {
+        processCandidateTree(candidate);
+    }
+
+    @Override
+    protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations,
+            final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
+        final Collection<DataTreeCandidate> changes = Collections.<DataTreeCandidate>singleton(new DefaultDataTreeCandidate(path, node));
+
+        for (AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
+            reg.getInstance().onDataTreeChanged(changes);
+        }
+    }
+
+    @Override
+    protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
+        LOG.debug("Registration {} removed", registration);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
new file mode 100644 (file)
index 0000000..213e36a
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+
+public abstract class ShardDataTreeCohort {
+    ShardDataTreeCohort() {
+        // Prevent foreign instantiation
+    }
+
+    abstract DataTreeCandidateTip getCandidate();
+
+    @VisibleForTesting
+    public abstract ListenableFuture<Boolean> canCommit();
+    @VisibleForTesting
+    public abstract ListenableFuture<Void> preCommit();
+    @VisibleForTesting
+    public abstract ListenableFuture<Void> abort();
+    @VisibleForTesting
+    public abstract ListenableFuture<Void> commit();
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java
new file mode 100644 (file)
index 0000000..8a54fc6
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ShardDataTreeNotificationManager implements NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeNotificationManager.class);
+
+    @Override
+    public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
+        LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
+
+        listener.getInstance().onDataChanged(notification);
+    }
+
+    @Override
+    public void submitNotifications(final DataChangeListenerRegistration<?> listener, final Iterable<DOMImmutableDataChangeEvent> notifications) {
+        final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
+        LOG.debug("Notifying listener {} about {}", instance, notifications);
+
+        for (DOMImmutableDataChangeEvent n : notifications) {
+            instance.onDataChanged(n);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
new file mode 100644 (file)
index 0000000..183c219
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A transaction chain attached to a Shard.
+ */
+@NotThreadSafe
+final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class);
+    private final ShardDataTree dataTree;
+    private final String chainId;
+
+    private ReadWriteShardDataTreeTransaction previousTx;
+    private ReadWriteShardDataTreeTransaction openTransaction;
+    private boolean closed;
+
+    ShardDataTreeTransactionChain(final String chainId, final ShardDataTree dataTree) {
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+        this.chainId = Preconditions.checkNotNull(chainId);
+    }
+
+    private DataTreeSnapshot getSnapshot() {
+        Preconditions.checkState(!closed, "TransactionChain %s has been closed", this);
+        Preconditions.checkState(openTransaction == null, "Transaction %s is open", openTransaction);
+
+        if (previousTx == null) {
+            return dataTree.getDataTree().takeSnapshot();
+        } else {
+            return previousTx.getSnapshot();
+        }
+    }
+
+    ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId) {
+        final DataTreeSnapshot snapshot = getSnapshot();
+        LOG.debug("Allocated read-only transaction {} snapshot {}", txId, snapshot);
+
+        return new ReadOnlyShardDataTreeTransaction(txId, snapshot);
+    }
+
+    ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId) {
+        final DataTreeSnapshot snapshot = getSnapshot();
+        LOG.debug("Allocated read-write transaction {} snapshot {}", txId, snapshot);
+
+        openTransaction = new ReadWriteShardDataTreeTransaction(this, txId, snapshot.newModification());
+        return openTransaction;
+    }
+
+    void close() {
+        closed = true;
+    }
+
+    @Override
+    protected void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
+        if (transaction instanceof ReadWriteShardDataTreeTransaction) {
+            Preconditions.checkState(openTransaction != null, "Attempted to abort transaction %s while none is outstanding", transaction);
+            LOG.debug("Aborted transaction {}", transaction);
+            openTransaction = null;
+        }
+    }
+
+    @Override
+    protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+        Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction);
+
+        // dataTree is finalizing ready the transaction, we just record it for the next
+        // transaction in chain
+        final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction);
+        openTransaction = null;
+        previousTx = transaction;
+        LOG.debug("Committing transaction {}", transaction);
+
+        return new ChainedCommitCohort(this, transaction, delegate);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("id", chainId).toString();
+    }
+
+    void clearTransaction(ReadWriteShardDataTreeTransaction transaction) {
+        if (transaction.equals(previousTx)) {
+            previousTx = null;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
new file mode 100644 (file)
index 0000000..ee04aff
--- /dev/null
@@ -0,0 +1,13 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+abstract class ShardDataTreeTransactionParent {
+    abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
+    abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
+}
index 41ca486eb6cf9c15756b8e5748f75358fb61c409..f2c77e32d87f3bf12e841e6f25fef15c6191e7bb 100644 (file)
@@ -13,17 +13,12 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -34,9 +29,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 public class ShardReadTransaction extends ShardTransaction {
     private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
 
-    private final DOMStoreReadTransaction transaction;
+    private final AbstractShardDataTreeTransaction<?> transaction;
 
-    public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
+    public ShardReadTransaction(AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
             ShardStats shardStats, String transactionID, short clientTxVersion) {
         super(shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
@@ -70,28 +65,16 @@ public class ShardReadTransaction extends ShardTransaction {
 
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
-        final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = transaction.read(DATASTORE_ROOT);
+        final Optional<NormalizedNode<?, ?>> result = transaction.getSnapshot().readNode(DATASTORE_ROOT);
 
-        Futures.addCallback(future, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
-            @Override
-            public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
-                byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get());
-                sender.tell(new CaptureSnapshotReply(serialized), self);
+        byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get());
+        sender.tell(new CaptureSnapshotReply(serialized), self);
 
-                self.tell(PoisonPill.getInstance(), self);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                sender.tell(new akka.actor.Status.Failure(t), self);
-
-                self.tell(PoisonPill.getInstance(), self);
-            }
-        });
+        self.tell(PoisonPill.getInstance(), self);
     }
 
     @Override
-    protected DOMStoreTransaction getDOMStoreTransaction() {
+    protected AbstractShardDataTreeTransaction<?> getDOMStoreTransaction() {
         return transaction;
     }
 
index 2042e955777f6668a831889e5ef4985f31505f12..c90b2ae02855145282b3a6ea72deb92364c06927 100644 (file)
@@ -14,35 +14,30 @@ import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 
 /**
  * @author: syedbahm
  * Date: 8/6/14
  */
 public class ShardReadWriteTransaction extends ShardWriteTransaction {
-    private final DOMStoreReadWriteTransaction transaction;
-
-    public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
+    public ShardReadWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
             ShardStats shardStats, String transactionID, short clientTxVersion) {
         super(transaction, shardActor, shardStats, transactionID, clientTxVersion);
-        this.transaction = transaction;
     }
 
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message instanceof ReadData) {
-            readData(transaction, (ReadData) message, !SERIALIZED_REPLY);
+            readData((ReadData) message, !SERIALIZED_REPLY);
 
         } else if (message instanceof DataExists) {
-            dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY);
+            dataExists((DataExists) message, !SERIALIZED_REPLY);
 
         } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY);
+            readData(ReadData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY);
-
+            dataExists(DataExists.fromSerializable(message), SERIALIZED_REPLY);
         } else {
             super.handleReceive(message);
         }
index 01a124b6977c801e3f273c57341efe91d97c52b2..797641978d2cd47cc7eed57c12e77e2334cb943c 100644 (file)
@@ -7,9 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.util.List;
 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
@@ -17,11 +15,12 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.slf4j.Logger;
 
 /**
@@ -31,56 +30,59 @@ import org.slf4j.Logger;
  * committed to the data store in the order the corresponding snapshot or log batch are received
  * to preserve data store integrity.
  *
- * @author Thomas Panetelis
+ * @author Thomas Pantelis
  */
 class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
-
-    private final InMemoryDOMDataStore store;
-    private List<ModificationPayload> currentLogRecoveryBatch;
+    private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
+    private final DataTree store;
     private final String shardName;
     private final Logger log;
+    private DataTreeModification transaction;
+    private int size;
 
-    ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
-        this.store = store;
+    ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) {
+        this.store = store.getDataTree();
         this.shardName = shardName;
         this.log = log;
     }
 
     @Override
     public void startLogRecoveryBatch(int maxBatchSize) {
-        currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
         log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
+        transaction = store.takeSnapshot().newModification();
+        size = 0;
     }
 
     @Override
     public void appendRecoveredLogEntry(Payload payload) {
         try {
-            if(payload instanceof ModificationPayload) {
-                currentLogRecoveryBatch.add((ModificationPayload) payload);
+            if (payload instanceof DataTreeCandidatePayload) {
+                DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
+                size++;
+            } else if (payload instanceof ModificationPayload) {
+                MutableCompositeModification.fromSerializable(
+                    ((ModificationPayload) payload).getModification()).apply(transaction);
+                size++;
             } else if (payload instanceof CompositeModificationPayload) {
-                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
-                        ((CompositeModificationPayload) payload).getModification())));
+                MutableCompositeModification.fromSerializable(
+                    ((CompositeModificationPayload) payload).getModification()).apply(transaction);
+                size++;
             } else if (payload instanceof CompositeModificationByteStringPayload) {
-                currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
-                        ((CompositeModificationByteStringPayload) payload).getModification())));
+                MutableCompositeModification.fromSerializable(
+                        ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction);
+                size++;
             } else {
                 log.error("{}: Unknown payload {} received during recovery", shardName, payload);
             }
-        } catch (IOException e) {
+        } catch (IOException | ClassNotFoundException e) {
             log.error("{}: Error extracting ModificationPayload", shardName, e);
         }
-
     }
 
-    private void commitTransaction(DOMStoreWriteTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        try {
-            commitCohort.preCommit().get();
-            commitCohort.commit().get();
-        } catch (Exception e) {
-            log.error("{}: Failed to commit Tx on recovery", shardName, e);
-        }
+    private void commitTransaction(DataTreeModification tx) throws DataValidationFailedException {
+        tx.ready();
+        store.validate(tx);
+        store.commit(store.prepare(tx));
     }
 
     /**
@@ -88,20 +90,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
      */
     @Override
     public void applyCurrentLogRecoveryBatch() {
-        log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
-
-        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
-        for(ModificationPayload payload: currentLogRecoveryBatch) {
-            try {
-                MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
-            } catch (Exception e) {
-                log.error("{}: Error extracting ModificationPayload", shardName, e);
-            }
+        log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
+        try {
+            commitTransaction(transaction);
+        } catch (DataValidationFailedException e) {
+            log.error("{}: Failed to apply recovery batch", shardName, e);
         }
-
-        commitTransaction(writeTx);
-
-        currentLogRecoveryBatch = null;
+        transaction = null;
     }
 
     /**
@@ -111,14 +106,15 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
      */
     @Override
     public void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        log.debug("{}: Applyng recovered sbapshot", shardName);
-
-        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
-
-        NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+        log.debug("{}: Applying recovered snapshot", shardName);
 
-        writeTx.write(YangInstanceIdentifier.builder().build(), node);
-
-        commitTransaction(writeTx);
+        final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+        final DataTreeModification tx = store.takeSnapshot().newModification();
+        tx.write(ROOT, node);
+        try {
+            commitTransaction(tx);
+        } catch (DataValidationFailedException e) {
+            log.error("{}: Failed to apply recovery snapshot", shardName, e);
+        }
     }
 }
index c59085d61c57961feb915077e996262b92e2ce17..600509a26b87b480156f4baf843f2b887ed4f655 100644 (file)
@@ -7,15 +7,13 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Preconditions;
 import akka.actor.ActorRef;
 import java.util.concurrent.ExecutionException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -31,14 +29,14 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     private int createSnapshotTransactionCounter;
     private final ShardTransactionActorFactory transactionActorFactory;
-    private final InMemoryDOMDataStore store;
+    private final ShardDataTree store;
     private final Logger log;
     private final String logId;
 
-    ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
+    ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
             Logger log, String logId) {
         this.transactionActorFactory = transactionActorFactory;
-        this.store = store;
+        this.store = Preconditions.checkNotNull(store);
         this.log = log;
         this.logId = logId;
     }
@@ -67,15 +65,15 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
         log.info("{}: Applying snapshot", logId);
 
         try {
-            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+            ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null);
 
             NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
             // delete everything first
-            transaction.delete(DATASTORE_ROOT);
+            transaction.getSnapshot().delete(DATASTORE_ROOT);
 
             // Add everything from the remote node back
-            transaction.write(DATASTORE_ROOT, node);
+            transaction.getSnapshot().write(DATASTORE_ROOT, node);
             syncCommitTransaction(transaction);
         } catch (InterruptedException | ExecutionException e) {
             log.error("{}: An exception occurred when applying snapshot", logId, e);
@@ -85,9 +83,9 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     }
 
-    void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+    void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction)
             throws ExecutionException, InterruptedException {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        ShardDataTreeCohort commitCohort = store.finishTransaction(transaction);
         commitCohort.preCommit().get();
         commitCohort.commit().get();
     }
index 81125a7152bf562baede534f7b84b2560f4f9bae..600ec393971ffe72efd12dfaaaf472cc61edd204 100644 (file)
@@ -14,8 +14,9 @@ import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.japi.Creator;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -25,10 +26,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -66,13 +63,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         this.clientTxVersion = clientTxVersion;
     }
 
-    public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
+    public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
             DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
-        return Props.create(new ShardTransactionCreator(transaction, shardActor,
+        return Props.create(new ShardTransactionCreator(type, transaction, shardActor,
            datastoreContext, shardStats, transactionID, txnClientVersion));
     }
 
-    protected abstract DOMStoreTransaction getDOMStoreTransaction();
+    protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
 
     protected ActorRef getShardActor() {
         return shardActor;
@@ -105,7 +102,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     }
 
     private void closeTransaction(boolean sendReply) {
-        getDOMStoreTransaction().close();
+        getDOMStoreTransaction().abort();
 
         if(sendReply && returnCloseTransactionReply()) {
             getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
@@ -114,71 +111,83 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 
-    protected void readData(DOMStoreReadTransaction transaction, ReadData message,
-            final boolean returnSerialized) {
-
-        final YangInstanceIdentifier path = message.getPath();
-        try {
-            final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
-            Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-            ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
+    private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
+        final boolean ret = transaction.isClosed();
+        if (ret) {
+            shardStats.incrementFailedReadTransactionsCount();
+            getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf());
+        }
+        return ret;
+    }
 
-            sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
+    protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message,
+            final boolean returnSerialized) {
 
-        } catch (Exception e) {
-            LOG.debug(String.format("Unexpected error reading path %s", path), e);
-            shardStats.incrementFailedReadTransactionsCount();
-            sender().tell(new akka.actor.Status.Failure(e), self());
+        if (checkClosed(transaction)) {
+            return;
         }
+
+        final YangInstanceIdentifier path = message.getPath();
+        Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
+        ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
+        sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
     }
 
-    protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
+    protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message,
         final boolean returnSerialized) {
-        final YangInstanceIdentifier path = message.getPath();
 
-        try {
-            boolean exists = transaction.exists(path).checkedGet();
-            DataExistsReply dataExistsReply = DataExistsReply.create(exists);
-            getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
-                dataExistsReply, getSelf());
-        } catch (ReadFailedException e) {
-            getSender().tell(new akka.actor.Status.Failure(e),getSelf());
+        if (checkClosed(transaction)) {
+            return;
         }
+
+        final YangInstanceIdentifier path = message.getPath();
+        boolean exists = transaction.getSnapshot().readNode(path).isPresent();
+        DataExistsReply dataExistsReply = DataExistsReply.create(exists);
+        getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
+            dataExistsReply, getSelf());
     }
 
     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
 
         private static final long serialVersionUID = 1L;
 
-        final DOMStoreTransaction transaction;
+        final AbstractShardDataTreeTransaction<?> transaction;
         final ActorRef shardActor;
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
         final short txnClientVersion;
+        final TransactionType type;
 
-        ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
+        ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
                 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
-            this.transaction = transaction;
+            this.transaction = Preconditions.checkNotNull(transaction);
             this.shardActor = shardActor;
             this.shardStats = shardStats;
             this.datastoreContext = datastoreContext;
             this.transactionID = transactionID;
             this.txnClientVersion = txnClientVersion;
+            this.type = type;
         }
 
         @Override
         public ShardTransaction create() throws Exception {
-            ShardTransaction tx;
-            if(transaction instanceof DOMStoreReadWriteTransaction) {
-                tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, shardStats, transactionID, txnClientVersion);
-            } else if(transaction instanceof DOMStoreReadTransaction) {
-                tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        shardStats, transactionID, txnClientVersion);
-            } else {
-                tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, shardStats, transactionID, txnClientVersion);
+            final ShardTransaction tx;
+            switch (type) {
+            case READ_ONLY:
+                tx = new ShardReadTransaction(transaction, shardActor,
+                    shardStats, transactionID, txnClientVersion);
+                break;
+            case READ_WRITE:
+                tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
+                    shardActor, shardStats, transactionID, txnClientVersion);
+                break;
+            case WRITE_ONLY:
+                tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
+                    shardActor, shardStats, transactionID, txnClientVersion);
+                break;
+            default:
+                throw new IllegalArgumentException("Unhandled transaction type " + type);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index a4c97e8ab9248cd471ad23f50913abf8032a01d3..7a163088d4bec3938132285810ae35d70f2127fe 100644 (file)
@@ -8,16 +8,17 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Preconditions;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -25,13 +26,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  */
 public class ShardTransactionChain extends AbstractUntypedActor {
 
-    private final DOMStoreTransactionChain chain;
+    private final ShardDataTreeTransactionChain chain;
     private final DatastoreContext datastoreContext;
     private final ShardStats shardStats;
 
-    public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+    public ShardTransactionChain(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext,
             ShardStats shardStats) {
-        this.chain = chain;
+        this.chain = Preconditions.checkNotNull(chain);
         this.datastoreContext = datastoreContext;
         this.shardStats = shardStats;
     }
@@ -55,29 +56,25 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
     private ActorRef createTypedTransactionActor(CreateTransaction createTransaction) {
         String transactionName = "shard-" + createTransaction.getTransactionId();
-        if(createTransaction.getTransactionType() ==
-                TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
-            return getContext().actorOf(
-                    ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
-                            datastoreContext, shardStats, createTransaction.getTransactionId(),
-                            createTransaction.getVersion()), transactionName);
-        } else if (createTransaction.getTransactionType() ==
-                TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
-            return getContext().actorOf(
-                    ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
-                            datastoreContext, shardStats, createTransaction.getTransactionId(),
-                            createTransaction.getVersion()), transactionName);
-        } else if (createTransaction.getTransactionType() ==
-                TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
-            return getContext().actorOf(
-                    ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
-                            datastoreContext, shardStats, createTransaction.getTransactionId(),
-                            createTransaction.getVersion()), transactionName);
-        } else {
-            throw new IllegalArgumentException (
-                    "CreateTransaction message has unidentified transaction type=" +
-                             createTransaction.getTransactionType());
+
+        final TransactionType type = TransactionType.fromInt(createTransaction.getTransactionType());
+        final AbstractShardDataTreeTransaction<?> transaction;
+        switch (type) {
+        case READ_ONLY:
+            transaction = chain.newReadOnlyTransaction(transactionName);
+            break;
+        case READ_WRITE:
+        case WRITE_ONLY:
+            transaction = chain.newReadWriteTransaction(transactionName);
+            break;
+        default:
+            throw new IllegalArgumentException("Unhandled transaction type " + type);
         }
+
+        return getContext().actorOf(
+            ShardTransaction.props(type, transaction, getShardActor(),
+                    datastoreContext, shardStats, createTransaction.getTransactionId(),
+                    createTransaction.getVersion()), transactionName);
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
@@ -87,7 +84,7 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                 createTransaction.getTransactionId()).toSerializable(), getSelf());
     }
 
-    public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+    public static Props props(ShardDataTreeTransactionChain chain, SchemaContext schemaContext,
         DatastoreContext datastoreContext, ShardStats shardStats) {
         return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats));
     }
@@ -95,12 +92,11 @@ public class ShardTransactionChain extends AbstractUntypedActor {
     private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
         private static final long serialVersionUID = 1L;
 
-        final DOMStoreTransactionChain chain;
+        final ShardDataTreeTransactionChain chain;
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
 
-
-        ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+        ShardTransactionChainCreator(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext,
                 ShardStats shardStats) {
             this.chain = chain;
             this.datastoreContext = datastoreContext;
index 9637646fc554207746aba1db325eca26de74fbfa..3a92062e7f86972d55fe7f230c655898d0f0acfb 100644 (file)
@@ -7,11 +7,11 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Preconditions;
 import akka.actor.ActorRef;
 import akka.actor.UntypedActorContext;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 
 /**
  * A factory for creating ShardTransaction actors.
@@ -20,16 +20,16 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
  */
 class ShardTransactionActorFactory {
 
-    private final DOMTransactionFactory domTransactionFactory;
+    private final ShardDataTree dataTree;
     private final DatastoreContext datastoreContext;
     private final String txnDispatcherPath;
     private final ShardStats shardMBean;
     private final UntypedActorContext actorContext;
     private final ActorRef shardActor;
 
-    ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext,
+    ShardTransactionActorFactory(ShardDataTree dataTree, DatastoreContext datastoreContext,
             String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) {
-        this.domTransactionFactory = domTransactionFactory;
+        this.dataTree = Preconditions.checkNotNull(dataTree);
         this.datastoreContext = datastoreContext;
         this.txnDispatcherPath = txnDispatcherPath;
         this.shardMBean = shardMBean;
@@ -39,11 +39,20 @@ class ShardTransactionActorFactory {
 
     ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID,
             String transactionChainID, short clientVersion) {
+        final AbstractShardDataTreeTransaction<?> transaction;
+        switch (type) {
+        case READ_ONLY:
+            transaction = dataTree.newReadOnlyTransaction(transactionID.toString(), transactionChainID);
+            break;
+        case READ_WRITE:
+        case WRITE_ONLY:
+            transaction = dataTree.newReadWriteTransaction(transactionID.toString(), transactionChainID);
+            break;
+        default:
+            throw new IllegalArgumentException("Unsupported transaction type " + type);
+        }
 
-        DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(),
-                transactionChainID);
-
-        return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean,
+        return actorContext.actorOf(ShardTransaction.props(type, transaction, shardActor, datastoreContext, shardMBean,
                 transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath),
                 transactionID.toString());
     }
index 424ab2052c13c6dd545d7aac1f53b4a5a054daae..365f97dd3f8f7797ceb953f2b9b71606b4ba099b 100644 (file)
@@ -15,11 +15,13 @@ import akka.actor.PoisonPill;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
@@ -29,9 +31,6 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 
 /**
  * @author: syedbahm
@@ -42,16 +41,16 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final MutableCompositeModification compositeModification = new MutableCompositeModification();
     private int totalBatchedModificationsReceived;
     private Exception lastBatchedModificationsException;
-    private final DOMStoreWriteTransaction transaction;
+    private final ReadWriteShardDataTreeTransaction transaction;
 
-    public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
+    public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
             ShardStats shardStats, String transactionID, short clientTxVersion) {
         super(shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
     @Override
-    protected DOMStoreTransaction getDOMStoreTransaction() {
+    protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() {
         return transaction;
     }
 
@@ -61,17 +60,17 @@ public class ShardWriteTransaction extends ShardTransaction {
         if (message instanceof BatchedModifications) {
             batchedModifications((BatchedModifications)message);
         } else if (message instanceof ReadyTransaction) {
-            readyTransaction(transaction, !SERIALIZED_REPLY, false);
+            readyTransaction(!SERIALIZED_REPLY, false);
         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(transaction, SERIALIZED_REPLY, false);
+            readyTransaction(SERIALIZED_REPLY, false);
         } else if(WriteData.isSerializedType(message)) {
-            writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
+            writeData(WriteData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(MergeData.isSerializedType(message)) {
-            mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY);
+            mergeData(MergeData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(DeleteData.isSerializedType(message)) {
-            deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
+            deleteData(DeleteData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
@@ -82,10 +81,17 @@ public class ShardWriteTransaction extends ShardTransaction {
     }
 
     private void batchedModifications(BatchedModifications batched) {
+        if (checkClosed()) {
+            if (batched.isReady()) {
+                getSelf().tell(PoisonPill.getInstance(), getSelf());
+            }
+            return;
+        }
+
         try {
             for(Modification modification: batched.getModifications()) {
                 compositeModification.addModification(modification);
-                modification.apply(transaction);
+                modification.apply(transaction.getSnapshot());
             }
 
             totalBatchedModificationsReceived++;
@@ -100,7 +106,7 @@ public class ShardWriteTransaction extends ShardTransaction {
                             totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
                 }
 
-                readyTransaction(transaction, false, batched.isDoCommitOnReady());
+                readyTransaction(false, batched.isDoCommitOnReady());
             } else {
                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
             }
@@ -114,14 +120,33 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
-            boolean returnSerialized) {
+    protected final void dataExists(DataExists message, final boolean returnSerialized) {
+        super.dataExists(transaction, message, returnSerialized);
+    }
+
+    protected final void readData(ReadData message, final boolean returnSerialized) {
+        super.readData(transaction, message, returnSerialized);
+    }
+
+    private boolean checkClosed() {
+        if (transaction.isClosed()) {
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException("Transaction is closed, no modifications allowed")), getSelf());
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void writeData(WriteData message, boolean returnSerialized) {
         LOG.debug("writeData at path : {}", message.getPath());
+        if (checkClosed()) {
+            return;
+        }
 
         compositeModification.addModification(
                 new WriteModification(message.getPath(), message.getData()));
         try {
-            transaction.write(message.getPath(), message.getData());
+            transaction.getSnapshot().write(message.getPath(), message.getData());
             WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
             getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
                 writeDataReply, getSelf());
@@ -130,15 +155,17 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void mergeData(DOMStoreWriteTransaction transaction, MergeData message,
-            boolean returnSerialized) {
+    private void mergeData(MergeData message, boolean returnSerialized) {
         LOG.debug("mergeData at path : {}", message.getPath());
+        if (checkClosed()) {
+            return;
+        }
 
         compositeModification.addModification(
                 new MergeModification(message.getPath(), message.getData()));
 
         try {
-            transaction.merge(message.getPath(), message.getData());
+            transaction.getSnapshot().merge(message.getPath(), message.getData());
             MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
             getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
                 mergeDataReply, getSelf());
@@ -147,28 +174,29 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message,
-            boolean returnSerialized) {
+    private void deleteData(DeleteData message, boolean returnSerialized) {
         LOG.debug("deleteData at path : {}", message.getPath());
+        if (checkClosed()) {
+            return;
+        }
 
         compositeModification.addModification(new DeleteModification(message.getPath()));
         try {
-            transaction.delete(message.getPath());
+            transaction.getSnapshot().delete(message.getPath());
             DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
             getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) :
                 deleteDataReply, getSelf());
-        }catch(Exception e){
+        } catch(Exception e) {
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
     }
 
-    private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized,
-            boolean doImmediateCommit) {
+    private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit) {
         String transactionID = getTransactionID();
 
         LOG.debug("readyTransaction : {}", transactionID);
 
-        DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
+        ShardDataTreeCohort cohort =  transaction.ready();
 
         getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
                 cohort, compositeModification, returnSerialized, doImmediateCommit), getContext());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java
new file mode 100644 (file)
index 0000000..9f22ce8
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
+    private static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+    private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+    private final DataTreeModification transaction;
+    private final ShardDataTree dataTree;
+    private DataTreeCandidateTip candidate;
+
+    SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) {
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+        this.transaction = Preconditions.checkNotNull(transaction);
+    }
+
+    @Override
+    DataTreeCandidateTip getCandidate() {
+        return candidate;
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        try {
+            dataTree.getDataTree().validate(transaction);
+            LOG.debug("Transaction {} validated", transaction);
+            return TRUE_FUTURE;
+        } catch (Exception e) {
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        try {
+            candidate = dataTree.getDataTree().prepare(transaction);
+            /*
+             * FIXME: this is the place where we should be interacting with persistence, specifically by invoking
+             *        persist on the candidate (which gives us a Future).
+             */
+            LOG.debug("Transaction {} prepared candidate {}", transaction, candidate);
+            return VOID_FUTURE;
+        } catch (Exception e) {
+            LOG.debug("Transaction {} failed to prepare", transaction, e);
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        // No-op, really
+        return VOID_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        try {
+            dataTree.getDataTree().commit(candidate);
+        } catch (Exception e) {
+            LOG.error("Transaction {} failed to commit", transaction, e);
+            return Futures.immediateFailedFuture(e);
+        }
+
+        LOG.debug("Transaction {} committed, proceeding to notify", transaction);
+        dataTree.notifyListeners(candidate);
+        return VOID_FUTURE;
+    }
+}
index f12fdd99eab792080069f38bebe1a721a71f1b32..361a221dd5d8589eb2c04cbdd42ce94f4737493e 100644 (file)
@@ -159,7 +159,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         return false;
     }
 
-    private boolean isRootPath(YangInstanceIdentifier path){
+    private static boolean isRootPath(YangInstanceIdentifier path) {
         return !path.getPathArguments().iterator().hasNext();
     }
 
index cdd7859a3019d1dded7d53ae84e8b39d6acfa717..2f48ab9d1be137562f170d354e7580b74e6b7da6 100644 (file)
@@ -7,8 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 
 /**
  * Transaction ReadyTransaction message that is forwarded to the local Shard from the ShardTransaction.
@@ -17,14 +17,14 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
  */
 public class ForwardedReadyTransaction {
     private final String transactionID;
-    private final DOMStoreThreePhaseCommitCohort cohort;
+    private final ShardDataTreeCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
     private final boolean doImmediateCommit;
     private final short txnClientVersion;
 
     public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
-            DOMStoreThreePhaseCommitCohort cohort, Modification modification,
+            ShardDataTreeCohort cohort, Modification modification,
             boolean returnSerialized, boolean doImmediateCommit) {
         this.transactionID = transactionID;
         this.cohort = cohort;
@@ -38,7 +38,7 @@ public class ForwardedReadyTransaction {
         return transactionID;
     }
 
-    public DOMStoreThreePhaseCommitCohort getCohort() {
+    public ShardDataTreeCohort getCohort() {
         return cohort;
     }
 
index 3a63f5b17361a0015ac43ba293bcd54ee29339cb..2c553571612b81ee6bd751c7a741ce8ba95aad8d 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
  * DeleteModification store all the parameters required to delete a path from the data tree
@@ -41,6 +42,11 @@ public class DeleteModification extends AbstractModification {
         transaction.delete(getPath());
     }
 
+    @Override
+    public void apply(DataTreeModification transaction) {
+        transaction.delete(getPath());
+    }
+
     @Override
     public byte getType() {
         return DELETE;
index 7ba74f4e7ff63b42e82d81ab6e82f7f22b1ede6f..cc7956ebbc561578d55c55fef48fa9e936f5e2ef 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessa
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
  * MergeModification stores all the parameters required to merge data into the specified path
@@ -41,6 +42,11 @@ public class MergeModification extends WriteModification {
         transaction.merge(getPath(), getData());
     }
 
+    @Override
+    public void apply(final DataTreeModification transaction) {
+        transaction.merge(getPath(), getData());
+    }
+
     @Override
     public byte getType() {
         return MERGE;
index 2dfcdf028785aeb5f35369b983b59e314469289f..6fc8183bd8f29b7e0b1e0c13ce34c4a73f9966f6 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.modification;
 
 import java.io.Externalizable;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
  * Represents a modification to the data store.
@@ -39,6 +40,13 @@ public interface Modification extends Externalizable {
      */
     void apply(DOMStoreWriteTransaction transaction);
 
+    /**
+     * Apply the modification to the specified transaction
+     *
+     * @param transaction
+     */
+    void apply(DataTreeModification transaction);
+
     byte getType();
 
     @Deprecated
index b597742319f08a2c04a8b633baf6d525e97dff14..b594578eb2ece4485c9a75643ccd2f8db11a199a 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.datastore.node.utils.stream.Normalize
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
  * MutableCompositeModification is just a mutable version of a
@@ -45,6 +46,13 @@ public class MutableCompositeModification implements CompositeModification {
         }
     }
 
+    @Override
+    public void apply(DataTreeModification transaction) {
+        for (Modification modification : modifications) {
+            modification.apply(transaction);
+        }
+    }
+
     @Override
     public byte getType() {
         return COMPOSITE;
index 2fdca5f3792161400bf5e8cbbb0f4e13222bcad5..f7f9a71735a587133126db407aba8fb5c49dc63c 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessa
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
  * WriteModification stores all the parameters required to write data to the specified path
@@ -48,6 +49,11 @@ public class WriteModification extends AbstractModification {
         transaction.write(getPath(), data);
     }
 
+    @Override
+    public void apply(final DataTreeModification transaction) {
+        transaction.write(getPath(), data);
+    }
+
     public NormalizedNode<?, ?> getData() {
         return data;
     }
index f6a103ffb88e9a6b8b89aeb4794c4468bd7a84e1..1100f3a7fa2c0fd584dc57618a6cd54339aa4b60 100644 (file)
@@ -21,7 +21,6 @@ import akka.japi.Creator;
 import akka.testkit.TestActorRef;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Collections;
@@ -42,16 +41,17 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -168,49 +168,35 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied));
     }
 
-    protected NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
-        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
-            transaction.read(YangInstanceIdentifier.builder().build());
-
-        Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
-
-        NormalizedNode<?, ?> normalizedNode = optional.get();
-
-        transaction.close();
-
-        return normalizedNode;
-    }
-
-    protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
-            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+    protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
+            final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
             final MutableCompositeModification modification) {
         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
     }
 
-    protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
-            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+    protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
+            final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
             final MutableCompositeModification modification,
-            final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
+            final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
 
-        DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
-        tx.write(path, data);
-        DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, tx.ready(), preCommit);
+        ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null);
+        tx.getSnapshot().write(path, data);
+        ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
 
         modification.addModification(new WriteModification(path, data));
 
         return cohort;
     }
 
-    protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName,
-            final DOMStoreThreePhaseCommitCohort actual) {
+    protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
+            final ShardDataTreeCohort actual) {
         return createDelegatingMockCohort(cohortName, actual, null);
     }
 
-    protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName,
-            final DOMStoreThreePhaseCommitCohort actual,
-            final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
-        DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
+    protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
+            final ShardDataTreeCohort actual,
+            final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
+        ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName);
 
         doAnswer(new Answer<ListenableFuture<Boolean>>() {
             @Override
@@ -244,43 +230,55 @@ public abstract class AbstractShardTest extends AbstractActorTest{
             }
         }).when(cohort).abort();
 
+        doAnswer(new Answer<DataTreeCandidateTip>() {
+            @Override
+            public DataTreeCandidateTip answer(final InvocationOnMock invocation) {
+                return actual.getCandidate();
+            }
+        }).when(cohort).getCandidate();
+
         return cohort;
     }
 
     public static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
             throws ExecutionException, InterruptedException {
-        return readStore(shard.underlyingActor().getDataStore(), id);
+        return readStore(shard.underlyingActor().getDataStore().getDataTree(), id);
     }
 
-    public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
-            throws ExecutionException, InterruptedException {
-        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
-
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-            transaction.read(id);
+    public static NormalizedNode<?,?> readStore(final DataTree store, final YangInstanceIdentifier id) {
+        DataTreeSnapshot transaction = store.takeSnapshot();
 
-        Optional<NormalizedNode<?, ?>> optional = future.get();
+        Optional<NormalizedNode<?, ?>> optional = transaction.readNode(id);
         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
 
-        transaction.close();
-
         return node;
     }
 
     public static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+            final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
         writeToStore(shard.underlyingActor().getDataStore(), id, node);
     }
 
-    public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
-        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+    public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
+            final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
+        ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null);
 
-        transaction.write(id, node);
+        transaction.getSnapshot().write(id, node);
+        ShardDataTreeCohort cohort = transaction.ready();
+        cohort.canCommit().get();
+        cohort.preCommit().get();
+        cohort.commit();
+    }
 
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
+    public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
+            final NormalizedNode<?,?> node) throws DataValidationFailedException {
+        DataTreeModification transaction = store.takeSnapshot().newModification();
+
+        transaction.write(id, node);
+        transaction.ready();
+        store.validate(transaction);
+        final DataTreeCandidate candidate = store.prepare(transaction);
+        store.commit(candidate);
     }
 
     @SuppressWarnings("serial")
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java
new file mode 100644 (file)
index 0000000..781c3db
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+
+public class DataTreeCandidatePayloadTest {
+    private DataTreeCandidate candidate;
+
+    private static DataTreeCandidateNode findNode(final Collection<DataTreeCandidateNode> nodes, final PathArgument arg) {
+        for (DataTreeCandidateNode node : nodes) {
+            if (arg.equals(node.getIdentifier())) {
+                return node;
+            }
+        }
+        return null;
+    }
+
+    private static void assertChildrenEquals(final Collection<DataTreeCandidateNode> expected,
+            final Collection<DataTreeCandidateNode> actual) {
+        // Make sure all expected nodes are there
+        for (DataTreeCandidateNode exp : expected) {
+            final DataTreeCandidateNode act = findNode(actual, exp.getIdentifier());
+            assertNotNull("missing expected child", act);
+            assertCandidateNodeEquals(exp, act);
+        }
+        // Make sure no nodes are present which are not in the expected set
+        for (DataTreeCandidateNode act : actual) {
+            final DataTreeCandidateNode exp = findNode(expected, act.getIdentifier());
+            assertNull("unexpected child", exp);
+        }
+    }
+
+    private static void assertCandidateEquals(final DataTreeCandidate expected, final DataTreeCandidate actual) {
+        assertEquals("root path", expected.getRootPath(), actual.getRootPath());
+
+        final DataTreeCandidateNode expRoot = expected.getRootNode();
+        final DataTreeCandidateNode actRoot = expected.getRootNode();
+        assertEquals("root type", expRoot.getModificationType(), actRoot.getModificationType());
+
+        switch (actRoot.getModificationType()) {
+        case DELETE:
+        case WRITE:
+            assertEquals("root data", expRoot.getDataAfter(), actRoot.getDataAfter());
+            break;
+        case SUBTREE_MODIFIED:
+            assertChildrenEquals(expRoot.getChildNodes(), actRoot.getChildNodes());
+            break;
+        default:
+            fail("Unexpect root type " + actRoot.getModificationType());
+            break;
+        }
+
+        assertCandidateNodeEquals(expected.getRootNode(), actual.getRootNode());
+    }
+
+    private static void assertCandidateNodeEquals(final DataTreeCandidateNode expected, final DataTreeCandidateNode actual) {
+        assertEquals("child type", expected.getModificationType(), actual.getModificationType());
+        assertEquals("child identifier", expected.getIdentifier(), actual.getIdentifier());
+
+        switch (actual.getModificationType()) {
+        case DELETE:
+        case WRITE:
+            assertEquals("child data", expected.getDataAfter(), actual.getDataAfter());
+            break;
+        case SUBTREE_MODIFIED:
+            assertChildrenEquals(expected.getChildNodes(), actual.getChildNodes());
+            break;
+        default:
+            fail("Unexpect root type " + actual.getModificationType());
+            break;
+        }
+    }
+
+    @Before
+    public void setUp() {
+        final YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        final NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+        candidate = DataTreeCandidates.fromNormalizedNode(writePath, writeData);
+    }
+
+    @Test
+    public void testCandidateSerialization() throws IOException {
+        final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
+        assertEquals("payload size", 141, payload.size());
+    }
+
+    @Test
+    public void testCandidateSerDes() throws IOException {
+        final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
+        assertCandidateEquals(candidate, payload.getCandidate());
+    }
+
+    @Test
+    public void testPayloadSerDes() throws IOException {
+        final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
+        assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate());
+    }
+}
index 72f672794ab16c9bf89920bcf855cb72fd4bcc63..3d28672c9fd08906e2fba0a063dbcf10e4326ced 100644 (file)
@@ -25,7 +25,6 @@ import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
@@ -34,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -88,25 +86,32 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 public class ShardTest extends AbstractShardTest {
+    private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
 
     @Test
     public void testRegisterChangeListener() throws Exception {
@@ -338,8 +343,8 @@ public class ShardTest extends AbstractShardTest {
         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        store.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -378,12 +383,27 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @Test
-    public void testRecovery() throws Exception {
+    public void testApplyStateWithCandidatePayload() throws Exception {
 
-        // Set up the InMemorySnapshotStore.
+        TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+
+        NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
+
+        ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+                DataTreeCandidatePayload.create(candidate)));
+
+        shard.underlyingActor().onReceiveCommand(applyState);
+
+        NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+        assertEquals("Applied state", node, actual);
+
+        shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+    }
 
-        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
-        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+    DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
+        DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+        testStore.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -392,6 +412,55 @@ public class ShardTest extends AbstractShardTest {
         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
                 SerializationUtils.serializeNormalizedNode(root),
                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+        return testStore;
+    }
+
+    private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
+        source.validate(mod);
+        final DataTreeCandidate candidate = source.prepare(mod);
+        source.commit(candidate);
+        return DataTreeCandidatePayload.create(candidate);
+    }
+
+    @Test
+    public void testDataTreeCandidateRecovery() throws Exception {
+        // Set up the InMemorySnapshotStore.
+        final DataTree source = setupInMemorySnapshotStore();
+
+        final DataTreeModification writeMod = source.takeSnapshot().newModification();
+        writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+        // Set up the InMemoryJournal.
+        InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
+
+        int nListEntries = 16;
+        Set<Integer> listEntryKeys = new HashSet<>();
+
+        // Add some ModificationPayload entries
+        for (int i = 1; i <= nListEntries; i++) {
+            listEntryKeys.add(Integer.valueOf(i));
+
+            YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+                    .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+
+            final DataTreeModification mod = source.takeSnapshot().newModification();
+            mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+
+            InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+                payloadForModification(source, mod)));
+        }
+
+        InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+                new ApplyJournalEntries(nListEntries));
+
+        testRecovery(listEntryKeys);
+    }
+
+    @Test
+    public void testModicationRecovery() throws Exception {
+
+        // Set up the InMemorySnapshotStore.
+        setupInMemorySnapshotStore();
 
         // Set up the InMemoryJournal.
 
@@ -419,7 +488,7 @@ public class ShardTest extends AbstractShardTest {
         testRecovery(listEntryKeys);
     }
 
-    private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
+    private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
         MutableCompositeModification compMod = new MutableCompositeModification();
         for(Modification mod: mods) {
             compMod.addModification(mod);
@@ -428,7 +497,6 @@ public class ShardTest extends AbstractShardTest {
         return new ModificationPayload(compMod);
     }
 
-    @SuppressWarnings({ "unchecked" })
     @Test
     public void testConcurrentThreePhaseCommits() throws Throwable {
         new ShardTestKit(getSystem()) {{
@@ -440,23 +508,23 @@ public class ShardTest extends AbstractShardTest {
 
          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                     modification2);
 
             String transactionID3 = "tx3";
             MutableCompositeModification modification3 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
@@ -604,12 +672,12 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
-    private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
+    private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
             NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
         return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
     }
 
-    private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
+    private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
             YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
         BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
         batched.addModification(new WriteModification(path, data));
@@ -630,10 +698,10 @@ public class ShardTest extends AbstractShardTest {
             final String transactionID = "tx";
             FiniteDuration duration = duration("5 seconds");
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+            final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
                 @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+                public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
                     if(mockCohort.get() == null) {
                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
                     }
@@ -698,10 +766,10 @@ public class ShardTest extends AbstractShardTest {
             final String transactionID = "tx";
             FiniteDuration duration = duration("5 seconds");
 
-            final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+            final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
             ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
                 @Override
-                public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+                public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
                     if(mockCohort.get() == null) {
                         mockCohort.set(createDelegatingMockCohort("cohort", actual));
                     }
@@ -744,7 +812,7 @@ public class ShardTest extends AbstractShardTest {
     }
 
     @SuppressWarnings("unchecked")
-    private void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
+    private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
         NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
@@ -817,6 +885,8 @@ public class ShardTest extends AbstractShardTest {
         final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
         new ShardTestKit(getSystem()) {{
             Creator<Shard> creator = new Creator<Shard>() {
+                private static final long serialVersionUID = 1L;
+
                 @Override
                 public Shard create() throws Exception {
                     return new Shard(shardID, Collections.<String,String>emptyMap(),
@@ -861,12 +931,12 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             String transactionID = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+            ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
                     TestModel.TEST_PATH, containerNode, modification);
 
             FiniteDuration duration = duration("5 seconds");
@@ -901,14 +971,14 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             // Setup a simulated transactions with a mock cohort.
 
             String transactionID = "tx";
             MutableCompositeModification modification = new MutableCompositeModification();
             NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
-            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+            ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
                     TestModel.TEST_PATH, containerNode, modification);
 
             FiniteDuration duration = duration("5 seconds");
@@ -944,6 +1014,25 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    private static DataTreeCandidateTip mockCandidate(final String name) {
+        DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
+        DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
+        doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
+        doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
+        doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
+        doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
+        return mockCandidate;
+    }
+
+    private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
+        DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
+        DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
+        doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
+        doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
+        doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
+        return mockCandidate;
+    }
+
     @Test
     public void testCommitWhenTransactionHasNoModifications(){
         // Note that persistence is enabled which would normally result in the entry getting written to the journal
@@ -958,10 +1047,11 @@ public class ShardTest extends AbstractShardTest {
 
                 String transactionID = "tx1";
                 MutableCompositeModification modification = new MutableCompositeModification();
-                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+                doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
 
                 FiniteDuration duration = duration("5 seconds");
 
@@ -1013,10 +1103,11 @@ public class ShardTest extends AbstractShardTest {
                 String transactionID = "tx1";
                 MutableCompositeModification modification = new MutableCompositeModification();
                 modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
-                DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+                ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
                 doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+                doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
 
                 FiniteDuration duration = duration("5 seconds");
 
@@ -1069,14 +1160,15 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+            doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
@@ -1145,13 +1237,13 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
@@ -1221,7 +1313,7 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
@@ -1269,7 +1361,7 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
@@ -1319,7 +1411,7 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
@@ -1338,6 +1430,11 @@ public class ShardTest extends AbstractShardTest {
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+            DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+            DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+            doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+            doReturn(candidateRoot).when(candidate).getRootNode();
+            doReturn(candidate).when(cohort).getCandidate();
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
                     cohort, modification, true, true), getRef());
@@ -1361,7 +1458,7 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
 
             // Simulate the ForwardedReadyTransaction messages that would be sent
@@ -1380,6 +1477,11 @@ public class ShardTest extends AbstractShardTest {
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+            DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+            DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+            doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+            doReturn(candidateRoot).when(candidate).getRootNode();
+            doReturn(candidate).when(cohort).getCandidate();
 
             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
                     cohort, modification, true, true), getRef());
@@ -1400,13 +1502,13 @@ public class ShardTest extends AbstractShardTest {
             waitUntilLeader(shard);
 
             final FiniteDuration duration = duration("5 seconds");
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             final String transactionID = "tx1";
-            Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
-                          new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
+            Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
+                          new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
                 @Override
-                public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
+                public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
                     ListenableFuture<Void> preCommitFuture = cohort.preCommit();
 
                     // Simulate an AbortTransaction message occurring during replication, after
@@ -1424,7 +1526,7 @@ public class ShardTest extends AbstractShardTest {
             };
 
             MutableCompositeModification modification = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+            ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
@@ -1464,7 +1566,7 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
@@ -1474,7 +1576,7 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
@@ -1486,7 +1588,7 @@ public class ShardTest extends AbstractShardTest {
             MutableCompositeModification modification2 = new MutableCompositeModification();
             YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
                     listNodePath,
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
                     modification2);
@@ -1541,23 +1643,23 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                     modification2);
 
             String transactionID3 = "tx3";
             MutableCompositeModification modification3 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
 
             // Ready the Tx's
@@ -1619,13 +1721,13 @@ public class ShardTest extends AbstractShardTest {
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+            ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
             doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+            ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
             doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
 
             FiniteDuration duration = duration("5 seconds");
@@ -1781,29 +1883,29 @@ public class ShardTest extends AbstractShardTest {
     /**
      * This test simply verifies that the applySnapShot logic will work
      * @throws ReadFailedException
+     * @throws DataValidationFailedException
      */
     @Test
-    public void testInMemoryDataStoreRestore() throws ReadFailedException {
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
-
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+    public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
+        DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        store.setSchemaContext(SCHEMA_CONTEXT);
 
-        DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
+        DataTreeModification putTransaction = store.takeSnapshot().newModification();
         putTransaction.write(TestModel.TEST_PATH,
             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
-        commitTransaction(putTransaction);
+        commitTransaction(store, putTransaction);
 
 
-        NormalizedNode<?, ?> expected = readStore(store);
+        NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
 
-        DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
+        DataTreeModification writeTransaction = store.takeSnapshot().newModification();
 
         writeTransaction.delete(YangInstanceIdentifier.builder().build());
         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
 
-        commitTransaction(writeTransaction);
+        commitTransaction(store, writeTransaction);
 
-        NormalizedNode<?, ?> actual = readStore(store);
+        NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
 
         assertEquals(expected, actual);
     }
@@ -1912,15 +2014,9 @@ public class ShardTest extends AbstractShardTest {
         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
     }
 
-    private void commitTransaction(final DOMStoreWriteTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        ListenableFuture<Void> future =
-            commitCohort.preCommit();
-        try {
-            future.get();
-            future = commitCohort.commit();
-            future.get();
-        } catch (InterruptedException | ExecutionException e) {
-        }
+    private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
+        modification.ready();
+        store.validate(modification);
+        store.commit(store.prepare(modification));
     }
 }
index 21fb55fcf138bd8e10cd54b2488079770e055e63..e45389f5f36fbd151a24ae9e4d5a02a4995a2772 100644 (file)
@@ -14,17 +14,15 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.pattern.AskTimeoutException;
 import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -40,11 +38,13 @@ import scala.concurrent.duration.Duration;
  * @author Basheeruddin Ahmed <syedbahm@cisco.com>
  */
 public class ShardTransactionFailureTest extends AbstractActorTest {
-    private static final InMemoryDOMDataStore store =
-        new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-
     private static final SchemaContext testSchemaContext =
-        TestModel.createTestContext();
+            TestModel.createTestContext();
+    private static final TransactionType RO = TransactionType.READ_ONLY;
+    private static final TransactionType RW = TransactionType.READ_WRITE;
+    private static final TransactionType WO = TransactionType.WRITE_ONLY;
+
+    private static final ShardDataTree store = new ShardDataTree(testSchemaContext);
 
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.builder().memberName("member-1")
@@ -54,11 +54,6 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
-    @BeforeClass
-    public static void staticSetup() {
-        store.onGlobalContextUpdated(testSchemaContext);
-    }
-
     private ActorRef createShard(){
         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<String, String>emptyMap(), datastoreContext,
                 TestModel.createTestContext()));
@@ -69,7 +64,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         throws Throwable {
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+        final Props props = ShardTransaction.props(RO, store.newReadOnlyTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -86,7 +81,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             akka.pattern.Patterns.ask(subject, readData, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().close();
+        subject.underlyingActor().getDOMStoreTransaction().abort();
 
         future = akka.pattern.Patterns.ask(subject, readData, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
@@ -98,7 +93,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         throws Throwable {
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+        final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -116,7 +111,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             akka.pattern.Patterns.ask(subject, readData, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().close();
+        subject.underlyingActor().getDOMStoreTransaction().abort();
 
         future = akka.pattern.Patterns.ask(subject, readData, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
@@ -127,7 +122,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         throws Throwable {
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+        final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -145,7 +140,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             akka.pattern.Patterns.ask(subject, dataExists, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().close();
+        subject.underlyingActor().getDOMStoreTransaction().abort();
 
         future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
@@ -156,7 +151,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+        final Props props = ShardTransaction.props(WO, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -188,7 +183,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+        final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -225,7 +220,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+        final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -257,7 +252,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+        final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
index b22001a4da6c234f6ac7b7167e4f4a4e18d25771..23984ad973933666d41fb60c762e18517181ef1a 100644 (file)
@@ -11,14 +11,13 @@ import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
-import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
@@ -50,13 +49,11 @@ import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCo
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -64,6 +61,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class ShardTransactionTest extends AbstractActorTest {
 
     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
+    private static final TransactionType RO = TransactionType.READ_ONLY;
+    private static final TransactionType RW = TransactionType.READ_WRITE;
+    private static final TransactionType WO = TransactionType.WRITE_ONLY;
 
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.builder().memberName("member-1")
@@ -73,46 +73,50 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
-    private final InMemoryDOMDataStore store =
-            new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+    private final ShardDataTree store = new ShardDataTree(testSchemaContext);
 
-    @Before
-    public void setup() {
-        store.onGlobalContextUpdated(testSchemaContext);
-    }
+    private int txCounter = 0;
 
-    private ActorRef createShard(){
+    private ActorRef createShard() {
         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
             Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
     }
 
-    private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
-        return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
+    private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
+        return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
     }
 
-    private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
-        return newTransactionActor(transaction, null, name, version);
+    private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
+        return newTransactionActor(type, transaction, null, name, version);
     }
 
-    private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
-        return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
+    private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
+        return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
     }
 
-    private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
+    private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
             short version) {
-        Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
+        Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
                 datastoreContext, shardStats, "txn", version);
         return getSystem().actorOf(props, name);
     }
 
+    private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
+        return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
+    }
+
+    private ReadWriteShardDataTreeTransaction readWriteTransaction() {
+        return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
+    }
+
     @Test
     public void testOnReceiveReadData() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
 
-            testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
+            testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
 
-            testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
+            testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
         }
 
         private void testOnReceiveReadData(final ActorRef transaction) {
@@ -140,10 +144,10 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
 
             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
-                    store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
+                    RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
 
             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
-                    store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
+                    RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
         }
 
         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
@@ -167,7 +171,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadDataHeliumR1() throws Exception {
         new JavaTestKit(getSystem()) {{
-            ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+            ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
 
             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
@@ -185,10 +189,10 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
 
-            testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
+            testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
                     "testDataExistsPositiveRO"));
 
-            testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
+            testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
                     "testDataExistsPositiveRW"));
         }
 
@@ -215,10 +219,10 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
 
-            testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
+            testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
                     "testDataExistsNegativeRO"));
 
-            testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
+            testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
                     "testDataExistsNegativeRW"));
         }
 
@@ -253,9 +257,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveWriteData() throws Exception {
+    public void testOnReceiveWriteData() {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
@@ -276,9 +280,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveHeliumR1WriteData() throws Exception {
+    public void testOnReceiveHeliumR1WriteData() {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
 
             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
@@ -296,9 +300,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveMergeData() throws Exception {
+    public void testOnReceiveMergeData() {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+            final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
                     "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
@@ -321,7 +325,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveHeliumR1MergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
 
             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
@@ -341,7 +345,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDeleteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testDeleteData");
 
             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
@@ -362,8 +366,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveBatchedModifications() throws Exception {
         new JavaTestKit(getSystem()) {{
 
-            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
-            final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
+            ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
+            DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
+            ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
+            final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
 
             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
@@ -405,10 +411,10 @@ public class ShardTransactionTest extends AbstractActorTest {
             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
             assertEquals("getPath", deletePath, delete.getPath());
 
-            InOrder inOrder = Mockito.inOrder(mockWriteTx);
-            inOrder.verify(mockWriteTx).write(writePath, writeData);
-            inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
-            inOrder.verify(mockWriteTx).delete(deletePath);
+            InOrder inOrder = Mockito.inOrder(mockModification);
+            inOrder.verify(mockModification).write(writePath, writeData);
+            inOrder.verify(mockModification).merge(mergePath, mergeData);
+            inOrder.verify(mockModification).delete(deletePath);
         }};
     }
 
@@ -416,7 +422,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
         new JavaTestKit(getSystem()) {{
 
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -448,7 +454,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
         new JavaTestKit(getSystem()) {{
 
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -475,8 +481,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
         new JavaTestKit(getSystem()) {{
 
-            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
-            final ActorRef transaction = newTransactionActor(mockWriteTx,
+            ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
+            DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
+            ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
+            final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
                     "testOnReceiveBatchedModificationsFailure");
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -485,7 +493,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-            doThrow(new TestException()).when(mockWriteTx).write(path, node);
+            doThrow(new TestException()).when(mockModification).write(path, node);
 
             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
             batched.addModification(new WriteModification(path, node));
@@ -511,7 +519,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
         new JavaTestKit(getSystem()) {{
 
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -535,7 +543,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+            final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -549,7 +557,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
         // test
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+            final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -565,13 +573,13 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveCreateSnapshot() throws Exception {
         new JavaTestKit(getSystem()) {{
-            ShardTest.writeToStore(store, TestModel.TEST_PATH,
+            ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-            NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
+            NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
                     YangInstanceIdentifier.builder().build());
 
-            final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
                     "testOnReceiveCreateSnapshot");
 
             watch(transaction);
@@ -594,7 +602,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+            final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
                     "testReadWriteTxOnReceiveCloseTransaction");
 
             watch(transaction);
@@ -609,7 +617,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testWriteTxOnReceiveCloseTransaction");
 
             watch(transaction);
@@ -624,7 +632,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
                     "testReadOnlyTxOnReceiveCloseTransaction");
 
             watch(transaction);
@@ -638,7 +646,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test(expected=UnknownMessageException.class)
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+        final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
@@ -653,7 +661,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 500, TimeUnit.MILLISECONDS).build();
 
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+            final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
                     "testShardTransactionInactivity");
 
             watch(transaction);
index caabb32d71e174ad03a785056b2cd84d06ab453b..a2309be48f4e09f5b102418611bf7d035943a589 100644 (file)
@@ -20,7 +20,6 @@ import akka.pattern.Patterns;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -32,6 +31,8 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
 import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
@@ -56,16 +57,15 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Compos
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -110,8 +110,8 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
         NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
 
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        store.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -154,10 +154,8 @@ public class PreLithiumShardTest extends AbstractShardTest {
     @Test
     public void testHelium2VersionRecovery() throws Exception {
 
-        // Set up the InMemorySnapshotStore.
-
-        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
-        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+        testStore.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -217,23 +215,23 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
 
             String transactionID2 = "tx2";
             MutableCompositeModification modification2 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
                     TestModel.OUTER_LIST_PATH,
                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
                     modification2);
 
             String transactionID3 = "tx3";
             MutableCompositeModification modification3 = new MutableCompositeModification();
-            DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
index bbfff70e2dac8623cb66328a023a5fd1cf62ea99..7016ada5257c034d5b1c64557e5d8ed9e8ed21fb 100644 (file)
@@ -8,7 +8,7 @@
 package org.opendaylight.controller.cluster.datastore.modification;
 
 import static org.junit.Assert.assertEquals;
-import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.lang3.SerializationUtils;
 import org.junit.Test;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -43,8 +43,7 @@ public class ModificationPayloadTest {
         assertEquals("getPath", writePath, write.getPath());
         assertEquals("getData", writeData, write.getData());
 
-        ModificationPayload cloned =
-                (ModificationPayload) SerializationUtils.clone(payload);
+        ModificationPayload cloned = SerializationUtils.clone(payload);
 
         deserialized = (MutableCompositeModification) payload.getModification();
 
index 42406080361601de2d3c53bb91a8f9c62fc7a886..60420dcf236ac8d19a51bb9111bf7c9d3b6880f7 100644 (file)
@@ -7,54 +7,54 @@
  */
 package org.opendaylight.controller.md.cluster.datastore.model;
 
+import com.google.common.io.Resources;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collections;
-import java.util.Set;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
 public class TestModel {
 
-  public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
-          "test");
-
-  public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
-          "junk");
-
-
-  public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
-  public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
-  public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
-  public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
-  public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
-  public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
-  public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
-  private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
-
-  public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
-  public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
-  public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
-          node(OUTER_LIST_QNAME).build();
-  public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
-          node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
-  public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
-  public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
-
-
-  public static final InputStream getDatastoreTestInputStream() {
-    return getInputStream(DATASTORE_TEST_YANG);
-  }
-
-  private static InputStream getInputStream(final String resourceName) {
-    return TestModel.class.getResourceAsStream(DATASTORE_TEST_YANG);
-  }
-
-  public static SchemaContext createTestContext() {
-    YangParserImpl parser = new YangParserImpl();
-    Set<Module> modules = parser.parseYangModelsFromStreams(Collections.singletonList(getDatastoreTestInputStream()));
-    return parser.resolveSchemaContext(modules);
-  }
+    public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
+            "test");
+
+    public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
+            "junk");
+
+
+    public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
+    public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
+    public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
+    public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
+    public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+    public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
+    public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
+    private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
+
+    public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+    public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
+    public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+            node(OUTER_LIST_QNAME).build();
+    public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+            node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
+    public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
+    public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
+
+
+    public static final InputStream getDatastoreTestInputStream() {
+        return TestModel.class.getResourceAsStream(DATASTORE_TEST_YANG);
+    }
+
+    public static SchemaContext createTestContext() {
+        YangParserImpl parser = new YangParserImpl();
+        try {
+            return parser.parseSources(Collections.singleton(Resources.asByteSource(TestModel.class.getResource(DATASTORE_TEST_YANG))));
+        } catch (IOException | YangSyntaxErrorException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
 }
index f043d2cb84274f870ea8e05fec14ee3ce4069042..106abca3ec44fd674c54dfe282a8eb8c73ba7ff0 100644 (file)
@@ -49,7 +49,8 @@ public abstract class AbstractDOMStoreTransaction<T> implements DOMStoreTransact
      * @return The context in which this transaction was allocated, or null
      *         if the context was not recorded.
      */
-    @Nullable public final Throwable getDebugContext() {
+    @Nullable
+    public final Throwable getDebugContext() {
         return debugContext;
     }
 
diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java
new file mode 100644 (file)
index 0000000..b7776b2
--- /dev/null
@@ -0,0 +1,270 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract implementation of the {@link DOMStoreTransactionChain} interface relying on {@link DataTreeSnapshot} supplier
+ * and backend commit coordinator.
+ *
+ * @param <T> transaction identifier type
+ */
+@Beta
+public abstract class AbstractSnapshotBackedTransactionChain<T> extends TransactionReadyPrototype<T> implements DOMStoreTransactionChain {
+    private static abstract class State {
+        /**
+         * Allocate a new snapshot.
+         *
+         * @return A new snapshot
+         */
+        protected abstract DataTreeSnapshot getSnapshot();
+    }
+
+    private static final class Idle extends State {
+        private final AbstractSnapshotBackedTransactionChain<?> chain;
+
+        Idle(final AbstractSnapshotBackedTransactionChain<?> chain) {
+            this.chain = Preconditions.checkNotNull(chain);
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            return chain.takeSnapshot();
+        }
+    }
+
+    /**
+     * We have a transaction out there.
+     */
+    private static final class Allocated extends State {
+        private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
+                AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
+        private final DOMStoreWriteTransaction transaction;
+        private volatile DataTreeSnapshot snapshot;
+
+        Allocated(final DOMStoreWriteTransaction transaction) {
+            this.transaction = Preconditions.checkNotNull(transaction);
+        }
+
+        public DOMStoreWriteTransaction getTransaction() {
+            return transaction;
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            final DataTreeSnapshot ret = snapshot;
+            Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
+            return ret;
+        }
+
+        void setSnapshot(final DataTreeSnapshot snapshot) {
+            final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
+            Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
+        }
+    }
+
+    /**
+     * Chain is logically shut down, no further allocation allowed.
+     */
+    private static final class Shutdown extends State {
+        private final String message;
+
+        Shutdown(final String message) {
+            this.message = Preconditions.checkNotNull(message);
+        }
+
+        @Override
+        protected DataTreeSnapshot getSnapshot() {
+            throw new IllegalStateException(message);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static final AtomicReferenceFieldUpdater<AbstractSnapshotBackedTransactionChain, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(AbstractSnapshotBackedTransactionChain.class, State.class, "state");
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotBackedTransactionChain.class);
+    private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
+    private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+    private final Idle idleState;
+    private volatile State state;
+
+    protected AbstractSnapshotBackedTransactionChain() {
+        idleState = new Idle(this);
+        state = idleState;
+    }
+
+    private Entry<State, DataTreeSnapshot> getSnapshot() {
+        final State localState = state;
+        return new SimpleEntry<>(localState, localState.getSnapshot());
+    }
+
+    private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
+        final State state = new Allocated(transaction);
+        return STATE_UPDATER.compareAndSet(this, expected, state);
+    }
+
+    @Override
+    public final DOMStoreReadTransaction newReadOnlyTransaction() {
+        final Entry<State, DataTreeSnapshot> entry = getSnapshot();
+        return SnapshotBackedTransactions.newReadTransaction(nextTransactionIdentifier(), getDebugTransactions(), entry.getValue());
+    }
+
+    @Override
+    public final DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        Entry<State, DataTreeSnapshot> entry;
+        DOMStoreReadWriteTransaction ret;
+
+        do {
+            entry = getSnapshot();
+            ret = new SnapshotBackedReadWriteTransaction<T>(nextTransactionIdentifier(),
+                getDebugTransactions(), entry.getValue(), this);
+        } while (!recordTransaction(entry.getKey(), ret));
+
+        return ret;
+    }
+
+    @Override
+    public final DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        Entry<State, DataTreeSnapshot> entry;
+        DOMStoreWriteTransaction ret;
+
+        do {
+            entry = getSnapshot();
+            ret = new SnapshotBackedWriteTransaction<T>(nextTransactionIdentifier(),
+                getDebugTransactions(), entry.getValue(), this);
+        } while (!recordTransaction(entry.getKey(), ret));
+
+        return ret;
+    }
+
+    @Override
+    protected final void transactionAborted(final SnapshotBackedWriteTransaction<T> tx) {
+        final State localState = state;
+        if (localState instanceof Allocated) {
+            final Allocated allocated = (Allocated)localState;
+            if (allocated.getTransaction().equals(tx)) {
+                final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
+                if (!success) {
+                    LOG.warn("Transaction {} aborted, but chain {} state already transitioned from {} to {}, very strange",
+                        tx, this, localState, state);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected final DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<T> tx, final DataTreeModification tree) {
+        final State localState = state;
+
+        if (localState instanceof Allocated) {
+            final Allocated allocated = (Allocated)localState;
+            final DOMStoreWriteTransaction transaction = allocated.getTransaction();
+            Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
+            allocated.setSnapshot(tree);
+        } else {
+            LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
+        }
+
+        return createCohort(tx, tree);
+    }
+
+    @Override
+    public final void close() {
+        final State localState = state;
+
+        do {
+            Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
+
+            if (FAILED.equals(localState)) {
+                LOG.debug("Ignoring user close in failed state");
+                return;
+            }
+        } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
+    }
+
+    /**
+     * Notify the base logic that a previously-submitted transaction has been committed successfully.
+     *
+     * @param transaction Transaction which completed successfully.
+     */
+    protected final void onTransactionCommited(final SnapshotBackedWriteTransaction<T> transaction) {
+        // If the committed transaction was the one we allocated last,
+        // we clear it and the ready snapshot, so the next transaction
+        // allocated refers to the data tree directly.
+        final State localState = state;
+
+        if (!(localState instanceof Allocated)) {
+            // This can legally happen if the chain is shut down before the transaction was committed
+            // by the backend.
+            LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
+            return;
+        }
+
+        final Allocated allocated = (Allocated)localState;
+        final DOMStoreWriteTransaction tx = allocated.getTransaction();
+        if (!tx.equals(transaction)) {
+            LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
+            return;
+        }
+
+        if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
+            LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
+        }
+    }
+
+    /**
+     * Notify the base logic that a previously-submitted transaction has failed.
+     *
+     * @param transaction Transaction which failed.
+     * @param cause Failure cause
+     */
+    protected final void onTransactionFailed(final SnapshotBackedWriteTransaction<T> transaction, final Throwable cause) {
+        LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, cause);
+        state = FAILED;
+    }
+
+    /**
+     * Return the next transaction identifier.
+     *
+     * @return transaction identifier.
+     */
+    protected abstract T nextTransactionIdentifier();
+
+    /**
+     * Inquire as to whether transactions should record their allocation context.
+     *
+     * @return True if allocation context should be recorded.
+     */
+    protected abstract boolean getDebugTransactions();
+
+    /**
+     * Take a fresh {@link DataTreeSnapshot} from the backend.
+     *
+     * @return A new snapshot.
+     */
+    protected abstract DataTreeSnapshot takeSnapshot();
+
+    /**
+     * Create a cohort for driving the transaction through the commit process.
+     *
+     * @param transaction Transaction handle
+     * @param modification {@link DataTreeModification} which needs to be applied to the backend
+     * @return A {@link DOMStoreThreePhaseCommitCohort} cohort.
+     */
+    protected abstract DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<T> transaction, final DataTreeModification modification);
+}
@@ -5,16 +5,15 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.annotations.Beta;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
@@ -28,14 +27,21 @@ import org.slf4j.LoggerFactory;
  * Implementation of read-only transaction backed by {@link DataTreeSnapshot}
  * which delegates most of its calls to similar methods provided by underlying snapshot.
  *
+ * <T> identifier type
  */
-final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction<Object>
-                                          implements DOMStoreReadTransaction {
-
+@Beta
+public final class SnapshotBackedReadTransaction<T> extends AbstractDOMStoreTransaction<T> implements DOMStoreReadTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class);
     private volatile DataTreeSnapshot stableSnapshot;
 
-    public SnapshotBackedReadTransaction(final Object identifier, final boolean debug, final DataTreeSnapshot snapshot) {
+    /**
+     * Creates a new read-only transaction.
+     *
+     * @param identifier Transaction Identifier
+     * @param debug Enable transaction debugging
+     * @param snapshot Snapshot which will be modified.
+     */
+    SnapshotBackedReadTransaction(final T identifier, final boolean debug, final DataTreeSnapshot snapshot) {
         super(identifier, debug);
         this.stableSnapshot = Preconditions.checkNotNull(snapshot);
         LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot);
@@ -71,8 +77,7 @@ final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction<Ob
         checkNotNull(path, "Path must not be null.");
 
         try {
-            return Futures.immediateCheckedFuture(
-                read(path).checkedGet().isPresent());
+            return Futures.immediateCheckedFuture(read(path).checkedGet().isPresent());
         } catch (ReadFailedException e) {
             return Futures.immediateFailedCheckedFuture(e);
         }
@@ -5,14 +5,15 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.annotations.Beta;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
@@ -23,20 +24,15 @@ import org.slf4j.LoggerFactory;
  * Implementation of Read-Write transaction which is backed by {@link DataTreeSnapshot}
  * and executed according to {@link TransactionReadyPrototype}.
  *
+ * @param <T> identifier type
  */
-final class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements DOMStoreReadWriteTransaction {
+@Beta
+public final class SnapshotBackedReadWriteTransaction<T> extends SnapshotBackedWriteTransaction<T> implements DOMStoreReadWriteTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadWriteTransaction.class);
 
-    /**
-     * Creates new read-write transaction.
-     *
-     * @param identifier transaction Identifier
-     * @param snapshot Snapshot which will be modified.
-     * @param readyImpl Implementation of ready method.
-     */
-    protected SnapshotBackedReadWriteTransaction(final Object identifier, final boolean debug,
-            final DataTreeSnapshot snapshot, final TransactionReadyPrototype store) {
-        super(identifier, debug, snapshot, store);
+    SnapshotBackedReadWriteTransaction(final T identifier, final boolean debug,
+            final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+        super(identifier, debug, snapshot, readyImpl);
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedTransactions.java b/opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/SnapshotBackedTransactions.java
new file mode 100644 (file)
index 0000000..3368c8a
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+/**
+ * Public utility class for instantiating snapshot-backed transactions.
+ */
+@Beta
+public final class SnapshotBackedTransactions {
+    private SnapshotBackedTransactions() {
+        throw new UnsupportedOperationException("Utility class");
+    }
+
+    /**
+     * Creates a new read-only transaction.
+     *
+     * @param identifier Transaction Identifier
+     * @param debug Enable transaction debugging
+     * @param snapshot Snapshot which will be modified.
+     */
+    public static <T> SnapshotBackedReadTransaction<T> newReadTransaction(final T identifier, final boolean debug, final DataTreeSnapshot snapshot) {
+        return new SnapshotBackedReadTransaction<T>(identifier, debug, snapshot);
+    }
+
+    /**
+     * Creates a new read-write transaction.
+     *
+     * @param identifier transaction Identifier
+     * @param debug Enable transaction debugging
+     * @param snapshot Snapshot which will be modified.
+     * @param readyImpl Implementation of ready method.
+     */
+    public static <T> SnapshotBackedReadWriteTransaction<T> newReadWriteTransaction(final T identifier, final boolean debug,
+            final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+        return new SnapshotBackedReadWriteTransaction<T>(identifier, debug, snapshot, readyImpl);
+    }
+
+    /**
+     * Creates a new write-only transaction.
+     *
+     * @param identifier transaction Identifier
+     * @param debug Enable transaction debugging
+     * @param snapshot Snapshot which will be modified.
+     * @param readyImpl Implementation of ready method.
+     */
+    public static <T> SnapshotBackedWriteTransaction<T> newWriteTransaction(final T identifier, final boolean debug,
+            final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+        return new SnapshotBackedWriteTransaction<T>(identifier, debug, snapshot, readyImpl);
+    }
+}
@@ -5,17 +5,15 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
 
 import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.Beta;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
@@ -26,33 +24,27 @@ import org.slf4j.LoggerFactory;
 /**
  * Implementation of Write transaction which is backed by
  * {@link DataTreeSnapshot} and executed according to
- * {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype}.
+ * {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype}.
  *
+ * @param <T> Identifier type
  */
-class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object> implements DOMStoreWriteTransaction {
+@Beta
+public class SnapshotBackedWriteTransaction<T> extends AbstractDOMStoreTransaction<T> implements DOMStoreWriteTransaction {
     private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedWriteTransaction.class);
+    @SuppressWarnings("rawtypes")
     private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, TransactionReadyPrototype> READY_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, TransactionReadyPrototype.class, "readyImpl");
+    @SuppressWarnings("rawtypes")
     private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, DataTreeModification> TREE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, DataTreeModification.class, "mutableTree");
 
     // non-null when not ready
-    private volatile TransactionReadyPrototype readyImpl;
+    private volatile TransactionReadyPrototype<T> readyImpl;
     // non-null when not committed/closed
     private volatile DataTreeModification mutableTree;
 
-    /**
-     * Creates new write-only transaction.
-     *
-     * @param identifier
-     *            transaction Identifier
-     * @param snapshot
-     *            Snapshot which will be modified.
-     * @param readyImpl
-     *            Implementation of ready method.
-     */
-    public SnapshotBackedWriteTransaction(final Object identifier, final boolean debug,
-            final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) {
+    SnapshotBackedWriteTransaction(final T identifier, final boolean debug,
+            final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
         super(identifier, debug);
         this.readyImpl = Preconditions.checkNotNull(readyImpl, "readyImpl must not be null.");
         mutableTree = snapshot.newModification();
@@ -126,7 +118,7 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object>
      * @param path Path to read
      * @return null if the the transaction has been closed;
      */
-    protected final Optional<NormalizedNode<?, ?>> readSnapshotNode(final YangInstanceIdentifier path) {
+    final Optional<NormalizedNode<?, ?>> readSnapshotNode(final YangInstanceIdentifier path) {
         return readyImpl == null ? null : mutableTree.readNode(path);
     }
 
@@ -136,7 +128,8 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object>
 
     @Override
     public DOMStoreThreePhaseCommitCohort ready() {
-        final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null);
+        @SuppressWarnings("unchecked")
+        final TransactionReadyPrototype<T> wasReady = READY_UPDATER.getAndSet(this, null);
         checkState(wasReady != null, "Transaction %s is no longer open", getIdentifier());
 
         LOG.debug("Store transaction: {} : Ready", getIdentifier());
@@ -149,7 +142,8 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object>
 
     @Override
     public void close() {
-        final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null);
+        @SuppressWarnings("unchecked")
+        final TransactionReadyPrototype<T> wasReady = READY_UPDATER.getAndSet(this, null);
         if (wasReady != null) {
             LOG.debug("Store transaction: {} : Closed", getIdentifier());
             TREE_UPDATER.lazySet(this, null);
@@ -166,21 +160,22 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object>
 
     /**
      * Prototype implementation of
-     * {@link #ready(org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction)}
+     * {@link #ready(org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction)}
      *
      * This class is intended to be implemented by Transaction factories
-     * responsible for allocation of {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction} and
+     * responsible for allocation of {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction} and
      * providing underlying logic for applying implementation.
      *
+     * @param <T> identifier type
      */
-    abstract static class TransactionReadyPrototype {
+    public abstract static class TransactionReadyPrototype<T> {
         /**
          * Called when a transaction is closed without being readied. This is not invoked for
          * transactions which are ready.
          *
          * @param tx Transaction which got aborted.
          */
-        protected abstract void transactionAborted(final SnapshotBackedWriteTransaction tx);
+        protected abstract void transactionAborted(final SnapshotBackedWriteTransaction<T> tx);
 
         /**
          * Returns a commit coordinator associated with supplied transactions.
@@ -193,6 +188,6 @@ class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object>
          *            Modified data tree which has been constructed.
          * @return DOMStoreThreePhaseCommitCohort associated with transaction
          */
-        protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction tx, DataTreeModification tree);
+        protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction<T> tx, DataTreeModification tree);
     }
 }
\ No newline at end of file
index 05e3d5cb26e5944dc46de6a6d3e7b7fc87ad56b3..35d891dac025f7acc5cfb0095f40974ad2dfb601 100644 (file)
@@ -8,44 +8,24 @@
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
-final class ChainedTransactionCommitImpl extends ForwardingDOMStoreThreePhaseCommitCohort {
-    private final SnapshotBackedWriteTransaction transaction;
-    private final DOMStoreThreePhaseCommitCohort delegate;
+final class ChainedTransactionCommitImpl extends InMemoryDOMStoreThreePhaseCommitCohort {
     private final DOMStoreTransactionChainImpl txChain;
 
-    ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
-            final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
-        this.transaction = Preconditions.checkNotNull(transaction);
-        this.delegate = Preconditions.checkNotNull(delegate);
+    ChainedTransactionCommitImpl(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction<String> transaction,
+        final DataTreeModification modification, final DOMStoreTransactionChainImpl txChain) {
+        super(store, transaction, modification);
         this.txChain = Preconditions.checkNotNull(txChain);
     }
 
-    @Override
-    protected DOMStoreThreePhaseCommitCohort delegate() {
-        return delegate;
-    }
-
     @Override
     public ListenableFuture<Void> commit() {
-        ListenableFuture<Void> commitFuture = super.commit();
-        Futures.addCallback(commitFuture, new FutureCallback<Void>() {
-            @Override
-            public void onFailure(final Throwable t) {
-                txChain.onTransactionFailed(transaction, t);
-            }
-
-            @Override
-            public void onSuccess(final Void result) {
-                txChain.onTransactionCommited(transaction);
-            }
-        });
-        return commitFuture;
+        ListenableFuture<Void> ret = super.commit();
+        txChain.transactionCommited(getTransaction());
+        return ret;
     }
 
 }
\ No newline at end of file
index 3f731cf18b66bb9305cc1ef40b867f4904343f04..2cf79d899b3682bf06696e2465ad035b56c9e2a2 100644 (file)
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
 import com.google.common.base.Preconditions;
-import java.util.AbstractMap.SimpleEntry;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-final class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
-    private static abstract class State {
-        /**
-         * Allocate a new snapshot.
-         *
-         * @return A new snapshot
-         */
-        protected abstract DataTreeSnapshot getSnapshot();
-    }
-
-    private static final class Idle extends State {
-        private final InMemoryDOMDataStore store;
-
-        Idle(final InMemoryDOMDataStore store) {
-            this.store = Preconditions.checkNotNull(store);
-        }
-
-        @Override
-        protected DataTreeSnapshot getSnapshot() {
-            return store.takeSnapshot();
-        }
-    }
-
-    /**
-     * We have a transaction out there.
-     */
-    private static final class Allocated extends State {
-        private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
-                AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
-        private final DOMStoreWriteTransaction transaction;
-        private volatile DataTreeSnapshot snapshot;
-
-        Allocated(final DOMStoreWriteTransaction transaction) {
-            this.transaction = Preconditions.checkNotNull(transaction);
-        }
-
-        public DOMStoreWriteTransaction getTransaction() {
-            return transaction;
-        }
-
-        @Override
-        protected DataTreeSnapshot getSnapshot() {
-            final DataTreeSnapshot ret = snapshot;
-            Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
-            return ret;
-        }
-
-        void setSnapshot(final DataTreeSnapshot snapshot) {
-            final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
-            Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
-        }
-    }
-
-    /**
-     * Chain is logically shut down, no further allocation allowed.
-     */
-    private static final class Shutdown extends State {
-        private final String message;
-
-        Shutdown(final String message) {
-            this.message = Preconditions.checkNotNull(message);
-        }
-
-        @Override
-        protected DataTreeSnapshot getSnapshot() {
-            throw new IllegalStateException(message);
-        }
-    }
-
-    private static final AtomicReferenceFieldUpdater<DOMStoreTransactionChainImpl, State> STATE_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(DOMStoreTransactionChainImpl.class, State.class, "state");
-    private static final Logger LOG = LoggerFactory.getLogger(DOMStoreTransactionChainImpl.class);
-    private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
-    private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+final class DOMStoreTransactionChainImpl extends AbstractSnapshotBackedTransactionChain<String> {
     private final InMemoryDOMDataStore store;
-    private final Idle idleState;
-    private volatile State state;
 
     DOMStoreTransactionChainImpl(final InMemoryDOMDataStore store) {
         this.store = Preconditions.checkNotNull(store);
-        idleState = new Idle(store);
-        state = idleState;
-    }
-
-    private Entry<State, DataTreeSnapshot> getSnapshot() {
-        final State localState = state;
-        return new SimpleEntry<>(localState, localState.getSnapshot());
-    }
-
-    private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
-        final State state = new Allocated(transaction);
-        return STATE_UPDATER.compareAndSet(this, expected, state);
     }
 
     @Override
-    public DOMStoreReadTransaction newReadOnlyTransaction() {
-        final Entry<State, DataTreeSnapshot> entry = getSnapshot();
-        return new SnapshotBackedReadTransaction(store.nextIdentifier(), store.getDebugTransactions(), entry.getValue());
+    protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification modification) {
+        return new ChainedTransactionCommitImpl(store, tx, modification, this);
     }
 
     @Override
-    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        Entry<State, DataTreeSnapshot> entry;
-        DOMStoreReadWriteTransaction ret;
-
-        do {
-            entry = getSnapshot();
-            ret = new SnapshotBackedReadWriteTransaction(store.nextIdentifier(),
-                store.getDebugTransactions(), entry.getValue(), this);
-        } while (!recordTransaction(entry.getKey(), ret));
-
-        return ret;
-    }
-
-    @Override
-    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        Entry<State, DataTreeSnapshot> entry;
-        DOMStoreWriteTransaction ret;
-
-        do {
-            entry = getSnapshot();
-            ret = new SnapshotBackedWriteTransaction(store.nextIdentifier(),
-                store.getDebugTransactions(), entry.getValue(), this);
-        } while (!recordTransaction(entry.getKey(), ret));
-
-        return ret;
+    protected DataTreeSnapshot takeSnapshot() {
+        return store.takeSnapshot();
     }
 
     @Override
-    protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
-        final State localState = state;
-        if (localState instanceof Allocated) {
-            final Allocated allocated = (Allocated)localState;
-            if (allocated.getTransaction().equals(tx)) {
-                final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
-                if (!success) {
-                    LOG.info("State already transitioned from {} to {}", localState, state);
-                }
-            }
-        }
+    protected String nextTransactionIdentifier() {
+        return store.nextIdentifier();
     }
 
     @Override
-    protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
-        final State localState = state;
-
-        if (localState instanceof Allocated) {
-            final Allocated allocated = (Allocated)localState;
-            final DOMStoreWriteTransaction transaction = allocated.getTransaction();
-            Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
-            allocated.setSnapshot(tree);
-        } else {
-            LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
-        }
-
-        return new ChainedTransactionCommitImpl(tx, store.transactionReady(tx, tree), this);
+    protected boolean getDebugTransactions() {
+        return store.getDebugTransactions();
     }
 
-    @Override
-    public void close() {
-        final State localState = state;
-
-        do {
-            Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
-
-            if (FAILED.equals(localState)) {
-                LOG.debug("Ignoring user close in failed state");
-                return;
-            }
-        } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
-    }
-
-    void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, final Throwable t) {
-        LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, t);
-        state = FAILED;
-    }
-
-    void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
-        // If the committed transaction was the one we allocated last,
-        // we clear it and the ready snapshot, so the next transaction
-        // allocated refers to the data tree directly.
-        final State localState = state;
-
-        if (!(localState instanceof Allocated)) {
-            LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
-            return;
-        }
-
-        final Allocated allocated = (Allocated)localState;
-        final DOMStoreWriteTransaction tx = allocated.getTransaction();
-        if (!tx.equals(transaction)) {
-            LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
-            return;
-        }
-
-        if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
-            LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
-        }
+    void transactionCommited(final SnapshotBackedWriteTransaction<String> transaction) {
+        super.onTransactionCommited(transaction);
     }
-}
\ No newline at end of file
+}
index 354abcf69fe1040a1e24822b7db78aa6e674909d..a85d8ac3fb645ecc5a5815df3aeaf46bb8fc4187 100644 (file)
@@ -7,22 +7,15 @@
  */
 package org.opendaylight.controller.md.sal.dom.store.impl;
 
-import static com.google.common.base.Preconditions.checkState;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -30,6 +23,9 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -38,7 +34,6 @@ import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
@@ -55,14 +50,12 @@ import org.slf4j.LoggerFactory;
  *
  * Implementation of {@link DOMStore} which uses {@link DataTree} and other
  * classes such as {@link SnapshotBackedWriteTransaction}.
- * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
+ * {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
  * to implement {@link DOMStore} contract.
  *
  */
-public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
+public class InMemoryDOMDataStore extends TransactionReadyPrototype<String> implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
-    private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
-    private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
 
     private static final Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
             new Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>() {
@@ -120,17 +113,17 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new SnapshotBackedReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot());
+        return SnapshotBackedTransactions.newReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot());
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new SnapshotBackedReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
+        return SnapshotBackedTransactions.newReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new SnapshotBackedWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
+        return SnapshotBackedTransactions.newWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
     }
 
     @Override
@@ -214,99 +207,31 @@ public class InMemoryDOMDataStore extends TransactionReadyPrototype implements D
     }
 
     @Override
-    protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+    protected void transactionAborted(final SnapshotBackedWriteTransaction<String> tx) {
         LOG.debug("Tx: {} is closed.", tx.getIdentifier());
     }
 
     @Override
-    protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
-        LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), tree);
-        return new ThreePhaseCommitImpl(tx, tree);
+    protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification modification) {
+        LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), modification);
+        return new InMemoryDOMStoreThreePhaseCommitCohort(this, tx, modification);
     }
 
-    Object nextIdentifier() {
+    String nextIdentifier() {
         return name + "-" + txCounter.getAndIncrement();
     }
 
-    private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
-        final Throwable ctx = transaction.getDebugContext();
-        if (ctx != null) {
-            LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
-        }
+    void validate(final DataTreeModification modification) throws DataValidationFailedException {
+        dataTree.validate(modification);
     }
 
-    private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
-        private final SnapshotBackedWriteTransaction transaction;
-        private final DataTreeModification modification;
-
-        private ResolveDataChangeEventsTask listenerResolver;
-        private DataTreeCandidate candidate;
-
-        public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction, final DataTreeModification modification) {
-            this.transaction = writeTransaction;
-            this.modification = modification;
-        }
-
-        @Override
-        public ListenableFuture<Boolean> canCommit() {
-            try {
-                dataTree.validate(modification);
-                LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
-                return CAN_COMMIT_FUTURE;
-            } catch (ConflictingModificationAppliedException e) {
-                LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
-                        e.getPath());
-                warnDebugContext(transaction);
-                return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
-            } catch (DataValidationFailedException e) {
-                LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
-                        e.getPath(), e);
-                warnDebugContext(transaction);
-
-                // For debugging purposes, allow dumping of the modification. Coupled with the above
-                // precondition log, it should allow us to understand what went on.
-                LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, dataTree);
-
-                return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
-            } catch (Exception e) {
-                LOG.warn("Unexpected failure in validation phase", e);
-                return Futures.immediateFailedFuture(e);
-            }
-        }
-
-        @Override
-        public ListenableFuture<Void> preCommit() {
-            try {
-                candidate = dataTree.prepare(modification);
-                listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
-                return SUCCESSFUL_FUTURE;
-            } catch (Exception e) {
-                LOG.warn("Unexpected failure in pre-commit phase", e);
-                return Futures.immediateFailedFuture(e);
-            }
-        }
-
-        @Override
-        public ListenableFuture<Void> abort() {
-            candidate = null;
-            return SUCCESSFUL_FUTURE;
-        }
-
-        @Override
-        public ListenableFuture<Void> commit() {
-            checkState(candidate != null, "Proposed subtree must be computed");
-
-            /*
-             * The commit has to occur atomically with regard to listener
-             * registrations.
-             */
-            synchronized (InMemoryDOMDataStore.this) {
-                dataTree.commit(candidate);
-                changePublisher.publishChange(candidate);
-                listenerResolver.resolve(dataChangeListenerNotificationManager);
-            }
+    DataTreeCandidate prepare(final DataTreeModification modification) {
+        return dataTree.prepare(modification);
+    }
 
-            return SUCCESSFUL_FUTURE;
-        }
+    synchronized void commit(final DataTreeCandidate candidate) {
+        dataTree.commit(candidate);
+        changePublisher.publishChange(candidate);
+        ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(dataChangeListenerNotificationManager);
     }
 }
diff --git a/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-inmemory-datastore/src/main/java/org/opendaylight/controller/md/sal/dom/store/impl/InMemoryDOMStoreThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..dba71bf
--- /dev/null
@@ -0,0 +1,100 @@
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemoryDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMStoreThreePhaseCommitCohort.class);
+    private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
+    private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+    private final SnapshotBackedWriteTransaction<String> transaction;
+    private final DataTreeModification modification;
+    private final InMemoryDOMDataStore store;
+    private DataTreeCandidate candidate;
+
+    public InMemoryDOMStoreThreePhaseCommitCohort(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction<String> writeTransaction, final DataTreeModification modification) {
+        this.transaction = Preconditions.checkNotNull(writeTransaction);
+        this.modification = Preconditions.checkNotNull(modification);
+        this.store = Preconditions.checkNotNull(store);
+    }
+
+    private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
+        final Throwable ctx = transaction.getDebugContext();
+        if (ctx != null) {
+            LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
+        }
+    }
+
+    @Override
+    public final ListenableFuture<Boolean> canCommit() {
+        try {
+            store.validate(modification);
+            LOG.debug("Store Transaction: {} can be committed", getTransaction().getIdentifier());
+            return CAN_COMMIT_FUTURE;
+        } catch (ConflictingModificationAppliedException e) {
+            LOG.warn("Store Tx: {} Conflicting modification for {}.", getTransaction().getIdentifier(),
+                    e.getPath());
+            warnDebugContext(getTransaction());
+            return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
+        } catch (DataValidationFailedException e) {
+            LOG.warn("Store Tx: {} Data Precondition failed for {}.", getTransaction().getIdentifier(),
+                    e.getPath(), e);
+            warnDebugContext(getTransaction());
+
+            // For debugging purposes, allow dumping of the modification. Coupled with the above
+            // precondition log, it should allow us to understand what went on.
+            LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, store);
+
+            return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
+        } catch (Exception e) {
+            LOG.warn("Unexpected failure in validation phase", e);
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public final ListenableFuture<Void> preCommit() {
+        try {
+            candidate = store.prepare(modification);
+            return SUCCESSFUL_FUTURE;
+        } catch (Exception e) {
+            LOG.warn("Unexpected failure in pre-commit phase", e);
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public final ListenableFuture<Void> abort() {
+        candidate = null;
+        return SUCCESSFUL_FUTURE;
+    }
+
+    protected final SnapshotBackedWriteTransaction<String> getTransaction() {
+        return transaction;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        checkState(candidate != null, "Proposed subtree must be computed");
+
+        /*
+         * The commit has to occur atomically with regard to listener
+         * registrations.
+         */
+        store.commit(candidate);
+        return SUCCESSFUL_FUTURE;
+    }
+}
\ No newline at end of file
index 9de4892d9140b1afc7ea3a93ef532ca1bf97edeb..568f88376cbc816b9ea151a68fb36b1d4a5f15a7 100644 (file)
@@ -21,12 +21,13 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
@@ -37,7 +38,6 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-
 public class InMemoryDataStoreTest {
 
     private SchemaContext schemaContext;
@@ -268,7 +268,7 @@ public class InMemoryDataStoreTest {
         Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockSnapshot )
         .readNode( Mockito.any( YangInstanceIdentifier.class ) );
 
-        DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction("1", true, mockSnapshot);
+        DOMStoreReadTransaction readTx = SnapshotBackedTransactions.newReadTransaction("1", true, mockSnapshot);
 
         doReadAndThrowEx( readTx );
     }
@@ -292,14 +292,14 @@ public class InMemoryDataStoreTest {
         Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockModification )
         .readNode( Mockito.any( YangInstanceIdentifier.class ) );
         Mockito.doReturn( mockModification ).when( mockSnapshot ).newModification();
-        TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class );
-        DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction("1", false, mockSnapshot, mockReady);
+        @SuppressWarnings("unchecked")
+        TransactionReadyPrototype<String> mockReady = Mockito.mock( TransactionReadyPrototype.class );
+        DOMStoreReadTransaction readTx = SnapshotBackedTransactions.newReadWriteTransaction("1", false, mockSnapshot, mockReady);
 
         doReadAndThrowEx( readTx );
     }
 
-    private void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable {
-
+    private static void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable {
         try {
             readTx.read(TestModel.TEST_PATH).get();
         } catch( ExecutionException e ) {
index e11cac2eb3fb0bc2e5a18abc219c239ce1587cad..10399ffeffa85a20b7bb0c3c8de77aff069a86ac 100644 (file)
@@ -27,6 +27,9 @@ import org.opendaylight.controller.sal.restconf.impl.NormalizedNodeContext;
 import org.opendaylight.controller.sal.restconf.impl.RestconfDocumentedException;
 import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorTag;
 import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
@@ -84,9 +87,18 @@ public class JsonNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPr
             final JsonReader reader = new JsonReader(new InputStreamReader(entityStream));
             jsonParser.parse(reader);
 
-            final NormalizedNode<?, ?> partialResult = resultHolder.getResult();
+            NormalizedNode<?, ?> partialResult = resultHolder.getResult();
             final NormalizedNode<?, ?> result;
-            if(partialResult instanceof MapNode) {
+
+            // unwrap result from augmentation and choice nodes on PUT
+            if (!isPost()) {
+                while (partialResult instanceof AugmentationNode || partialResult instanceof ChoiceNode) {
+                    final Object childNode = ((DataContainerNode) partialResult).getValue().iterator().next();
+                    partialResult = (NormalizedNode<?, ?>) childNode;
+                }
+            }
+
+            if (partialResult instanceof MapNode) {
                 result = Iterables.getOnlyElement(((MapNode) partialResult).getValue());
             } else {
                 result = partialResult;
index 4257e172b4a5c22847b1f0469e810ab86dca828b..74a9bd2d313618be1b090e49c472bd7894457c50 100644 (file)
@@ -34,6 +34,8 @@ import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlUtils;
 import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory;
+import org.opendaylight.yangtools.yang.model.api.AugmentationSchema;
+import org.opendaylight.yangtools.yang.model.api.AugmentationTarget;
 import org.opendaylight.yangtools.yang.model.api.ChoiceCaseNode;
 import org.opendaylight.yangtools.yang.model.api.ChoiceSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
@@ -126,18 +128,25 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
         final String docRootElm = doc.getDocumentElement().getLocalName();
         final String schemaNodeName = pathContext.getSchemaNode().getQName().getLocalName();
 
+        // FIXME the factory instance should be cached if the schema context is the same
+        final DomToNormalizedNodeParserFactory parserFactory =
+                DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, pathContext.getSchemaContext());
+
         if (!schemaNodeName.equalsIgnoreCase(docRootElm)) {
             final DataSchemaNode foundSchemaNode = findSchemaNodeOrParentChoiceByName(schemaNode, docRootElm);
             if (foundSchemaNode != null) {
+                if (schemaNode instanceof AugmentationTarget) {
+                    final AugmentationSchema augmentSchemaNode = findCorrespondingAugment(schemaNode, foundSchemaNode);
+                    if (augmentSchemaNode != null) {
+                        return parserFactory.getAugmentationNodeParser().parse(elements, augmentSchemaNode);
+                    }
+                }
                 schemaNode = foundSchemaNode;
             }
         }
 
-        // FIXME the factory instance should be cached if the schema context is the same
-        final DomToNormalizedNodeParserFactory parserFactory =
-                DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, pathContext.getSchemaContext());
-
         NormalizedNode<?, ?> parsed = null;
+
         if(schemaNode instanceof ContainerSchemaNode) {
             return parserFactory.getContainerNodeParser().parse(Collections.singletonList(doc.getDocumentElement()), (ContainerSchemaNode) schemaNode);
         } else if(schemaNode instanceof ListSchemaNode) {
@@ -147,7 +156,6 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
             final ChoiceSchemaNode casted = (ChoiceSchemaNode) schemaNode;
             return parserFactory.getChoiceNodeParser().parse(elements, casted);
         }
-
         // FIXME : add another DataSchemaNode extensions e.g. LeafSchemaNode
 
         return parsed;
@@ -175,5 +183,17 @@ public class XmlNormalizedNodeBodyReader extends AbstractIdentifierAwareJaxRsPro
         }
         return null;
     }
+
+    private static AugmentationSchema findCorrespondingAugment(final DataSchemaNode parent, final DataSchemaNode child) {
+        if (parent instanceof AugmentationTarget && !((parent instanceof ChoiceCaseNode) || (parent instanceof ChoiceSchemaNode))) {
+            for (AugmentationSchema augmentation : ((AugmentationTarget) parent).getAvailableAugmentations()) {
+                DataSchemaNode childInAugmentation = augmentation.getDataChildByName(child.getQName());
+                if (childInAugmentation != null) {
+                    return augmentation;
+                }
+            }
+        }
+        return null;
+    }
 }
 
index 33795889a122536c14f9e6c61f368b8b7c949ece..8e88be6f501a14ccd5f843c9bafb72100bd3d53f 100644 (file)
@@ -828,12 +828,13 @@ public class RestconfImpl implements RestconfService {
             throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
         }
 
-        final URI payloadNS = payload.getData().getNodeType().getNamespace();
-        if (payloadNS == null) {
-            throw new RestconfDocumentedException(
-                    "Data has bad format. Root element node must have namespace (XML format) or module name(JSON format)",
-                    ErrorType.PROTOCOL, ErrorTag.UNKNOWN_NAMESPACE);
-        }
+        // FIXME: move this to parsing stage (we can have augmentation nodes here which do not have namespace)
+//        final URI payloadNS = payload.getData().getNodeType().getNamespace();
+//        if (payloadNS == null) {
+//            throw new RestconfDocumentedException(
+//                    "Data has bad format. Root element node must have namespace (XML format) or module name(JSON format)",
+//                    ErrorType.PROTOCOL, ErrorTag.UNKNOWN_NAMESPACE);
+//        }
 
         final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint();
         final InstanceIdentifierContext<?> iiWithData = payload.getInstanceIdentifierContext();
index 817b45f786308abaef45b85780eda9f6dbaa00c6..3476b11fbeaf6379c512ba8dbcc0ca3a3328330c 100644 (file)
@@ -43,8 +43,8 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -173,7 +173,7 @@ public class NetconfDeviceSimulator implements Closeable {
                 return input.getKey().getAST();
             }
         });
-        final Map<String, TreeMap<Date, URI>> namespaceContext = BuilderUtils.createYangNamespaceContext(
+        final Map<String, NavigableMap<Date, URI>> namespaceContext = BuilderUtils.createYangNamespaceContext(
                 asts.values(), Optional.<SchemaContext>absent());
 
         final ParseTreeWalker walker = new ParseTreeWalker();