CDS: use internal DataTree instance 98/18498/32
authorRobert Varga <rovarga@cisco.com>
Thu, 16 Apr 2015 14:30:47 +0000 (16:30 +0200)
committerRobert Varga <rovarga@cisco.com>
Wed, 22 Apr 2015 14:02:05 +0000 (16:02 +0200)
Instead of using a captive InMemoryDataStore instance, which is geared
toward interaction with a DOMDataBroker, use the baseline DataTree
instance and utility classes for implementing the data store behavior.

Since most of CDS is geared for interactions with the usual DOMStore
interfaces, this patch provides a set of bare-bone workalike classes,
but taking advantage of the safeties provided by the AKKA actor system.

Immediate benefit should be a speed up in backend processing, as we
eliminate intermediate Future instances when accessing and manipulating
data.

We also gain explicit control over DataTree lifecycle, which enables us
switching to DataTreeCandidate as the replication and persistence unit,
as opposed to individual modifications.

Change-Id: I031d0ed4d82dcc923fd377c13fefba2819923f9b
Signed-off-by: Robert Varga <rovarga@cisco.com>
35 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeChangeListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedDataTreeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderLocalDelegateFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/DeleteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MergeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/Modification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/MutableCompositeModification.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/modification/WriteModification.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java
new file mode 100644 (file)
index 0000000..dd8ec0b
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+/**
+ * Abstract base for transactions running on SharrdDataTree.
+ *
+ * @param <T> Backing transaction type.
+ */
+@NotThreadSafe
+abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot> {
+    private final T snapshot;
+    private final String id;
+    private boolean closed;
+
+    protected AbstractShardDataTreeTransaction(final String id, final T snapshot) {
+        this.snapshot = Preconditions.checkNotNull(snapshot);
+        this.id = Preconditions.checkNotNull(id);
+    }
+
+    final T getSnapshot() {
+        return snapshot;
+    }
+
+    final boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * Close this transaction and mark it as closed, allowing idempotent invocations.
+     *
+     * @return True if the transaction got closed by this method invocation.
+     */
+    protected final boolean close() {
+        if (closed) {
+            return false;
+        }
+
+        closed = true;
+        return true;
+    }
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot).toString();
+    }
+
+    abstract void abort();
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DOMTransactionFactory.java
deleted file mode 100644 (file)
index f243620..0000000
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
-import org.slf4j.Logger;
-
-/**
- * A factory for creating DOM transactions, either normal or chained.
- *
- * @author Thomas Pantelis
- */
-public class DOMTransactionFactory {
-
-    private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
-    private final InMemoryDOMDataStore store;
-    private final ShardStats shardMBean;
-    private final Logger log;
-    private final String name;
-
-    public DOMTransactionFactory(InMemoryDOMDataStore store, ShardStats shardMBean, Logger log, String name) {
-        this.store = store;
-        this.shardMBean = shardMBean;
-        this.log = log;
-        this.name = name;
-    }
-
-    @SuppressWarnings("unchecked")
-    public <T extends DOMStoreTransaction> T newTransaction(TransactionProxy.TransactionType type,
-            String transactionID, String transactionChainID) {
-
-        DOMStoreTransactionFactory factory = store;
-
-        if(!transactionChainID.isEmpty()) {
-            factory = transactionChains.get(transactionChainID);
-            if(factory == null) {
-                if(log.isDebugEnabled()) {
-                    log.debug("{}: Creating transaction with ID {} from chain {}", name, transactionID,
-                            transactionChainID);
-                }
-
-                DOMStoreTransactionChain transactionChain = store.createTransactionChain();
-                transactionChains.put(transactionChainID, transactionChain);
-                factory = transactionChain;
-            }
-        } else {
-            log.debug("{}: Creating transaction with ID {}", name, transactionID);
-        }
-
-        T transaction = null;
-        switch(type) {
-            case READ_ONLY:
-                transaction = (T) factory.newReadOnlyTransaction();
-                shardMBean.incrementReadOnlyTransactionCount();
-                break;
-            case READ_WRITE:
-                transaction = (T) factory.newReadWriteTransaction();
-                shardMBean.incrementReadWriteTransactionCount();
-                break;
-            case WRITE_ONLY:
-                transaction = (T) factory.newWriteOnlyTransaction();
-                shardMBean.incrementWriteOnlyTransactionCount();
-                break;
-        }
-
-        return transaction;
-    }
-
-    public void closeTransactionChain(String transactionChainID) {
-        DOMStoreTransactionChain chain =
-                transactionChains.remove(transactionChainID);
-
-        if(chain != null) {
-            chain.close();
-        }
-    }
-
-    public void closeAllTransactionChains() {
-        for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
-            entry.getValue().close();
-        }
-
-        transactionChains.clear();
-    }
-}
index 939ddf8..e6f63d7 100644 (file)
@@ -7,21 +7,24 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
-import java.util.ArrayList;
-import java.util.List;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener, ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
+final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener, ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> {
     private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
     private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
     private final List<ActorSelection> dataChangeListeners =  new ArrayList<>();
@@ -39,7 +42,12 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
         if (isLeader) {
             for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
                 if(!reg.isClosed()) {
-                    reg.setDelegate(createDelegate(reg.getRegisterChangeListener()));
+                    final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+                            createDelegate(reg.getRegisterChangeListener());
+                    reg.setDelegate(res.getKey());
+                    if (res.getValue() != null) {
+                        reg.getInstance().onDataChanged(res.getValue());
+                    }
                 }
             }
 
@@ -52,16 +60,21 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
 
         LOG.debug("{}: registerDataChangeListener for {}, leader: {}", persistenceId(), message.getPath(), isLeader);
 
-        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
                                                      NormalizedNode<?, ?>>> registration;
+        final AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> event;
         if (isLeader) {
-            registration = createDelegate(message);
+            final Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> res =
+                    createDelegate(message);
+            registration = res.getKey();
+            event = res.getValue();
         } else {
             LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
             DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
             delayedListenerRegistrations.add(delayedReg);
             registration = delayedReg;
+            event = null;
         }
 
         ActorRef listenerRegistration = createActor(DataChangeListenerRegistration.props(registration));
@@ -70,10 +83,13 @@ final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<Registe
                 persistenceId(), listenerRegistration.path());
 
         tellSender(new RegisterChangeListenerReply(listenerRegistration));
+        if (event != null) {
+            registration.getInstance().onDataChanged(event);
+        }
     }
 
     @Override
-    ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> createDelegate(
+    Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> createDelegate(
             final RegisterChangeListener message) {
         ActorSelection dataChangeListenerPath = selectActor(message.getDataChangeListenerPath());
 
index 3987c9a..db5eeb8 100644 (file)
@@ -11,15 +11,18 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Map.Entry;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> {
+final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> {
     private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
     private final ArrayList<DelayedDataTreeListenerRegistration> delayedRegistrations = new ArrayList<>();
     private final Collection<ActorSelection> actors = new ArrayList<>();
@@ -49,6 +52,7 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
         LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
 
         final ListenerRegistration<DOMDataTreeChangeListener> registration;
+        final DataTreeCandidate event;
         if (!isLeader) {
             LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
 
@@ -56,8 +60,11 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
                     new DelayedDataTreeListenerRegistration(registerTreeChangeListener);
             delayedRegistrations.add(delayedReg);
             registration = delayedReg;
+            event = null;
         } else {
-            registration = createDelegate(registerTreeChangeListener);
+            final Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> res = createDelegate(registerTreeChangeListener);
+            registration = res.getKey();
+            event = res.getValue();
         }
 
         ActorRef listenerRegistration = createActor(DataTreeChangeListenerRegistrationActor.props(registration));
@@ -66,10 +73,13 @@ final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<Reg
             persistenceId(), listenerRegistration.path());
 
         tellSender(new RegisterDataTreeChangeListenerReply(listenerRegistration));
+        if (event != null) {
+            registration.getInstance().onDataTreeChanged(Collections.singletonList(event));
+        }
     }
 
     @Override
-    ListenerRegistration<DOMDataTreeChangeListener> createDelegate(final RegisterDataTreeChangeListener message) {
+    Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> createDelegate(final RegisterDataTreeChangeListener message) {
         ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
 
         // Notify the listener if notifications should be enabled or not
index b3ae8a3..e8cd310 100644 (file)
@@ -8,10 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.Map.Entry;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 
 /**
  * Intermediate proxy registration returned to the user when we cannot
@@ -28,9 +31,13 @@ final class DelayedDataTreeListenerRegistration implements ListenerRegistration<
         this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener);
     }
 
-    synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> factory) {
+    synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> factory) {
         if (!closed) {
-            this.delegate = factory.createDelegate(registerTreeChangeListener);
+            final Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> res = factory.createDelegate(registerTreeChangeListener);
+            this.delegate = res.getKey();
+            if (res.getValue() != null) {
+                delegate.getInstance().onDataTreeChanged(Collections.singletonList(res.getValue()));
+            }
         }
     }
 
index e6702d9..3b9b7ad 100644 (file)
@@ -7,12 +7,15 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.Map.Entry;
+
 /**
  * Base class for factories instantiating delegates.
  *
  * <D> delegate type
  * <M> message type
+ * <I> initial state type
  */
-abstract class DelegateFactory<M, D> {
-    abstract D createDelegate(M message);
+abstract class DelegateFactory<M, D, I> {
+    abstract Entry<D, I> createDelegate(M message);
 }
index d33cebb..3f92773 100644 (file)
@@ -19,8 +19,9 @@ import com.google.common.base.Preconditions;
  *
  * <D> delegate type
  * <M> message type
+ * <I> initial state type
  */
-abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
+abstract class LeaderLocalDelegateFactory<M, D, I> extends DelegateFactory<M, D, I> {
     private final Shard shard;
 
     protected LeaderLocalDelegateFactory(final Shard shard) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadOnlyShardDataTreeTransaction.java
new file mode 100644 (file)
index 0000000..5926568
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+final class ReadOnlyShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeSnapshot> {
+    ReadOnlyShardDataTreeTransaction(final String id, final DataTreeSnapshot snapshot) {
+        super(id, snapshot);
+    }
+
+    @Override
+    void abort() {
+        close();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java
new file mode 100644 (file)
index 0000000..0f3ab61
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
+final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
+    private final ShardDataTreeTransactionParent parent;
+
+    protected ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final String id, final DataTreeModification modification) {
+        super(id, modification);
+        this.parent = Preconditions.checkNotNull(parent);
+    }
+
+    @Override
+    void abort() {
+        Preconditions.checkState(close(), "Transaction is already closed");
+
+        parent.abortTransaction(this);
+    }
+
+    DOMStoreThreePhaseCommitCohort ready() {
+        Preconditions.checkState(close(), "Transaction is already closed");
+
+        return parent.finishTransaction(this);
+    }
+}
index 91e072b..b53d12c 100644 (file)
@@ -24,7 +24,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
@@ -64,9 +63,6 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
@@ -74,7 +70,7 @@ import scala.concurrent.duration.FiniteDuration;
 /**
  * A Shard represents a portion of the logical data tree <br/>
  * <p>
- * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ * Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
  * </p>
  */
 public class Shard extends RaftActor {
@@ -85,7 +81,7 @@ public class Shard extends RaftActor {
     static final String DEFAULT_NAME = "default";
 
     // The state of this Shard
-    private final InMemoryDOMDataStore store;
+    private final ShardDataTree store;
 
     /// The name of this shard
     private final String name;
@@ -104,8 +100,6 @@ public class Shard extends RaftActor {
 
     private final MessageTracker appendEntriesReplyTracker;
 
-    private final DOMTransactionFactory domTransactionFactory;
-
     private final ShardTransactionActorFactory transactionActorFactory;
 
     private final ShardSnapshotCohort snapshotCohort;
@@ -124,25 +118,17 @@ public class Shard extends RaftActor {
 
         LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
 
-        store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
-                datastoreContext.getDataStoreProperties());
-
-        if (schemaContext != null) {
-            store.onGlobalContextUpdated(schemaContext);
-        }
+        store = new ShardDataTree(schemaContext);
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
                 datastoreContext.getDataStoreMXBeanType());
-        shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
         shardMBean.setShardActor(getSelf());
 
         if (isMetricsCaptureEnabled()) {
             getContext().become(new MeteringBehavior(this));
         }
 
-        domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
-
-        commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
+        commitCoordinator = new ShardCommitCoordinator(store,
                 TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
@@ -154,7 +140,7 @@ public class Shard extends RaftActor {
         appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
 
-        transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+        transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(
                         Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
 
@@ -474,7 +460,7 @@ public class Shard extends RaftActor {
     }
 
     private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+        store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
     }
 
     private ActorRef createTypedTransactionActor(int transactionType,
@@ -515,13 +501,13 @@ public class Shard extends RaftActor {
     }
 
     private void commitWithNewTransaction(final Modification modification) {
-        DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
-        modification.apply(tx);
+        ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+        modification.apply(tx.getSnapshot());
         try {
             snapshotCohort.syncCommitTransaction(tx);
             shardMBean.incrementCommittedTransactionCount();
             shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-        } catch (InterruptedException | ExecutionException e) {
+        } catch (Exception e) {
             shardMBean.incrementFailedTransactionsCount();
             LOG.error("{}: Failed to commit", persistenceId(), e);
         }
@@ -533,7 +519,7 @@ public class Shard extends RaftActor {
 
     @VisibleForTesting
     void updateSchemaContext(final SchemaContext schemaContext) {
-        store.onGlobalContextUpdated(schemaContext);
+        store.updateSchemaContext(schemaContext);
     }
 
     private boolean isMetricsCaptureEnabled() {
@@ -622,7 +608,7 @@ public class Shard extends RaftActor {
                     persistenceId(), getId());
             }
 
-            domTransactionFactory.closeAllTransactionChains();
+            store.closeAllTransactionChains();
         }
     }
 
@@ -666,7 +652,7 @@ public class Shard extends RaftActor {
     }
 
     @VisibleForTesting
-    public InMemoryDOMDataStore getDataStore() {
+    public ShardDataTree getDataStore() {
         return store;
     }
 
index 4ff9b5f..0eb48fd 100644 (file)
@@ -31,7 +31,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.slf4j.Logger;
 
 /**
@@ -50,7 +49,7 @@ public class ShardCommitCoordinator {
 
     private CohortEntry currentCohortEntry;
 
-    private final DOMTransactionFactory transactionFactory;
+    private final ShardDataTree dataTree;
 
     private final Queue<CohortEntry> queuedCohortEntries;
 
@@ -75,13 +74,13 @@ public class ShardCommitCoordinator {
 
     private ReadyTransactionReply readyTransactionReply;
 
-    public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
+    public ShardCommitCoordinator(ShardDataTree dataTree,
             long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
 
         this.queueCapacity = queueCapacity;
         this.log = log;
         this.name = name;
-        this.transactionFactory = transactionFactory;
+        this.dataTree = Preconditions.checkNotNull(dataTree);
 
         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
                 removalListener(cacheRemovalListener).build();
@@ -162,8 +161,7 @@ public class ShardCommitCoordinator {
         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
-                    transactionFactory.<DOMStoreWriteTransaction>newTransaction(
-                        TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
+                    dataTree.newReadWriteTransaction(batched.getTransactionID(),
                         batched.getTransactionChainID()));
             cohortCache.put(batched.getTransactionID(), cohortEntry);
         }
@@ -417,15 +415,15 @@ public class ShardCommitCoordinator {
         private final String transactionID;
         private DOMStoreThreePhaseCommitCohort cohort;
         private final MutableCompositeModification compositeModification;
-        private final DOMStoreWriteTransaction transaction;
+        private final ReadWriteShardDataTreeTransaction transaction;
         private ActorRef replySender;
         private Shard shard;
         private long lastAccessTime;
         private boolean doImmediateCommit;
 
-        CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
+        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
             this.compositeModification = new MutableCompositeModification();
-            this.transaction = transaction;
+            this.transaction = Preconditions.checkNotNull(transaction);
             this.transactionID = transactionID;
         }
 
@@ -460,7 +458,7 @@ public class ShardCommitCoordinator {
         void applyModifications(Iterable<Modification> modifications) {
             for(Modification modification: modifications) {
                 compositeModification.addModification(modification);
-                modification.apply(transaction);
+                modification.apply(transaction.getSnapshot());
             }
         }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
new file mode 100644 (file)
index 0000000..373bf49
--- /dev/null
@@ -0,0 +1,166 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Strings;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
+import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
+ * e.g. it does not expose public interfaces and assumes it is only ever called from a
+ * single thread.
+ *
+ * This class is not part of the API contract and is subject to change at any time.
+ */
+@NotThreadSafe
+@VisibleForTesting
+public final class ShardDataTree extends ShardDataTreeTransactionParent {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
+    private static final ShardDataTreeNotificationManager MANAGER = new ShardDataTreeNotificationManager();
+    private final Map<String, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
+    private final ShardDataTreeChangePublisher treeChangePublisher = new ShardDataTreeChangePublisher();
+    private final ListenerTree listenerTree = ListenerTree.create();
+    private final TipProducingDataTree dataTree;
+
+    ShardDataTree(final SchemaContext schemaContext) {
+        dataTree = InMemoryDataTreeFactory.getInstance().create();
+        if (schemaContext != null) {
+            dataTree.setSchemaContext(schemaContext);
+        }
+    }
+
+    TipProducingDataTree getDataTree() {
+        return dataTree;
+    }
+
+    void updateSchemaContext(final SchemaContext schemaContext) {
+        dataTree.setSchemaContext(schemaContext);
+    }
+
+    private ShardDataTreeTransactionChain ensureTransactionChain(final String chainId) {
+        ShardDataTreeTransactionChain chain = transactionChains.get(chainId);
+        if (chain == null) {
+            chain = new ShardDataTreeTransactionChain(chainId, this);
+            transactionChains.put(chainId, chain);
+        }
+
+        return chain;
+    }
+
+    ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId, final String chainId) {
+        if (Strings.isNullOrEmpty(chainId)) {
+            return new ReadOnlyShardDataTreeTransaction(txId, dataTree.takeSnapshot());
+        }
+
+        return ensureTransactionChain(chainId).newReadOnlyTransaction(txId);
+    }
+
+    ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId, final String chainId) {
+        if (Strings.isNullOrEmpty(chainId)) {
+            return new ReadWriteShardDataTreeTransaction(this, txId, dataTree.takeSnapshot().newModification());
+        }
+
+        return ensureTransactionChain(chainId).newReadWriteTransaction(txId);
+    }
+
+    void notifyListeners(final DataTreeCandidateTip candidate) {
+        LOG.debug("Notifying listeners on candidate {}", candidate);
+
+        // DataTreeChanges first, as they are more light-weight
+        treeChangePublisher.publishChanges(candidate);
+
+        // DataChanges second, as they are heavier
+        ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(MANAGER);
+    }
+
+    void closeAllTransactionChains() {
+        for (ShardDataTreeTransactionChain chain : transactionChains.values()) {
+            chain.close();
+        }
+
+        transactionChains.clear();
+    }
+
+    void closeTransactionChain(final String transactionChainId) {
+        final ShardDataTreeTransactionChain chain = transactionChains.remove(transactionChainId);
+        if (chain != null) {
+            chain.close();
+        } else {
+            LOG.warn("Closing non-existent transaction chain {}", transactionChainId);
+        }
+    }
+
+    Entry<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>, DOMImmutableDataChangeEvent> registerChangeListener(
+            final YangInstanceIdentifier path,
+            final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, final DataChangeScope scope) {
+        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> reg =
+                listenerTree.registerDataChangeListener(path, listener, scope);
+
+        final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
+        final DOMImmutableDataChangeEvent event;
+        if (currentState.isPresent()) {
+            final NormalizedNode<?, ?> data = currentState.get();
+            event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE).setAfter(data).addCreated(path, data).build();
+        } else {
+            event = null;
+        }
+
+        return new SimpleEntry<>(reg, event);
+    }
+
+    Entry<ListenerRegistration<DOMDataTreeChangeListener>, DataTreeCandidate> registerTreeChangeListener(final YangInstanceIdentifier path,
+            final DOMDataTreeChangeListener listener) {
+        final ListenerRegistration<DOMDataTreeChangeListener> reg = treeChangePublisher.registerTreeChangeListener(path, listener);
+
+        final Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
+        final DataTreeCandidate event;
+        if (currentState.isPresent()) {
+            event = DataTreeCandidates.fromNormalizedNode(path, currentState.get());
+        } else {
+            event = null;
+        }
+        return new SimpleEntry<>(reg, event);
+    }
+
+    @Override
+    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
+        // Intentional no-op
+    }
+
+    @Override
+    DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+        final DataTreeModification snapshot = transaction.getSnapshot();
+        snapshot.ready();
+        return new ShardDataTreeCohort(this, snapshot);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeChangePublisher.java
new file mode 100644 (file)
index 0000000..5e24dcd
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collection;
+import java.util.Collections;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.md.sal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.spi.DefaultDataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@NotThreadSafe
+final class ShardDataTreeChangePublisher extends AbstractDOMStoreTreeChangePublisher {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeChangePublisher.class);
+
+    void publishChanges(final DataTreeCandidate candidate) {
+        processCandidateTree(candidate);
+    }
+
+    @Override
+    protected void notifyListeners(final Collection<AbstractDOMDataTreeChangeListenerRegistration<?>> registrations,
+            final YangInstanceIdentifier path, final DataTreeCandidateNode node) {
+        final Collection<DataTreeCandidate> changes = Collections.<DataTreeCandidate>singleton(new DefaultDataTreeCandidate(path, node));
+
+        for (AbstractDOMDataTreeChangeListenerRegistration<?> reg : registrations) {
+            reg.getInstance().onDataTreeChanged(changes);
+        }
+    }
+
+    @Override
+    protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
+        LOG.debug("Registration {} removed", registration);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java
new file mode 100644 (file)
index 0000000..11b3ca8
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ShardDataTreeCohort implements DOMStoreThreePhaseCommitCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeCohort.class);
+    private static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+    private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+    private final DataTreeModification transaction;
+    private final ShardDataTree dataTree;
+    private DataTreeCandidateTip candidate;
+
+    ShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) {
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+        this.transaction = Preconditions.checkNotNull(transaction);
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        try {
+            dataTree.getDataTree().validate(transaction);
+            LOG.debug("Transaction {} validated", transaction);
+            return TRUE_FUTURE;
+        } catch (Exception e) {
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        try {
+            candidate = dataTree.getDataTree().prepare(transaction);
+            /*
+             * FIXME: this is the place where we should be interacting with persistence, specifically by invoking
+             *        persist on the candidate (which gives us a Future).
+             */
+            LOG.debug("Transaction {} prepared candidate {}", transaction, candidate);
+            return VOID_FUTURE;
+        } catch (Exception e) {
+            LOG.debug("Transaction {} failed to prepare", transaction, e);
+            return Futures.immediateFailedFuture(e);
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        // No-op, really
+        return VOID_FUTURE;
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        try {
+            dataTree.getDataTree().commit(candidate);
+        } catch (Exception e) {
+            LOG.error("Transaction {} failed to commit", transaction, e);
+            return Futures.immediateFailedFuture(e);
+        }
+
+        LOG.debug("Transaction {} committed, proceeding to notify", transaction);
+        dataTree.notifyListeners(candidate);
+        return VOID_FUTURE;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeNotificationManager.java
new file mode 100644 (file)
index 0000000..8a54fc6
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
+import org.opendaylight.controller.md.sal.dom.store.impl.DataChangeListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.NotificationManager;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ShardDataTreeNotificationManager implements NotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeNotificationManager.class);
+
+    @Override
+    public void submitNotification(final DataChangeListenerRegistration<?> listener, final DOMImmutableDataChangeEvent notification) {
+        LOG.debug("Notifying listener {} about {}", listener.getInstance(), notification);
+
+        listener.getInstance().onDataChanged(notification);
+    }
+
+    @Override
+    public void submitNotifications(final DataChangeListenerRegistration<?> listener, final Iterable<DOMImmutableDataChangeEvent> notifications) {
+        final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> instance = listener.getInstance();
+        LOG.debug("Notifying listener {} about {}", instance, notifications);
+
+        for (DOMImmutableDataChangeEvent n : notifications) {
+            instance.onDataChanged(n);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
new file mode 100644 (file)
index 0000000..780d940
--- /dev/null
@@ -0,0 +1,134 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A transaction chain attached to a Shard.
+ */
+@NotThreadSafe
+final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class);
+    private final ShardDataTree dataTree;
+    private final String chainId;
+
+    private ReadWriteShardDataTreeTransaction previousTx;
+    private ReadWriteShardDataTreeTransaction openTransaction;
+    private boolean closed;
+
+    ShardDataTreeTransactionChain(final String chainId, final ShardDataTree dataTree) {
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+        this.chainId = Preconditions.checkNotNull(chainId);
+    }
+
+    private DataTreeSnapshot getSnapshot() {
+        Preconditions.checkState(!closed, "TransactionChain %s has been closed", this);
+        Preconditions.checkState(openTransaction == null, "Transaction %s is open", openTransaction);
+
+        if (previousTx == null) {
+            return dataTree.getDataTree().takeSnapshot();
+        } else {
+            return previousTx.getSnapshot();
+        }
+    }
+
+    ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final String txId) {
+        final DataTreeSnapshot snapshot = getSnapshot();
+        LOG.debug("Allocated read-only transaction {} snapshot {}", txId, snapshot);
+
+        return new ReadOnlyShardDataTreeTransaction(txId, snapshot);
+    }
+
+    ReadWriteShardDataTreeTransaction newReadWriteTransaction(final String txId) {
+        final DataTreeSnapshot snapshot = getSnapshot();
+        LOG.debug("Allocated read-write transaction {} snapshot {}", txId, snapshot);
+
+        openTransaction = new ReadWriteShardDataTreeTransaction(this, txId, snapshot.newModification());
+        return openTransaction;
+    }
+
+    void close() {
+        closed = true;
+    }
+
+    @Override
+    protected void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
+        if (transaction instanceof ReadWriteShardDataTreeTransaction) {
+            Preconditions.checkState(openTransaction != null, "Attempted to abort transaction %s while none is outstanding", transaction);
+            LOG.debug("Aborted transaction {}", transaction);
+            openTransaction = null;
+        }
+    }
+
+    @Override
+    protected DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+        Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction);
+
+        // dataTree is finalizing ready the transaction, we just record it for the next
+        // transaction in chain
+        final DOMStoreThreePhaseCommitCohort delegate = dataTree.finishTransaction(transaction);
+        openTransaction = null;
+        previousTx = transaction;
+        LOG.debug("Committing transaction {}", transaction);
+
+        return new CommitCohort(transaction, delegate);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("id", chainId).toString();
+    }
+
+    private final class CommitCohort extends ForwardingDOMStoreThreePhaseCommitCohort {
+        private final ReadWriteShardDataTreeTransaction transaction;
+        private final DOMStoreThreePhaseCommitCohort delegate;
+
+        CommitCohort(final ReadWriteShardDataTreeTransaction transaction, final DOMStoreThreePhaseCommitCohort delegate) {
+            this.transaction = Preconditions.checkNotNull(transaction);
+            this.delegate = Preconditions.checkNotNull(delegate);
+        }
+
+        @Override
+        protected DOMStoreThreePhaseCommitCohort delegate() {
+            return delegate;
+        }
+
+        @Override
+        public ListenableFuture<Void> commit() {
+            final ListenableFuture<Void> ret = super.commit();
+
+            Futures.addCallback(ret, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                    if (transaction.equals(previousTx)) {
+                        previousTx = null;
+                    }
+                    LOG.debug("Committed transaction {}", transaction);
+                }
+
+                @Override
+                public void onFailure(Throwable t) {
+                    LOG.error("Transaction {} commit failed, cannot recover", transaction, t);
+                }
+            });
+
+            return ret;
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
new file mode 100644 (file)
index 0000000..6cc1408
--- /dev/null
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+abstract class ShardDataTreeTransactionParent {
+    abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
+    abstract DOMStoreThreePhaseCommitCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
+}
index 41ca486..f2c77e3 100644 (file)
@@ -13,17 +13,12 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -34,9 +29,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 public class ShardReadTransaction extends ShardTransaction {
     private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
 
-    private final DOMStoreReadTransaction transaction;
+    private final AbstractShardDataTreeTransaction<?> transaction;
 
-    public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
+    public ShardReadTransaction(AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
             ShardStats shardStats, String transactionID, short clientTxVersion) {
         super(shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
@@ -70,28 +65,16 @@ public class ShardReadTransaction extends ShardTransaction {
 
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
-        final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = transaction.read(DATASTORE_ROOT);
+        final Optional<NormalizedNode<?, ?>> result = transaction.getSnapshot().readNode(DATASTORE_ROOT);
 
-        Futures.addCallback(future, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
-            @Override
-            public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
-                byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get());
-                sender.tell(new CaptureSnapshotReply(serialized), self);
+        byte[] serialized = SerializationUtils.serializeNormalizedNode(result.get());
+        sender.tell(new CaptureSnapshotReply(serialized), self);
 
-                self.tell(PoisonPill.getInstance(), self);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                sender.tell(new akka.actor.Status.Failure(t), self);
-
-                self.tell(PoisonPill.getInstance(), self);
-            }
-        });
+        self.tell(PoisonPill.getInstance(), self);
     }
 
     @Override
-    protected DOMStoreTransaction getDOMStoreTransaction() {
+    protected AbstractShardDataTreeTransaction<?> getDOMStoreTransaction() {
         return transaction;
     }
 
index 2042e95..c90b2ae 100644 (file)
@@ -14,35 +14,30 @@ import akka.actor.ActorRef;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 
 /**
  * @author: syedbahm
  * Date: 8/6/14
  */
 public class ShardReadWriteTransaction extends ShardWriteTransaction {
-    private final DOMStoreReadWriteTransaction transaction;
-
-    public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
+    public ShardReadWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
             ShardStats shardStats, String transactionID, short clientTxVersion) {
         super(transaction, shardActor, shardStats, transactionID, clientTxVersion);
-        this.transaction = transaction;
     }
 
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message instanceof ReadData) {
-            readData(transaction, (ReadData) message, !SERIALIZED_REPLY);
+            readData((ReadData) message, !SERIALIZED_REPLY);
 
         } else if (message instanceof DataExists) {
-            dataExists(transaction, (DataExists) message, !SERIALIZED_REPLY);
+            dataExists((DataExists) message, !SERIALIZED_REPLY);
 
         } else if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readData(transaction, ReadData.fromSerializable(message), SERIALIZED_REPLY);
+            readData(ReadData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            dataExists(transaction, DataExists.fromSerializable(message), SERIALIZED_REPLY);
-
+            dataExists(DataExists.fromSerializable(message), SERIALIZED_REPLY);
         } else {
             super.handleReceive(message);
         }
index 01a124b..f9d3050 100644 (file)
@@ -17,11 +17,12 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.slf4j.Logger;
 
 /**
@@ -31,16 +32,16 @@ import org.slf4j.Logger;
  * committed to the data store in the order the corresponding snapshot or log batch are received
  * to preserve data store integrity.
  *
- * @author Thomas Panetelis
+ * @author Thomas Pantelis
  */
 class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
-
-    private final InMemoryDOMDataStore store;
+    private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
+    private final ShardDataTree store;
     private List<ModificationPayload> currentLogRecoveryBatch;
     private final String shardName;
     private final Logger log;
 
-    ShardRecoveryCoordinator(InMemoryDOMDataStore store, String shardName, Logger log) {
+    ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) {
         this.store = store;
         this.shardName = shardName;
         this.log = log;
@@ -73,8 +74,8 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
 
     }
 
-    private void commitTransaction(DOMStoreWriteTransaction transaction) {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+    private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) {
+        DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
         try {
             commitCohort.preCommit().get();
             commitCohort.commit().get();
@@ -90,10 +91,11 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
     public void applyCurrentLogRecoveryBatch() {
         log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
 
-        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
-        for(ModificationPayload payload: currentLogRecoveryBatch) {
+        ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null);
+        DataTreeModification snapshot = writeTx.getSnapshot();
+        for (ModificationPayload payload : currentLogRecoveryBatch) {
             try {
-                MutableCompositeModification.fromSerializable(payload.getModification()).apply(writeTx);
+                MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot);
             } catch (Exception e) {
                 log.error("{}: Error extracting ModificationPayload", shardName, e);
             }
@@ -111,14 +113,21 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
      */
     @Override
     public void applyRecoverySnapshot(final byte[] snapshotBytes) {
-        log.debug("{}: Applyng recovered sbapshot", shardName);
+        log.debug("{}: Applying recovered snapshot", shardName);
 
-        DOMStoreWriteTransaction writeTx = store.newWriteOnlyTransaction();
+        // Intentionally bypass normal transaction to side-step persistence/replication
+        final DataTree tree = store.getDataTree();
+        DataTreeModification writeTx = tree.takeSnapshot().newModification();
 
         NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
-        writeTx.write(YangInstanceIdentifier.builder().build(), node);
-
-        commitTransaction(writeTx);
+        writeTx.write(ROOT, node);
+        writeTx.ready();
+        try {
+            tree.validate(writeTx);
+            tree.commit(tree.prepare(writeTx));
+        } catch (DataValidationFailedException e) {
+            log.error("{}: Failed to validate recovery snapshot", shardName, e);
+        }
     }
 }
index c59085d..35d8e92 100644 (file)
@@ -7,15 +7,14 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Preconditions;
 import akka.actor.ActorRef;
 import java.util.concurrent.ExecutionException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
@@ -31,14 +30,14 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     private int createSnapshotTransactionCounter;
     private final ShardTransactionActorFactory transactionActorFactory;
-    private final InMemoryDOMDataStore store;
+    private final ShardDataTree store;
     private final Logger log;
     private final String logId;
 
-    ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, InMemoryDOMDataStore store,
+    ShardSnapshotCohort(ShardTransactionActorFactory transactionActorFactory, ShardDataTree store,
             Logger log, String logId) {
         this.transactionActorFactory = transactionActorFactory;
-        this.store = store;
+        this.store = Preconditions.checkNotNull(store);
         this.log = log;
         this.logId = logId;
     }
@@ -67,15 +66,15 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
         log.info("{}: Applying snapshot", logId);
 
         try {
-            DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+            ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("snapshot-" + logId, null);
 
             NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
 
             // delete everything first
-            transaction.delete(DATASTORE_ROOT);
+            transaction.getSnapshot().delete(DATASTORE_ROOT);
 
             // Add everything from the remote node back
-            transaction.write(DATASTORE_ROOT, node);
+            transaction.getSnapshot().write(DATASTORE_ROOT, node);
             syncCommitTransaction(transaction);
         } catch (InterruptedException | ExecutionException e) {
             log.error("{}: An exception occurred when applying snapshot", logId, e);
@@ -85,9 +84,9 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort {
 
     }
 
-    void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
+    void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction)
             throws ExecutionException, InterruptedException {
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
+        DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
         commitCohort.preCommit().get();
         commitCohort.commit().get();
     }
index 81125a7..600ec39 100644 (file)
@@ -14,8 +14,9 @@ import akka.actor.Props;
 import akka.actor.ReceiveTimeout;
 import akka.japi.Creator;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@ -25,10 +26,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
@@ -66,13 +63,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         this.clientTxVersion = clientTxVersion;
     }
 
-    public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
+    public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
             DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
-        return Props.create(new ShardTransactionCreator(transaction, shardActor,
+        return Props.create(new ShardTransactionCreator(type, transaction, shardActor,
            datastoreContext, shardStats, transactionID, txnClientVersion));
     }
 
-    protected abstract DOMStoreTransaction getDOMStoreTransaction();
+    protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
 
     protected ActorRef getShardActor() {
         return shardActor;
@@ -105,7 +102,7 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     }
 
     private void closeTransaction(boolean sendReply) {
-        getDOMStoreTransaction().close();
+        getDOMStoreTransaction().abort();
 
         if(sendReply && returnCloseTransactionReply()) {
             getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
@@ -114,71 +111,83 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
 
-    protected void readData(DOMStoreReadTransaction transaction, ReadData message,
-            final boolean returnSerialized) {
-
-        final YangInstanceIdentifier path = message.getPath();
-        try {
-            final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future = transaction.read(path);
-            Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-            ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
+    private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
+        final boolean ret = transaction.isClosed();
+        if (ret) {
+            shardStats.incrementFailedReadTransactionsCount();
+            getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf());
+        }
+        return ret;
+    }
 
-            sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
+    protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message,
+            final boolean returnSerialized) {
 
-        } catch (Exception e) {
-            LOG.debug(String.format("Unexpected error reading path %s", path), e);
-            shardStats.incrementFailedReadTransactionsCount();
-            sender().tell(new akka.actor.Status.Failure(e), self());
+        if (checkClosed(transaction)) {
+            return;
         }
+
+        final YangInstanceIdentifier path = message.getPath();
+        Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
+        ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
+        sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
     }
 
-    protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
+    protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message,
         final boolean returnSerialized) {
-        final YangInstanceIdentifier path = message.getPath();
 
-        try {
-            boolean exists = transaction.exists(path).checkedGet();
-            DataExistsReply dataExistsReply = DataExistsReply.create(exists);
-            getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
-                dataExistsReply, getSelf());
-        } catch (ReadFailedException e) {
-            getSender().tell(new akka.actor.Status.Failure(e),getSelf());
+        if (checkClosed(transaction)) {
+            return;
         }
+
+        final YangInstanceIdentifier path = message.getPath();
+        boolean exists = transaction.getSnapshot().readNode(path).isPresent();
+        DataExistsReply dataExistsReply = DataExistsReply.create(exists);
+        getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
+            dataExistsReply, getSelf());
     }
 
     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
 
         private static final long serialVersionUID = 1L;
 
-        final DOMStoreTransaction transaction;
+        final AbstractShardDataTreeTransaction<?> transaction;
         final ActorRef shardActor;
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
         final short txnClientVersion;
+        final TransactionType type;
 
-        ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
+        ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
                 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
-            this.transaction = transaction;
+            this.transaction = Preconditions.checkNotNull(transaction);
             this.shardActor = shardActor;
             this.shardStats = shardStats;
             this.datastoreContext = datastoreContext;
             this.transactionID = transactionID;
             this.txnClientVersion = txnClientVersion;
+            this.type = type;
         }
 
         @Override
         public ShardTransaction create() throws Exception {
-            ShardTransaction tx;
-            if(transaction instanceof DOMStoreReadWriteTransaction) {
-                tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, shardStats, transactionID, txnClientVersion);
-            } else if(transaction instanceof DOMStoreReadTransaction) {
-                tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        shardStats, transactionID, txnClientVersion);
-            } else {
-                tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, shardStats, transactionID, txnClientVersion);
+            final ShardTransaction tx;
+            switch (type) {
+            case READ_ONLY:
+                tx = new ShardReadTransaction(transaction, shardActor,
+                    shardStats, transactionID, txnClientVersion);
+                break;
+            case READ_WRITE:
+                tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
+                    shardActor, shardStats, transactionID, txnClientVersion);
+                break;
+            case WRITE_ONLY:
+                tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
+                    shardActor, shardStats, transactionID, txnClientVersion);
+                break;
+            default:
+                throw new IllegalArgumentException("Unhandled transaction type " + type);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index a4c97e8..7a16308 100644 (file)
@@ -8,16 +8,17 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Preconditions;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -25,13 +26,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  */
 public class ShardTransactionChain extends AbstractUntypedActor {
 
-    private final DOMStoreTransactionChain chain;
+    private final ShardDataTreeTransactionChain chain;
     private final DatastoreContext datastoreContext;
     private final ShardStats shardStats;
 
-    public ShardTransactionChain(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+    public ShardTransactionChain(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext,
             ShardStats shardStats) {
-        this.chain = chain;
+        this.chain = Preconditions.checkNotNull(chain);
         this.datastoreContext = datastoreContext;
         this.shardStats = shardStats;
     }
@@ -55,29 +56,25 @@ public class ShardTransactionChain extends AbstractUntypedActor {
 
     private ActorRef createTypedTransactionActor(CreateTransaction createTransaction) {
         String transactionName = "shard-" + createTransaction.getTransactionId();
-        if(createTransaction.getTransactionType() ==
-                TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
-            return getContext().actorOf(
-                    ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
-                            datastoreContext, shardStats, createTransaction.getTransactionId(),
-                            createTransaction.getVersion()), transactionName);
-        } else if (createTransaction.getTransactionType() ==
-                TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
-            return getContext().actorOf(
-                    ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
-                            datastoreContext, shardStats, createTransaction.getTransactionId(),
-                            createTransaction.getVersion()), transactionName);
-        } else if (createTransaction.getTransactionType() ==
-                TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
-            return getContext().actorOf(
-                    ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
-                            datastoreContext, shardStats, createTransaction.getTransactionId(),
-                            createTransaction.getVersion()), transactionName);
-        } else {
-            throw new IllegalArgumentException (
-                    "CreateTransaction message has unidentified transaction type=" +
-                             createTransaction.getTransactionType());
+
+        final TransactionType type = TransactionType.fromInt(createTransaction.getTransactionType());
+        final AbstractShardDataTreeTransaction<?> transaction;
+        switch (type) {
+        case READ_ONLY:
+            transaction = chain.newReadOnlyTransaction(transactionName);
+            break;
+        case READ_WRITE:
+        case WRITE_ONLY:
+            transaction = chain.newReadWriteTransaction(transactionName);
+            break;
+        default:
+            throw new IllegalArgumentException("Unhandled transaction type " + type);
         }
+
+        return getContext().actorOf(
+            ShardTransaction.props(type, transaction, getShardActor(),
+                    datastoreContext, shardStats, createTransaction.getTransactionId(),
+                    createTransaction.getVersion()), transactionName);
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
@@ -87,7 +84,7 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                 createTransaction.getTransactionId()).toSerializable(), getSelf());
     }
 
-    public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+    public static Props props(ShardDataTreeTransactionChain chain, SchemaContext schemaContext,
         DatastoreContext datastoreContext, ShardStats shardStats) {
         return Props.create(new ShardTransactionChainCreator(chain, datastoreContext, shardStats));
     }
@@ -95,12 +92,11 @@ public class ShardTransactionChain extends AbstractUntypedActor {
     private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
         private static final long serialVersionUID = 1L;
 
-        final DOMStoreTransactionChain chain;
+        final ShardDataTreeTransactionChain chain;
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
 
-
-        ShardTransactionChainCreator(DOMStoreTransactionChain chain, DatastoreContext datastoreContext,
+        ShardTransactionChainCreator(ShardDataTreeTransactionChain chain, DatastoreContext datastoreContext,
                 ShardStats shardStats) {
             this.chain = chain;
             this.datastoreContext = datastoreContext;
index 9637646..3a92062 100644 (file)
@@ -7,11 +7,11 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.base.Preconditions;
 import akka.actor.ActorRef;
 import akka.actor.UntypedActorContext;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 
 /**
  * A factory for creating ShardTransaction actors.
@@ -20,16 +20,16 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
  */
 class ShardTransactionActorFactory {
 
-    private final DOMTransactionFactory domTransactionFactory;
+    private final ShardDataTree dataTree;
     private final DatastoreContext datastoreContext;
     private final String txnDispatcherPath;
     private final ShardStats shardMBean;
     private final UntypedActorContext actorContext;
     private final ActorRef shardActor;
 
-    ShardTransactionActorFactory(DOMTransactionFactory domTransactionFactory, DatastoreContext datastoreContext,
+    ShardTransactionActorFactory(ShardDataTree dataTree, DatastoreContext datastoreContext,
             String txnDispatcherPath, ActorRef shardActor, UntypedActorContext actorContext, ShardStats shardMBean) {
-        this.domTransactionFactory = domTransactionFactory;
+        this.dataTree = Preconditions.checkNotNull(dataTree);
         this.datastoreContext = datastoreContext;
         this.txnDispatcherPath = txnDispatcherPath;
         this.shardMBean = shardMBean;
@@ -39,11 +39,20 @@ class ShardTransactionActorFactory {
 
     ActorRef newShardTransaction(TransactionProxy.TransactionType type, ShardTransactionIdentifier transactionID,
             String transactionChainID, short clientVersion) {
+        final AbstractShardDataTreeTransaction<?> transaction;
+        switch (type) {
+        case READ_ONLY:
+            transaction = dataTree.newReadOnlyTransaction(transactionID.toString(), transactionChainID);
+            break;
+        case READ_WRITE:
+        case WRITE_ONLY:
+            transaction = dataTree.newReadWriteTransaction(transactionID.toString(), transactionChainID);
+            break;
+        default:
+            throw new IllegalArgumentException("Unsupported transaction type " + type);
+        }
 
-        DOMStoreTransaction transaction = domTransactionFactory.newTransaction(type, transactionID.toString(),
-                transactionChainID);
-
-        return actorContext.actorOf(ShardTransaction.props(transaction, shardActor, datastoreContext, shardMBean,
+        return actorContext.actorOf(ShardTransaction.props(type, transaction, shardActor, datastoreContext, shardMBean,
                 transactionID.getRemoteTransactionId(), clientVersion).withDispatcher(txnDispatcherPath),
                 transactionID.toString());
     }
index 424ab20..69a696f 100644 (file)
@@ -15,11 +15,13 @@ import akka.actor.PoisonPill;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
@@ -30,8 +32,6 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 
 /**
  * @author: syedbahm
@@ -42,16 +42,16 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final MutableCompositeModification compositeModification = new MutableCompositeModification();
     private int totalBatchedModificationsReceived;
     private Exception lastBatchedModificationsException;
-    private final DOMStoreWriteTransaction transaction;
+    private final ReadWriteShardDataTreeTransaction transaction;
 
-    public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
+    public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor,
             ShardStats shardStats, String transactionID, short clientTxVersion) {
         super(shardActor, shardStats, transactionID, clientTxVersion);
         this.transaction = transaction;
     }
 
     @Override
-    protected DOMStoreTransaction getDOMStoreTransaction() {
+    protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() {
         return transaction;
     }
 
@@ -61,17 +61,17 @@ public class ShardWriteTransaction extends ShardTransaction {
         if (message instanceof BatchedModifications) {
             batchedModifications((BatchedModifications)message);
         } else if (message instanceof ReadyTransaction) {
-            readyTransaction(transaction, !SERIALIZED_REPLY, false);
+            readyTransaction(!SERIALIZED_REPLY, false);
         } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-            readyTransaction(transaction, SERIALIZED_REPLY, false);
+            readyTransaction(SERIALIZED_REPLY, false);
         } else if(WriteData.isSerializedType(message)) {
-            writeData(transaction, WriteData.fromSerializable(message), SERIALIZED_REPLY);
+            writeData(WriteData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(MergeData.isSerializedType(message)) {
-            mergeData(transaction, MergeData.fromSerializable(message), SERIALIZED_REPLY);
+            mergeData(MergeData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if(DeleteData.isSerializedType(message)) {
-            deleteData(transaction, DeleteData.fromSerializable(message), SERIALIZED_REPLY);
+            deleteData(DeleteData.fromSerializable(message), SERIALIZED_REPLY);
 
         } else if (message instanceof GetCompositedModification) {
             // This is here for testing only
@@ -82,10 +82,17 @@ public class ShardWriteTransaction extends ShardTransaction {
     }
 
     private void batchedModifications(BatchedModifications batched) {
+        if (checkClosed()) {
+            if (batched.isReady()) {
+                getSelf().tell(PoisonPill.getInstance(), getSelf());
+            }
+            return;
+        }
+
         try {
             for(Modification modification: batched.getModifications()) {
                 compositeModification.addModification(modification);
-                modification.apply(transaction);
+                modification.apply(transaction.getSnapshot());
             }
 
             totalBatchedModificationsReceived++;
@@ -100,7 +107,7 @@ public class ShardWriteTransaction extends ShardTransaction {
                             totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
                 }
 
-                readyTransaction(transaction, false, batched.isDoCommitOnReady());
+                readyTransaction(false, batched.isDoCommitOnReady());
             } else {
                 getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
             }
@@ -114,14 +121,33 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
-            boolean returnSerialized) {
+    protected final void dataExists(DataExists message, final boolean returnSerialized) {
+        super.dataExists(transaction, message, returnSerialized);
+    }
+
+    protected final void readData(ReadData message, final boolean returnSerialized) {
+        super.readData(transaction, message, returnSerialized);
+    }
+
+    private boolean checkClosed() {
+        if (transaction.isClosed()) {
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException("Transaction is closed, no modifications allowed")), getSelf());
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private void writeData(WriteData message, boolean returnSerialized) {
         LOG.debug("writeData at path : {}", message.getPath());
+        if (checkClosed()) {
+            return;
+        }
 
         compositeModification.addModification(
                 new WriteModification(message.getPath(), message.getData()));
         try {
-            transaction.write(message.getPath(), message.getData());
+            transaction.getSnapshot().write(message.getPath(), message.getData());
             WriteDataReply writeDataReply = WriteDataReply.INSTANCE;
             getSender().tell(returnSerialized ? writeDataReply.toSerializable(message.getVersion()) :
                 writeDataReply, getSelf());
@@ -130,15 +156,17 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void mergeData(DOMStoreWriteTransaction transaction, MergeData message,
-            boolean returnSerialized) {
+    private void mergeData(MergeData message, boolean returnSerialized) {
         LOG.debug("mergeData at path : {}", message.getPath());
+        if (checkClosed()) {
+            return;
+        }
 
         compositeModification.addModification(
                 new MergeModification(message.getPath(), message.getData()));
 
         try {
-            transaction.merge(message.getPath(), message.getData());
+            transaction.getSnapshot().merge(message.getPath(), message.getData());
             MergeDataReply mergeDataReply = MergeDataReply.INSTANCE;
             getSender().tell(returnSerialized ? mergeDataReply.toSerializable(message.getVersion()) :
                 mergeDataReply, getSelf());
@@ -147,23 +175,24 @@ public class ShardWriteTransaction extends ShardTransaction {
         }
     }
 
-    private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message,
-            boolean returnSerialized) {
+    private void deleteData(DeleteData message, boolean returnSerialized) {
         LOG.debug("deleteData at path : {}", message.getPath());
+        if (checkClosed()) {
+            return;
+        }
 
         compositeModification.addModification(new DeleteModification(message.getPath()));
         try {
-            transaction.delete(message.getPath());
+            transaction.getSnapshot().delete(message.getPath());
             DeleteDataReply deleteDataReply = DeleteDataReply.INSTANCE;
             getSender().tell(returnSerialized ? deleteDataReply.toSerializable(message.getVersion()) :
                 deleteDataReply, getSelf());
-        }catch(Exception e){
+        } catch(Exception e) {
             getSender().tell(new akka.actor.Status.Failure(e), getSelf());
         }
     }
 
-    private void readyTransaction(DOMStoreWriteTransaction transaction, boolean returnSerialized,
-            boolean doImmediateCommit) {
+    private void readyTransaction(boolean returnSerialized, boolean doImmediateCommit) {
         String transactionID = getTransactionID();
 
         LOG.debug("readyTransaction : {}", transactionID);
index 3a63f5b..2c55357 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
  * DeleteModification store all the parameters required to delete a path from the data tree
@@ -41,6 +42,11 @@ public class DeleteModification extends AbstractModification {
         transaction.delete(getPath());
     }
 
+    @Override
+    public void apply(DataTreeModification transaction) {
+        transaction.delete(getPath());
+    }
+
     @Override
     public byte getType() {
         return DELETE;
index 7ba74f4..cc7956e 100644 (file)
@@ -17,6 +17,7 @@ import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessa
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
  * MergeModification stores all the parameters required to merge data into the specified path
@@ -41,6 +42,11 @@ public class MergeModification extends WriteModification {
         transaction.merge(getPath(), getData());
     }
 
+    @Override
+    public void apply(final DataTreeModification transaction) {
+        transaction.merge(getPath(), getData());
+    }
+
     @Override
     public byte getType() {
         return MERGE;
index 2dfcdf0..6fc8183 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.modification;
 
 import java.io.Externalizable;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
  * Represents a modification to the data store.
@@ -39,6 +40,13 @@ public interface Modification extends Externalizable {
      */
     void apply(DOMStoreWriteTransaction transaction);
 
+    /**
+     * Apply the modification to the specified transaction
+     *
+     * @param transaction
+     */
+    void apply(DataTreeModification transaction);
+
     byte getType();
 
     @Deprecated
index b597742..b594578 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.cluster.datastore.node.utils.stream.Normalize
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
  * MutableCompositeModification is just a mutable version of a
@@ -45,6 +46,13 @@ public class MutableCompositeModification implements CompositeModification {
         }
     }
 
+    @Override
+    public void apply(DataTreeModification transaction) {
+        for (Modification modification : modifications) {
+            modification.apply(transaction);
+        }
+    }
+
     @Override
     public byte getType() {
         return COMPOSITE;
index 2fdca5f..f7f9a71 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessa
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 
 /**
  * WriteModification stores all the parameters required to write data to the specified path
@@ -48,6 +49,11 @@ public class WriteModification extends AbstractModification {
         transaction.write(getPath(), data);
     }
 
+    @Override
+    public void apply(final DataTreeModification transaction) {
+        transaction.write(getPath(), data);
+    }
+
     public NormalizedNode<?, ?> getData() {
         return data;
     }
index f6a103f..03f2bb7 100644 (file)
@@ -46,12 +46,16 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -183,19 +187,19 @@ public abstract class AbstractShardTest extends AbstractActorTest{
     }
 
     protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
-            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+            final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
             final MutableCompositeModification modification) {
         return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
     }
 
     protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
-            final InMemoryDOMDataStore dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
+            final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
             final MutableCompositeModification modification,
             final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
 
-        DOMStoreWriteTransaction tx = dataStore.newWriteOnlyTransaction();
-        tx.write(path, data);
-        DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, tx.ready(), preCommit);
+        ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null);
+        tx.getSnapshot().write(path, data);
+        DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
 
         modification.addModification(new WriteModification(path, data));
 
@@ -249,38 +253,43 @@ public abstract class AbstractShardTest extends AbstractActorTest{
 
     public static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
             throws ExecutionException, InterruptedException {
-        return readStore(shard.underlyingActor().getDataStore(), id);
+        return readStore(shard.underlyingActor().getDataStore().getDataTree(), id);
     }
 
-    public static NormalizedNode<?,?> readStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id)
-            throws ExecutionException, InterruptedException {
-        DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
-
-        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-            transaction.read(id);
+    public static NormalizedNode<?,?> readStore(final DataTree store, final YangInstanceIdentifier id) {
+        DataTreeSnapshot transaction = store.takeSnapshot();
 
-        Optional<NormalizedNode<?, ?>> optional = future.get();
+        Optional<NormalizedNode<?, ?>> optional = transaction.readNode(id);
         NormalizedNode<?, ?> node = optional.isPresent()? optional.get() : null;
 
-        transaction.close();
-
         return node;
     }
 
     public static void writeToStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
+            final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
         writeToStore(shard.underlyingActor().getDataStore(), id, node);
     }
 
-    public static void writeToStore(final InMemoryDOMDataStore store, final YangInstanceIdentifier id,
-            final NormalizedNode<?,?> node) throws ExecutionException, InterruptedException {
-        DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
+    public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
+            final NormalizedNode<?,?> node) throws InterruptedException, ExecutionException {
+        ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null);
 
-        transaction.write(id, node);
+        transaction.getSnapshot().write(id, node);
+        DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
+        cohort.canCommit().get();
+        cohort.preCommit().get();
+        cohort.commit();
+    }
 
-        DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
-        commitCohort.preCommit().get();
-        commitCohort.commit().get();
+    public static void writeToStore(final DataTree store, final YangInstanceIdentifier id,
+            final NormalizedNode<?,?> node) throws DataValidationFailedException {
+        DataTreeModification transaction = store.takeSnapshot().newModification();
+
+        transaction.write(id, node);
+        transaction.ready();
+        store.validate(transaction);
+        final DataTreeCandidate candidate = store.prepare(transaction);
+        store.commit(candidate);
     }
 
     @SuppressWarnings("serial")
index 72f6727..22ce50b 100644 (file)
@@ -89,7 +89,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -100,7 +99,9 @@ import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -338,8 +339,8 @@ public class ShardTest extends AbstractShardTest {
         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
                 "testApplySnapshot");
 
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        store.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -382,8 +383,8 @@ public class ShardTest extends AbstractShardTest {
 
         // Set up the InMemorySnapshotStore.
 
-        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
-        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+        testStore.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -440,7 +441,7 @@ public class ShardTest extends AbstractShardTest {
 
          // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
@@ -861,7 +862,7 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             String transactionID = "tx1";
             MutableCompositeModification modification = new MutableCompositeModification();
@@ -901,7 +902,7 @@ public class ShardTest extends AbstractShardTest {
 
             waitUntilLeader(shard);
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             // Setup a simulated transactions with a mock cohort.
 
@@ -1400,7 +1401,7 @@ public class ShardTest extends AbstractShardTest {
             waitUntilLeader(shard);
 
             final FiniteDuration duration = duration("5 seconds");
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             final String transactionID = "tx1";
             Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
@@ -1464,7 +1465,7 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             writeToStore(shard, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
             writeToStore(shard, TestModel.OUTER_LIST_PATH,
@@ -1541,7 +1542,7 @@ public class ShardTest extends AbstractShardTest {
 
             final FiniteDuration duration = duration("5 seconds");
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();
index 21fb55f..e45389f 100644 (file)
@@ -14,17 +14,15 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.pattern.AskTimeoutException;
 import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -40,11 +38,13 @@ import scala.concurrent.duration.Duration;
  * @author Basheeruddin Ahmed <syedbahm@cisco.com>
  */
 public class ShardTransactionFailureTest extends AbstractActorTest {
-    private static final InMemoryDOMDataStore store =
-        new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-
     private static final SchemaContext testSchemaContext =
-        TestModel.createTestContext();
+            TestModel.createTestContext();
+    private static final TransactionType RO = TransactionType.READ_ONLY;
+    private static final TransactionType RW = TransactionType.READ_WRITE;
+    private static final TransactionType WO = TransactionType.WRITE_ONLY;
+
+    private static final ShardDataTree store = new ShardDataTree(testSchemaContext);
 
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.builder().memberName("member-1")
@@ -54,11 +54,6 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
-    @BeforeClass
-    public static void staticSetup() {
-        store.onGlobalContextUpdated(testSchemaContext);
-    }
-
     private ActorRef createShard(){
         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.<String, String>emptyMap(), datastoreContext,
                 TestModel.createTestContext()));
@@ -69,7 +64,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         throws Throwable {
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+        final Props props = ShardTransaction.props(RO, store.newReadOnlyTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -86,7 +81,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             akka.pattern.Patterns.ask(subject, readData, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().close();
+        subject.underlyingActor().getDOMStoreTransaction().abort();
 
         future = akka.pattern.Patterns.ask(subject, readData, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
@@ -98,7 +93,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         throws Throwable {
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+        final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -116,7 +111,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             akka.pattern.Patterns.ask(subject, readData, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().close();
+        subject.underlyingActor().getDOMStoreTransaction().abort();
 
         future = akka.pattern.Patterns.ask(subject, readData, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
@@ -127,7 +122,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         throws Throwable {
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+        final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -145,7 +140,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             akka.pattern.Patterns.ask(subject, dataExists, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().close();
+        subject.underlyingActor().getDOMStoreTransaction().abort();
 
         future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
@@ -156,7 +151,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+        final Props props = ShardTransaction.props(WO, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -188,7 +183,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+        final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -225,7 +220,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+        final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
@@ -257,7 +252,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+        final Props props = ShardTransaction.props(RW, store.newReadWriteTransaction("test-txn", null), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
index b22001a..23984ad 100644 (file)
@@ -11,14 +11,13 @@ import akka.actor.Status.Failure;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
-import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
+import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
@@ -50,13 +49,11 @@ import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCo
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -64,6 +61,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class ShardTransactionTest extends AbstractActorTest {
 
     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
+    private static final TransactionType RO = TransactionType.READ_ONLY;
+    private static final TransactionType RW = TransactionType.READ_WRITE;
+    private static final TransactionType WO = TransactionType.WRITE_ONLY;
 
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.builder().memberName("member-1")
@@ -73,46 +73,50 @@ public class ShardTransactionTest extends AbstractActorTest {
 
     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
 
-    private final InMemoryDOMDataStore store =
-            new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+    private final ShardDataTree store = new ShardDataTree(testSchemaContext);
 
-    @Before
-    public void setup() {
-        store.onGlobalContextUpdated(testSchemaContext);
-    }
+    private int txCounter = 0;
 
-    private ActorRef createShard(){
+    private ActorRef createShard() {
         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
             Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
     }
 
-    private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
-        return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
+    private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
+        return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
     }
 
-    private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
-        return newTransactionActor(transaction, null, name, version);
+    private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
+        return newTransactionActor(type, transaction, null, name, version);
     }
 
-    private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
-        return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
+    private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
+        return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
     }
 
-    private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
+    private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
             short version) {
-        Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
+        Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
                 datastoreContext, shardStats, "txn", version);
         return getSystem().actorOf(props, name);
     }
 
+    private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
+        return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
+    }
+
+    private ReadWriteShardDataTreeTransaction readWriteTransaction() {
+        return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
+    }
+
     @Test
     public void testOnReceiveReadData() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
 
-            testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
+            testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
 
-            testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
+            testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
         }
 
         private void testOnReceiveReadData(final ActorRef transaction) {
@@ -140,10 +144,10 @@ public class ShardTransactionTest extends AbstractActorTest {
             final ActorRef shard = createShard();
 
             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
-                    store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
+                    RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
 
             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
-                    store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
+                    RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
         }
 
         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
@@ -167,7 +171,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadDataHeliumR1() throws Exception {
         new JavaTestKit(getSystem()) {{
-            ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+            ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
 
             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
@@ -185,10 +189,10 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
 
-            testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
+            testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
                     "testDataExistsPositiveRO"));
 
-            testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
+            testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
                     "testDataExistsPositiveRW"));
         }
 
@@ -215,10 +219,10 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
 
-            testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
+            testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
                     "testDataExistsNegativeRO"));
 
-            testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
+            testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
                     "testDataExistsNegativeRW"));
         }
 
@@ -253,9 +257,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveWriteData() throws Exception {
+    public void testOnReceiveWriteData() {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
@@ -276,9 +280,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveHeliumR1WriteData() throws Exception {
+    public void testOnReceiveHeliumR1WriteData() {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
 
             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
@@ -296,9 +300,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     }
 
     @Test
-    public void testOnReceiveMergeData() throws Exception {
+    public void testOnReceiveMergeData() {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+            final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
                     "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
@@ -321,7 +325,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveHeliumR1MergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
 
             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
@@ -341,7 +345,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDeleteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testDeleteData");
 
             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
@@ -362,8 +366,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveBatchedModifications() throws Exception {
         new JavaTestKit(getSystem()) {{
 
-            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
-            final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
+            ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
+            DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
+            ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
+            final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
 
             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
@@ -405,10 +411,10 @@ public class ShardTransactionTest extends AbstractActorTest {
             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
             assertEquals("getPath", deletePath, delete.getPath());
 
-            InOrder inOrder = Mockito.inOrder(mockWriteTx);
-            inOrder.verify(mockWriteTx).write(writePath, writeData);
-            inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
-            inOrder.verify(mockWriteTx).delete(deletePath);
+            InOrder inOrder = Mockito.inOrder(mockModification);
+            inOrder.verify(mockModification).write(writePath, writeData);
+            inOrder.verify(mockModification).merge(mergePath, mergeData);
+            inOrder.verify(mockModification).delete(deletePath);
         }};
     }
 
@@ -416,7 +422,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
         new JavaTestKit(getSystem()) {{
 
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -448,7 +454,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
         new JavaTestKit(getSystem()) {{
 
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -475,8 +481,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
         new JavaTestKit(getSystem()) {{
 
-            DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
-            final ActorRef transaction = newTransactionActor(mockWriteTx,
+            ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
+            DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
+            ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
+            final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
                     "testOnReceiveBatchedModificationsFailure");
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -485,7 +493,7 @@ public class ShardTransactionTest extends AbstractActorTest {
             YangInstanceIdentifier path = TestModel.TEST_PATH;
             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-            doThrow(new TestException()).when(mockWriteTx).write(path, node);
+            doThrow(new TestException()).when(mockModification).write(path, node);
 
             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
             batched.addModification(new WriteModification(path, node));
@@ -511,7 +519,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
         new JavaTestKit(getSystem()) {{
 
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -535,7 +543,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+            final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -549,7 +557,7 @@ public class ShardTransactionTest extends AbstractActorTest {
 
         // test
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+            final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
 
             JavaTestKit watcher = new JavaTestKit(getSystem());
@@ -565,13 +573,13 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveCreateSnapshot() throws Exception {
         new JavaTestKit(getSystem()) {{
-            ShardTest.writeToStore(store, TestModel.TEST_PATH,
+            ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
-            NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
+            NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
                     YangInstanceIdentifier.builder().build());
 
-            final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
                     "testOnReceiveCreateSnapshot");
 
             watch(transaction);
@@ -594,7 +602,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+            final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
                     "testReadWriteTxOnReceiveCloseTransaction");
 
             watch(transaction);
@@ -609,7 +617,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
                     "testWriteTxOnReceiveCloseTransaction");
 
             watch(transaction);
@@ -624,7 +632,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
+            final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
                     "testReadOnlyTxOnReceiveCloseTransaction");
 
             watch(transaction);
@@ -638,7 +646,7 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test(expected=UnknownMessageException.class)
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+        final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
@@ -653,7 +661,7 @@ public class ShardTransactionTest extends AbstractActorTest {
                 500, TimeUnit.MILLISECONDS).build();
 
         new JavaTestKit(getSystem()) {{
-            final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
+            final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
                     "testShardTransactionInactivity");
 
             watch(transaction);
index caabb32..96cd3e4 100644 (file)
@@ -20,7 +20,6 @@ import akka.pattern.Patterns;
 import akka.testkit.TestActorRef;
 import akka.util.Timeout;
 import com.google.common.base.Optional;
-import com.google.common.util.concurrent.MoreExecutors;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -32,6 +31,7 @@ import org.junit.Test;
 import org.mockito.InOrder;
 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
 import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
@@ -56,8 +56,6 @@ import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Compos
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -65,7 +63,9 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -110,8 +110,8 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
         NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
 
-        InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
-        store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        DataTree store = InMemoryDataTreeFactory.getInstance().create();
+        store.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -154,10 +154,8 @@ public class PreLithiumShardTest extends AbstractShardTest {
     @Test
     public void testHelium2VersionRecovery() throws Exception {
 
-        // Set up the InMemorySnapshotStore.
-
-        InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
-        testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
+        DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+        testStore.setSchemaContext(SCHEMA_CONTEXT);
 
         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
 
@@ -217,7 +215,7 @@ public class PreLithiumShardTest extends AbstractShardTest {
 
             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
 
-            InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
 
             String transactionID1 = "tx1";
             MutableCompositeModification modification1 = new MutableCompositeModification();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.