CDS: Implement front-end support for local transactions 23/20523/2
authorRobert Varga <rovarga@cisco.com>
Fri, 24 Apr 2015 11:33:24 +0000 (13:33 +0200)
committerGerrit Code Review <gerrit@opendaylight.org>
Sat, 16 May 2015 01:38:03 +0000 (01:38 +0000)
Implemented support on the TransactionProxy front-end for optimizations
where the Shard is local to the controller instance. In this mode, the
Shard's DataTree obtained from the FindPrimaryShard message is used to
prepare the modifications completely on the front-end. On ready, the
DataTreeModification instance is passed to the Shard via
the ReadyLocalTransaction message. The Shard then does a direct commit,
eliding the 3PC from the front-end (it's a no-op as it is for remote
write-only txns).

TransactionContext instances are now obtained via an
AbstractTransactionContextFactory passed to TransactionProxy of which
there are 2 kinds: one for single, unchained txns and one for chained
tnxs. Each creates a different DOM transaction instance to handle
preperation of modifications. The DOM transacton is wrapped in a
LocalTransactionContext which the TransactionProxy interfaces with.

Change-Id: I0322b586f394e4b6c7793b8287ac804b41964378
Signed-off-by: Robert Varga <rovarga@cisco.com>
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit 2f7c93174d7834a4c4aedacc9b88aa53a5a0422c)

39 files changed:
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemoryJournal.java
opendaylight/md-sal/sal-clustering-config/src/main/resources/initial/akka.conf
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextImpl.java with 87% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionFutureCallback.java with 55% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionReadyReplyMapper.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionReadyReplyMapper.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/CarsModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/PeopleModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2.conf [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member2.conf [new file with mode: 0644]
opendaylight/md-sal/sal-dom-spi/src/main/java/org/opendaylight/controller/sal/core/spi/data/AbstractSnapshotBackedTransactionChain.java

index d482e28..0e2449e 100644 (file)
@@ -36,13 +36,23 @@ import scala.concurrent.Future;
  */
 public class InMemoryJournal extends AsyncWriteJournal {
 
+    private static class WriteMessagesComplete {
+        final CountDownLatch latch;
+        final Class<?> ofType;
+
+        public WriteMessagesComplete(int count, Class<?> ofType) {
+            this.latch = new CountDownLatch(count);
+            this.ofType = ofType;
+        }
+    }
+
     static final Logger LOG = LoggerFactory.getLogger(InMemoryJournal.class);
 
     private static final Map<String, Map<Long, Object>> journals = new ConcurrentHashMap<>();
 
     private static final Map<String, CountDownLatch> deleteMessagesCompleteLatches = new ConcurrentHashMap<>();
 
-    private static final Map<String, CountDownLatch> writeMessagesCompleteLatches = new ConcurrentHashMap<>();
+    private static final Map<String, WriteMessagesComplete> writeMessagesComplete = new ConcurrentHashMap<>();
 
     private static final Map<String, CountDownLatch> blockReadMessagesLatches = new ConcurrentHashMap<>();
 
@@ -107,7 +117,7 @@ public class InMemoryJournal extends AsyncWriteJournal {
     }
 
     public static void waitForWriteMessagesComplete(String persistenceId) {
-        if(!Uninterruptibles.awaitUninterruptibly(writeMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+        if(!Uninterruptibles.awaitUninterruptibly(writeMessagesComplete.get(persistenceId).latch, 5, TimeUnit.SECONDS)) {
             throw new AssertionError("Journal write messages did not complete");
         }
     }
@@ -117,7 +127,11 @@ public class InMemoryJournal extends AsyncWriteJournal {
     }
 
     public static void addWriteMessagesCompleteLatch(String persistenceId, int count) {
-        writeMessagesCompleteLatches.put(persistenceId, new CountDownLatch(count));
+        writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, null));
+    }
+
+    public static void addWriteMessagesCompleteLatch(String persistenceId, int count, Class<?> ofType) {
+        writeMessagesComplete.put(persistenceId, new WriteMessagesComplete(count, ofType));
     }
 
     public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
@@ -193,9 +207,11 @@ public class InMemoryJournal extends AsyncWriteJournal {
                         journal.put(repr.sequenceNr(), repr.payload());
                     }
 
-                    CountDownLatch latch = writeMessagesCompleteLatches.get(repr.persistenceId());
-                    if(latch != null) {
-                        latch.countDown();
+                    WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId());
+                    if(complete != null) {
+                        if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
+                            complete.latch.countDown();
+                        }
                     }
                 }
 
index 8d65e59..a4d554c 100644 (file)
@@ -17,10 +17,12 @@ odl-cluster-data {
       serializers {
         java = "akka.serialization.JavaSerializer"
         proto = "akka.remote.serialization.ProtobufSerializer"
+        readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
       }
 
       serialization-bindings {
         "com.google.protobuf.Message" = proto
+        "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
 
       default-dispatcher {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
new file mode 100644 (file)
index 0000000..78e059c
--- /dev/null
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.util.Try;
+
+/**
+ * Factory for creating local and remote TransactionContext instances. Maintains a cache of known local
+ * transaction factories.
+ */
+abstract class AbstractTransactionContextFactory<F extends LocalTransactionFactory>
+        implements ShardInfoListener, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContextFactory.class);
+
+    protected static final AtomicLong TX_COUNTER = new AtomicLong();
+
+    private final ConcurrentMap<String, F> knownLocal = new ConcurrentHashMap<>();
+    private final ActorContext actorContext;
+
+    protected AbstractTransactionContextFactory(final ActorContext actorContext) {
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+    }
+
+    final ActorContext getActorContext() {
+        return actorContext;
+    }
+
+    private TransactionContext maybeCreateLocalTransactionContext(final TransactionProxy parent, final String shardName) {
+        final LocalTransactionFactory local = knownLocal.get(shardName);
+        if (local != null) {
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Tx {} - Creating local component for shard {} using factory {}",
+                        parent.getIdentifier(), shardName, local);
+            }
+            return createLocalTransactionContext(local, parent);
+        }
+
+        return null;
+    }
+
+    private void onFindPrimaryShardSuccess(PrimaryShardInfo primaryShardInfo, TransactionProxy parent,
+            String shardName, TransactionContextWrapper transactionContextAdapter) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {}: Found primary {} for shard {}", parent.getIdentifier(),
+                    primaryShardInfo.getPrimaryShardActor(), shardName);
+        }
+
+        updateShardInfo(shardName, primaryShardInfo);
+
+        TransactionContext localContext = maybeCreateLocalTransactionContext(parent, shardName);
+        if(localContext != null) {
+            transactionContextAdapter.executePriorTransactionOperations(localContext);
+        } else {
+            RemoteTransactionContextSupport remote = new RemoteTransactionContextSupport(transactionContextAdapter,
+                    parent, shardName);
+            remote.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
+        }
+    }
+
+    private void onFindPrimaryShardFailure(Throwable failure, TransactionProxy parent,
+            String shardName, TransactionContextWrapper transactionContextAdapter) {
+        LOG.debug("Tx {}: Find primary for shard {} failed", parent.getIdentifier(), shardName, failure);
+
+        transactionContextAdapter.executePriorTransactionOperations(new NoOpTransactionContext(failure,
+                parent.getIdentifier(), parent.getLimiter()));
+    }
+
+    final TransactionContextWrapper newTransactionAdapter(final TransactionProxy parent, final String shardName) {
+        final TransactionContextWrapper transactionContextAdapter = new TransactionContextWrapper(parent.getIdentifier());
+
+        Future<PrimaryShardInfo> findPrimaryFuture = findPrimaryShard(shardName);
+        if(findPrimaryFuture.isCompleted()) {
+            Try<PrimaryShardInfo> maybe = findPrimaryFuture.value().get();
+            if(maybe.isSuccess()) {
+                onFindPrimaryShardSuccess(maybe.get(), parent, shardName, transactionContextAdapter);
+            } else {
+                onFindPrimaryShardFailure(maybe.failed().get(), parent, shardName, transactionContextAdapter);
+            }
+        } else {
+            findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
+                @Override
+                public void onComplete(final Throwable failure, final PrimaryShardInfo primaryShardInfo) {
+                    if (failure == null) {
+                        onFindPrimaryShardSuccess(primaryShardInfo, parent, shardName, transactionContextAdapter);
+                    } else {
+                        onFindPrimaryShardFailure(failure, parent, shardName, transactionContextAdapter);
+                    }
+                }
+            }, actorContext.getClientDispatcher());
+        }
+
+        return transactionContextAdapter;
+    }
+
+    private void updateShardInfo(final String shardName, final PrimaryShardInfo primaryShardInfo) {
+        final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
+        if (maybeDataTree.isPresent()) {
+            knownLocal.put(shardName, factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), maybeDataTree.get()));
+            LOG.debug("Shard {} resolved to local data tree", shardName);
+        }
+    }
+
+    @Override
+    public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
+        final F existing = knownLocal.get(shardName);
+        if (existing != null) {
+            if (primaryShardInfo != null) {
+                final Optional<DataTree> maybeDataTree = primaryShardInfo.getLocalShardDataTree();
+                if (maybeDataTree.isPresent()) {
+                    final DataTree newDataTree = maybeDataTree.get();
+                    final DataTree oldDataTree = dataTreeForFactory(existing);
+                    if (!oldDataTree.equals(newDataTree)) {
+                        final F newChain = factoryForShard(shardName, primaryShardInfo.getPrimaryShardActor(), newDataTree);
+                        knownLocal.replace(shardName, existing, newChain);
+                        LOG.debug("Replaced shard {} local data tree to {}", shardName, newDataTree);
+                    }
+
+                    return;
+                }
+            }
+            if (knownLocal.remove(shardName, existing)) {
+                LOG.debug("Shard {} invalidated data tree {}", shardName, existing);
+            } else {
+                LOG.debug("Shard {} failed to invalidate data tree {} ... strange", shardName, existing);
+            }
+        }
+    }
+
+    protected String getMemberName() {
+        String memberName = getActorContext().getCurrentMemberName();
+        if (memberName == null) {
+            memberName = "UNKNOWN-MEMBER";
+        }
+
+        return memberName;
+    }
+
+    /**
+     * Create an identifier for the next TransactionProxy attached to this component
+     * factory.
+     * @return Transaction identifier, may not be null.
+     */
+    protected abstract TransactionIdentifier nextIdentifier();
+
+    /**
+     * Find the primary shard actor.
+     *
+     * @param shardName Shard name
+     * @return Future containing shard information.
+     */
+    protected abstract Future<PrimaryShardInfo> findPrimaryShard(String shardName);
+
+    /**
+     * Create local transaction factory for specified shard, backed by specified shard leader
+     * and data tree instance.
+     *
+     * @param shardName
+     * @param shardLeader
+     * @param dataTree Backing data tree instance. The data tree may only be accessed in
+     *                 read-only manner.
+     * @return Transaction factory for local use.
+     */
+    protected abstract F factoryForShard(String shardName, ActorSelection shardLeader, DataTree dataTree);
+
+    /**
+     * Extract the backing data tree from a particular factory.
+     *
+     * @param factory Transaction factory
+     * @return Backing data tree
+     */
+    protected abstract DataTree dataTreeForFactory(F factory);
+
+    /**
+     * Callback invoked from child transactions to push any futures, which need to
+     * be waited for before the next transaction is allocated.
+     * @param cohortFutures Collection of futures
+     */
+    protected abstract <T> void onTransactionReady(@Nonnull TransactionIdentifier transaction, @Nonnull Collection<Future<T>> cohortFutures);
+
+    private static TransactionContext createLocalTransactionContext(final LocalTransactionFactory factory, final TransactionProxy parent) {
+        return new LocalTransactionContext(parent.getIdentifier(), factory.newReadWriteTransaction(parent.getIdentifier()));
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedTransactionProxy.java
deleted file mode 100644 (file)
index 9a800c1..0000000
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.dispatch.OnComplete;
-import java.util.List;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-final class ChainedTransactionProxy extends TransactionProxy {
-    private static final Logger LOG = LoggerFactory.getLogger(ChainedTransactionProxy.class);
-
-    /**
-     * Stores the ready Futures from the previous Tx in the chain.
-     */
-    private final List<Future<Object>> previousReadyFutures;
-
-    /**
-     * Stores the ready Futures from this transaction when it is readied.
-     */
-    private volatile List<Future<Object>> readyFutures;
-
-    ChainedTransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            String transactionChainId, List<Future<Object>> previousReadyFutures) {
-        super(actorContext, transactionType, transactionChainId);
-        this.previousReadyFutures = previousReadyFutures;
-    }
-
-    List<Future<Object>> getReadyFutures() {
-        return readyFutures;
-    }
-
-    boolean isReady() {
-        return readyFutures != null;
-    }
-
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    @Override
-    public AbstractThreePhaseCommitCohort<?> ready() {
-        final AbstractThreePhaseCommitCohort<?> ret = super.ready();
-        readyFutures = (List)ret.getCohortFutures();
-        LOG.debug("onTransactionReady {} pending readyFutures size {} chain {}", getIdentifier(),
-            readyFutures.size(), getTransactionChainId());
-        return ret;
-    }
-
-    /**
-     * This method is overridden to ensure the previous Tx's ready operations complete
-     * before we initiate the next Tx in the chain to avoid creation failures if the
-     * previous Tx's ready operations haven't completed yet.
-     */
-    @Override
-    protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(final String shardName) {
-        // Check if there are any previous ready Futures, otherwise let the super class handle it.
-        if(previousReadyFutures.isEmpty()) {
-            return super.sendFindPrimaryShardAsync(shardName);
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Waiting for {} previous ready futures for Tx {} on chain {}",
-                    previousReadyFutures.size(), getIdentifier(), getTransactionChainId());
-        }
-
-        // Combine the ready Futures into 1.
-        Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
-                previousReadyFutures, getActorContext().getClientDispatcher());
-
-        // Add a callback for completion of the combined Futures.
-        final Promise<PrimaryShardInfo> returnPromise = akka.dispatch.Futures.promise();
-        OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
-            @Override
-            public void onComplete(Throwable failure, Iterable<Object> notUsed) {
-                if(failure != null) {
-                    // A Ready Future failed so fail the returned Promise.
-                    returnPromise.failure(failure);
-                } else {
-                    LOG.debug("Previous Tx readied - sending FindPrimaryShard for {} on chain {}",
-                            getIdentifier(), getTransactionChainId());
-
-                    // Send the FindPrimaryShard message and use the resulting Future to complete the
-                    // returned Promise.
-                    returnPromise.completeWith(ChainedTransactionProxy.super.sendFindPrimaryShardAsync(shardName));
-                }
-            }
-        };
-
-        combinedFutures.onComplete(onComplete, getActorContext().getClientDispatcher());
-
-        return returnPromise.future();
-    }
-}
\ No newline at end of file
index 8051f7d..1826665 100644 (file)
@@ -63,6 +63,8 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
 
     private final String type;
 
+    private final TransactionContextFactory txContextFactory;
+
     public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
             Configuration configuration, DatastoreContext datastoreContext) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
@@ -85,6 +87,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
 
+        this.txContextFactory = TransactionContextFactory.create(actorContext);
 
         datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType());
         datastoreConfigMXBean.setContext(datastoreContext);
@@ -96,10 +99,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
 
     public DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+        this.txContextFactory = TransactionContextFactory.create(actorContext);
         this.type = UNKNOWN_TYPE;
         this.waitTillReadyTimeInMillis =
                 actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
-
     }
 
     public void setCloseable(AutoCloseable closeable) {
@@ -144,24 +147,24 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(actorContext);
+        return txContextFactory.createTransactionChain();
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionType.READ_ONLY);
+       return new TransactionProxy(txContextFactory, TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
         actorContext.acquireTxCreationPermit();
-        return new TransactionProxy(actorContext, TransactionType.WRITE_ONLY);
+        return new TransactionProxy(txContextFactory, TransactionType.WRITE_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
         actorContext.acquireTxCreationPermit();
-        return new TransactionProxy(actorContext, TransactionType.READ_WRITE);
+        return new TransactionProxy(txContextFactory, TransactionType.READ_WRITE);
     }
 
     @Override
@@ -182,7 +185,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         datastoreConfigMXBean.unregisterMBean();
         datastoreInfoMXBean.unregisterMBean();
 
-        if(closeable != null) {
+        if (closeable != null) {
             try {
                 closeable.close();
             } catch (Exception e) {
@@ -190,6 +193,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
             }
         }
 
+        txContextFactory.close();
         actorContext.shutdown();
         DistributedDataStoreFactory.destroyInstance(this);
     }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..0ea1029
--- /dev/null
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * Fake {@link DOMStoreThreePhaseCommitCohort} instantiated for local transactions to conform with the DOM
+ * transaction APIs. It is only used to hold the data from a local DOM transaction ready operation and to
+ * initiate direct or coordinated commits from the front-end by sending the ReadyLocalTransaction message.
+ * It is not actually called by the front-end to perform 3PC thus the canCommit/preCommit/commit methods
+ * are no-ops.
+ */
+abstract class LocalThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalThreePhaseCommitCohort.class);
+
+    private final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction;
+    private final DataTreeModification modification;
+    private final ActorContext actorContext;
+    private final ActorSelection leader;
+
+    protected LocalThreePhaseCommitCohort(final ActorContext actorContext, final ActorSelection leader,
+            final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final DataTreeModification modification) {
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.leader = Preconditions.checkNotNull(leader);
+        this.transaction = Preconditions.checkNotNull(transaction);
+        this.modification = Preconditions.checkNotNull(modification);
+    }
+
+    private Future<Object> initiateCommit(final boolean immediate) {
+        final ReadyLocalTransaction message = new ReadyLocalTransaction(transaction.getIdentifier().toString(),
+                modification, immediate);
+        return actorContext.executeOperationAsync(leader, message);
+    }
+
+    Future<ActorSelection> initiateCoordinatedCommit() {
+        final Future<Object> messageFuture = initiateCommit(false);
+        final Future<ActorSelection> ret = TransactionReadyReplyMapper.transform(messageFuture, actorContext,
+                transaction.getIdentifier());
+        ret.onComplete(new OnComplete<ActorSelection>() {
+            @Override
+            public void onComplete(final Throwable failure, final ActorSelection success) throws Throwable {
+                if (failure != null) {
+                    LOG.info("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
+                    transactionAborted(transaction);
+                    return;
+                }
+
+                LOG.debug("Transaction {} resolved to actor {}", transaction.getIdentifier(), success);
+            }
+        }, actorContext.getClientDispatcher());
+
+        return ret;
+    }
+
+    Future<Object> initiateDirectCommit() {
+        final Future<Object> messageFuture = initiateCommit(true);
+        messageFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(final Throwable failure, final Object message) throws Throwable {
+                if (failure != null) {
+                    LOG.error("Failed to prepare transaction {} on backend", transaction.getIdentifier(), failure);
+                    transactionAborted(transaction);
+                } else if (CommitTransactionReply.SERIALIZABLE_CLASS.isInstance(message)) {
+                    LOG.debug("Transaction {} committed successfully", transaction.getIdentifier());
+                    transactionCommitted(transaction);
+                } else {
+                    LOG.error("Transaction {} resulted in unhandled message type {}, aborting", message.getClass());
+                    transactionAborted(transaction);
+                }
+            }
+        }, actorContext.getClientDispatcher());
+
+        return messageFuture;
+    }
+
+    @Override
+    public final ListenableFuture<Boolean> canCommit() {
+        // Intended no-op
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final ListenableFuture<Void> preCommit() {
+        // Intended no-op
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final ListenableFuture<Void> abort() {
+        // Intended no-op
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public final ListenableFuture<Void> commit() {
+        // Intended no-op
+        throw new UnsupportedOperationException();
+    }
+
+    protected abstract void transactionAborted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction);
+    protected abstract void transactionCommitted(SnapshotBackedWriteTransaction<TransactionIdentifier> transaction);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionChain.java
new file mode 100644 (file)
index 0000000..93e09db
--- /dev/null
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+/**
+ * Transaction chain instantiated on top of a locally-available DataTree. It does not instantiate
+ * a transaction in the leader and rather chains transactions on top of themselves.
+ */
+final class LocalTransactionChain extends AbstractSnapshotBackedTransactionChain<TransactionIdentifier>
+        implements LocalTransactionFactory {
+    private static final Throwable ABORTED = new Throwable("Transaction aborted");
+    private final TransactionChainProxy parent;
+    private final ActorSelection leader;
+    private final DataTree tree;
+
+    LocalTransactionChain(final TransactionChainProxy parent, final ActorSelection leader, final DataTree tree) {
+        this.parent = Preconditions.checkNotNull(parent);
+        this.leader = Preconditions.checkNotNull(leader);
+        this.tree = Preconditions.checkNotNull(tree);
+    }
+
+    DataTree getDataTree() {
+        return tree;
+    }
+
+    @Override
+    protected TransactionIdentifier nextTransactionIdentifier() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected boolean getDebugTransactions() {
+        return false;
+    }
+
+    @Override
+    protected DataTreeSnapshot takeSnapshot() {
+        return tree.takeSnapshot();
+    }
+
+    @Override
+    protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction, final DataTreeModification modification) {
+        return new LocalThreePhaseCommitCohort(parent.getActorContext(), leader, transaction, modification) {
+            @Override
+            protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+                onTransactionFailed(transaction, ABORTED);
+            }
+
+            @Override
+            protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+                onTransactionCommited(transaction);
+            }
+        };
+    }
+
+    @Override
+    public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) {
+        return super.newReadWriteTransaction(identifier);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
new file mode 100644 (file)
index 0000000..01a778f
--- /dev/null
@@ -0,0 +1,103 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import scala.concurrent.Future;
+
+/**
+ * Processes front-end transaction operations locally before being committed to the destination shard.
+ * Instances of this class are used when the destination shard is local to the caller.
+ *
+ * @author Thomas Pantelis
+ */
+final class LocalTransactionContext extends AbstractTransactionContext {
+    private final DOMStoreReadWriteTransaction delegate;
+
+    LocalTransactionContext(TransactionIdentifier identifier, DOMStoreReadWriteTransaction delegate) {
+        super(identifier);
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        delegate.write(path, data);
+    }
+
+    @Override
+    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+        delegate.merge(path, data);
+    }
+
+    @Override
+    public void deleteData(YangInstanceIdentifier path) {
+        delegate.delete(path);
+    }
+
+    @Override
+    public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
+        Futures.addCallback(delegate.read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
+            @Override
+            public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
+                proxyFuture.set(result);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                proxyFuture.setException(t);
+            }
+        });
+    }
+
+    @Override
+    public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
+        Futures.addCallback(delegate.exists(path), new FutureCallback<Boolean>() {
+            @Override
+            public void onSuccess(Boolean result) {
+                proxyFuture.set(result);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                proxyFuture.setException(t);
+            }
+        });
+    }
+
+    private LocalThreePhaseCommitCohort ready() {
+        return (LocalThreePhaseCommitCohort) delegate.ready();
+    }
+
+    @Override
+    public Future<ActorSelection> readyTransaction() {
+        return ready().initiateCoordinatedCommit();
+    }
+
+    @Override
+    public Future<Object> directCommit() {
+        return ready().initiateDirectCommit();
+    }
+
+    @Override
+    public boolean supportsDirectCommit() {
+        return true;
+    }
+
+    @Override
+    public void closeTransaction() {
+        delegate.close();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactory.java
new file mode 100644 (file)
index 0000000..8ca4424
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+
+/**
+ * A factory for creating local transactions used by {@link AbstractTransactionContextFactory} to instantiate
+ * transactions on shards which are co-located with the shard leader.
+ *
+ * @author Thomas Pantelis
+ */
+interface LocalTransactionFactory {
+    DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier);
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionFactoryImpl.java
new file mode 100644 (file)
index 0000000..dce9b5c
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link LocalTransactionFactory} for instantiating backing transactions which are
+ * disconnected from each other, ie not chained. These are used by {@link AbstractTransactionContextFactory}
+ * to instantiate transactions on shards which are co-located with the shard leader.
+ */
+final class LocalTransactionFactoryImpl extends TransactionReadyPrototype<TransactionIdentifier>
+        implements LocalTransactionFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocalTransactionFactoryImpl.class);
+    private final ActorSelection leader;
+    private final DataTree dataTree;
+    private final ActorContext actorContext;
+
+    LocalTransactionFactoryImpl(final ActorContext actorContext, final ActorSelection leader, final DataTree dataTree) {
+        this.leader = Preconditions.checkNotNull(leader);
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+        this.actorContext = actorContext;
+    }
+
+    DataTree getDataTree() {
+        return dataTree;
+    }
+
+    @Override
+    public DOMStoreReadWriteTransaction newReadWriteTransaction(TransactionIdentifier identifier) {
+        return SnapshotBackedTransactions.newReadWriteTransaction(identifier, false, dataTree.takeSnapshot(), this);
+    }
+
+    @Override
+    protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> tx) {
+        // No-op
+    }
+
+    @Override
+    protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<TransactionIdentifier> tx,
+            final DataTreeModification tree) {
+        return new LocalThreePhaseCommitCohort(actorContext, leader, tx, tree) {
+            @Override
+            protected void transactionAborted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+                // No-op
+                LOG.debug("Transaction {} aborted", transaction);
+            }
+
+            @Override
+            protected void transactionCommitted(final SnapshotBackedWriteTransaction<TransactionIdentifier> transaction) {
+                // No-op
+                LOG.debug("Transaction {} committed", transaction);
+            }
+        };
+    }
+}
@@ -9,7 +9,6 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import akka.dispatch.Mapper;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
@@ -20,7 +19,6 @@ import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
@@ -34,8 +32,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
-public class TransactionContextImpl extends AbstractTransactionContext {
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
+/**
+ * Redirects front-end transaction operations to a shard for processing. Instances of this class are used
+ * when the destination shard is remote to the caller.
+ *
+ * @author Thomas Pantelis
+ */
+public class RemoteTransactionContext extends AbstractTransactionContext {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class);
 
     private final ActorContext actorContext;
     private final ActorSelection actor;
@@ -46,7 +50,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     private BatchedModifications batchedModifications;
     private int totalBatchedModificationsSent;
 
-    protected TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+    protected RemoteTransactionContext(ActorSelection actor, TransactionIdentifier identifier,
             ActorContext actorContext, boolean isTxActorLocal,
             short remoteTransactionVersion, OperationCompleter operationCompleter) {
         super(identifier);
@@ -82,6 +86,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
     @Override
     public void closeTransaction() {
         LOG.debug("Tx {} closeTransaction called", getIdentifier());
+        TransactionContextCleanup.untrack(this);
 
         actorContext.sendOperationAsync(getActor(), CloseTransaction.INSTANCE.toSerializable());
     }
@@ -115,27 +120,7 @@ public class TransactionContextImpl extends AbstractTransactionContext {
         // Transform the last reply Future into a Future that returns the cohort actor path from
         // the last reply message. That's the end result of the ready operation.
 
-        return readyReplyFuture.transform(new Mapper<Object, ActorSelection>() {
-            @Override
-            public ActorSelection checkedApply(Object serializedReadyReply) {
-                LOG.debug("Tx {} readyTransaction", getIdentifier());
-
-                // At this point the ready operation succeeded and we need to extract the cohort
-                // actor path from the reply.
-                if(ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
-                    ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                    return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
-                }
-
-                // Throwing an exception here will fail the Future.
-                throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
-                        getIdentifier(), serializedReadyReply.getClass()));
-            }
-        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
-    }
-
-    protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
-        return readyTxReply.getCohortPath();
+        return TransactionReadyReplyMapper.transform(readyReplyFuture, actorContext, getIdentifier());
     }
 
     private BatchedModifications newBatchedModifications() {
@@ -1,4 +1,5 @@
 /*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
@@ -10,13 +11,9 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -28,46 +25,40 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
- * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
- * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
- * retry task after a short delay.
+ * Handles creation of TransactionContext instances for remote transactions. This class creates
+ * remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit,
+ * if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay.
  * <p>
  * The end result from a completed CreateTransaction message is a TransactionContext that is
  * used to perform transaction operations. Transaction operations that occur before the
- * CreateTransaction completes are cache and executed once the CreateTransaction completes,
- * successfully or not.
+ * CreateTransaction completes are cache via a TransactionContextWrapper and executed once the
+ * CreateTransaction completes, successfully or not.
  */
-final class TransactionFutureCallback extends OnComplete<Object> {
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionFutureCallback.class);
+final class RemoteTransactionContextSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class);
 
     /**
      * Time interval in between transaction create retries.
      */
     private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS);
 
-    /**
-     * The list of transaction operations to execute once the CreateTransaction completes.
-     */
-    @GuardedBy("txOperationsOnComplete")
-    private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
-    private final TransactionProxy proxy;
+    private final TransactionProxy parent;
     private final String shardName;
 
-    /**
-     * The TransactionContext resulting from the CreateTransaction reply.
-     */
-    private volatile TransactionContext transactionContext;
-
     /**
      * The target primary shard.
      */
     private volatile ActorSelection primaryShard;
     private volatile int createTxTries;
 
-    TransactionFutureCallback(final TransactionProxy proxy, final String shardName) {
-        this.proxy = Preconditions.checkNotNull(proxy);
+    private final TransactionContextWrapper transactionContextAdapter;
+
+    RemoteTransactionContextSupport(final TransactionContextWrapper transactionContextAdapter, final TransactionProxy parent,
+            final String shardName) {
+        this.parent = Preconditions.checkNotNull(parent);
         this.shardName = shardName;
-        createTxTries = (int) (proxy.getActorContext().getDatastoreContext().
+        this.transactionContextAdapter = transactionContextAdapter;
+        createTxTries = (int) (parent.getActorContext().getDatastoreContext().
                 getShardLeaderElectionTimeout().duration().toMillis() /
                 CREATE_TX_TRY_INTERVAL.toMillis());
     }
@@ -76,24 +67,20 @@ final class TransactionFutureCallback extends OnComplete<Object> {
         return shardName;
     }
 
-    TransactionContext getTransactionContext() {
-        return transactionContext;
-    }
-
     private TransactionType getTransactionType() {
-        return proxy.getTransactionType();
-    }
-
-    private TransactionIdentifier getIdentifier() {
-        return proxy.getIdentifier();
+        return parent.getType();
     }
 
     private ActorContext getActorContext() {
-        return proxy.getActorContext();
+        return parent.getActorContext();
     }
 
     private Semaphore getOperationLimiter() {
-        return proxy.getOperationLimiter();
+        return parent.getLimiter();
+    }
+
+    private TransactionIdentifier getIdentifier() {
+        return parent.getIdentifier();
     }
 
     /**
@@ -110,43 +97,13 @@ final class TransactionFutureCallback extends OnComplete<Object> {
             // For write-only Tx's we prepare the transaction modifications directly on the shard actor
             // to avoid the overhead of creating a separate transaction actor.
             // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
-            executeTxOperatonsOnComplete(proxy.createValidTransactionContext(this.primaryShard,
+            transactionContextAdapter.executePriorTransactionOperations(createValidTransactionContext(this.primaryShard,
                     this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
         } else {
             tryCreateTransaction();
         }
     }
 
-    /**
-     * Adds a TransactionOperation to be executed after the CreateTransaction completes.
-     */
-    private void addTxOperationOnComplete(TransactionOperation operation) {
-        boolean invokeOperation = true;
-        synchronized(txOperationsOnComplete) {
-            if(transactionContext == null) {
-                LOG.debug("Tx {} Adding operation on complete", getIdentifier());
-
-                invokeOperation = false;
-                txOperationsOnComplete.add(operation);
-            }
-        }
-
-        if(invokeOperation) {
-            operation.invoke(transactionContext);
-        }
-    }
-
-    void enqueueTransactionOperation(final TransactionOperation op) {
-
-        if (transactionContext != null) {
-            op.invoke(transactionContext);
-        } else {
-            // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
-            // callback to be executed after the Tx is created.
-            addTxOperationOnComplete(op);
-        }
-    }
-
     /**
      * Performs a CreateTransaction try async.
      */
@@ -156,15 +113,19 @@ final class TransactionFutureCallback extends OnComplete<Object> {
         }
 
         Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
-            getTransactionType().ordinal(), proxy.getTransactionChainId()).toSerializable();
+            getTransactionType().ordinal(), getIdentifier().getChainId()).toSerializable();
 
         Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage);
 
-        createTxFuture.onComplete(this, getActorContext().getClientDispatcher());
+        createTxFuture.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                onCreateTransactionComplete(failure, response);
+            }
+        }, getActorContext().getClientDispatcher());
     }
 
-    @Override
-    public void onComplete(Throwable failure, Object response) {
+    private void onCreateTransactionComplete(Throwable failure, Object response) {
         if(failure instanceof NoShardLeaderException) {
             // There's no leader for the shard yet - schedule and try again, unless we're out
             // of retries. Note: createTxTries is volatile as it may be written by different
@@ -188,12 +149,7 @@ final class TransactionFutureCallback extends OnComplete<Object> {
         createTransactionContext(failure, response);
     }
 
-    void createTransactionContext(Throwable failure, Object response) {
-        // Mainly checking for state violation here to perform a volatile read of "initialized" to
-        // ensure updates to operationLimter et al are visible to this thread (ie we're doing
-        // "piggy-back" synchronization here).
-        proxy.ensureInitializied();
-
+    private void createTransactionContext(Throwable failure, Object response) {
         // Create the TransactionContext from the response or failure. Store the new
         // TransactionContext locally until we've completed invoking the
         // TransactionOperations. This avoids thread timing issues which could cause
@@ -217,47 +173,33 @@ final class TransactionFutureCallback extends OnComplete<Object> {
             localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
         }
 
-        executeTxOperatonsOnComplete(localTransactionContext);
-    }
-
-    private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
-        while(true) {
-            // Access to txOperationsOnComplete and transactionContext must be protected and atomic
-            // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
-            // issues and ensure no TransactionOperation is missed and that they are processed
-            // in the order they occurred.
-
-            // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
-            // in case a TransactionOperation results in another transaction operation being
-            // queued (eg a put operation from a client read Future callback that is notified
-            // synchronously).
-            Collection<TransactionOperation> operationsBatch = null;
-            synchronized(txOperationsOnComplete) {
-                if(txOperationsOnComplete.isEmpty()) {
-                    // We're done invoking the TransactionOperations so we can now publish the
-                    // TransactionContext.
-                    transactionContext = localTransactionContext;
-                    break;
-                }
-
-                operationsBatch = new ArrayList<>(txOperationsOnComplete);
-                txOperationsOnComplete.clear();
-            }
-
-            // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
-            // A slight down-side is that we need to re-acquire the lock below but this should
-            // be negligible.
-            for(TransactionOperation oper: operationsBatch) {
-                oper.invoke(localTransactionContext);
-            }
-        }
+        transactionContextAdapter.executePriorTransactionOperations(localTransactionContext);
     }
 
     private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
         LOG.debug("Tx {} Received {}", getIdentifier(), reply);
 
-        return proxy.createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
+        return createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
                 reply.getTransactionPath(), reply.getVersion());
     }
 
-}
\ No newline at end of file
+    private TransactionContext createValidTransactionContext(ActorSelection transactionActor, String transactionPath,
+            short remoteTransactionVersion) {
+        // TxActor is always created where the leader of the shard is.
+        // Check if TxActor is created in the same node
+        boolean isTxActorLocal = getActorContext().isPathLocal(transactionPath);
+        final TransactionContext ret;
+
+        if (remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
+            ret = new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
+                getActorContext(), isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+        } else {
+            ret = new RemoteTransactionContext(transactionActor, getIdentifier(), getActorContext(),
+                isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
+        }
+
+        TransactionContextCleanup.track(this, ret);
+        return ret;
+    }
+}
+
index 57749a1..52e7a78 100644 (file)
@@ -63,7 +63,7 @@ public class ThreePhaseCommitCohortProxy extends AbstractThreePhaseCommitCohort<
                 }
                 return null;
             }
-        }, TransactionProxy.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
+        }, TransactionReadyReplyMapper.SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
     }
 
     @Override
index 97f3e74..b44f0b1 100644 (file)
 /*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorSelection;
+import akka.dispatch.OnComplete;
 import com.google.common.base.Preconditions;
-import java.util.Collections;
-import java.util.List;
+import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
+import scala.concurrent.Promise;
 
 /**
- * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
+ * A chain of {@link TransactionProxy}s. It allows a single open transaction to be open
+ * at a time. For remote transactions, it also tracks the outstanding readiness requests
+ * towards the shard and unblocks operations only after all have completed.
  */
-public class TransactionChainProxy implements DOMStoreTransactionChain {
-
-    private interface State {
-        boolean isReady();
+final class TransactionChainProxy extends AbstractTransactionContextFactory<LocalTransactionChain> implements DOMStoreTransactionChain {
+    private static abstract class State {
+        /**
+         * Check if it is okay to allocate a new transaction.
+         * @throws IllegalStateException if a transaction may not be allocated.
+         */
+        abstract void checkReady();
 
-        List<Future<Object>> getPreviousReadyFutures();
+        /**
+         * Return the future which needs to be waited for before shard information
+         * is returned (which unblocks remote transactions).
+         * @return Future to wait for, or null of no wait is necessary
+         */
+        abstract Future<?> previousFuture();
     }
 
-    private static class Allocated implements State {
-        private final ChainedTransactionProxy transaction;
+    private static abstract class Pending extends State {
+        private final TransactionIdentifier transaction;
+        private final Future<?> previousFuture;
 
-        Allocated(ChainedTransactionProxy transaction) {
-            this.transaction = transaction;
+        Pending(final TransactionIdentifier transaction, final Future<?> previousFuture) {
+            this.previousFuture = previousFuture;
+            this.transaction = Preconditions.checkNotNull(transaction);
         }
 
         @Override
-        public boolean isReady() {
-            return transaction.isReady();
+        final Future<?> previousFuture() {
+            return previousFuture;
+        }
+
+        final TransactionIdentifier getIdentifier() {
+            return transaction;
+        }
+    }
+
+    private static final class Allocated extends Pending {
+        Allocated(final TransactionIdentifier transaction, final Future<?> previousFuture) {
+            super(transaction, previousFuture);
         }
 
         @Override
-        public List<Future<Object>> getPreviousReadyFutures() {
-            return transaction.getReadyFutures();
+        void checkReady() {
+            throw new IllegalStateException(String.format("Previous transaction %s is not ready yet", getIdentifier()));
         }
     }
 
-    private static abstract class AbstractDefaultState implements State {
+    private static final class Submitted extends Pending {
+        Submitted(final TransactionIdentifier transaction, final Future<?> previousFuture) {
+            super(transaction, previousFuture);
+        }
+
         @Override
-        public List<Future<Object>> getPreviousReadyFutures() {
-            return Collections.emptyList();
+        void checkReady() {
+            // Okay to allocate
         }
     }
 
-    private static final State IDLE_STATE = new AbstractDefaultState() {
+    private static abstract class DefaultState extends State {
         @Override
-        public boolean isReady() {
-            return true;
+        final Future<?> previousFuture() {
+            return null;
+        }
+    }
+
+    private static final State IDLE_STATE = new DefaultState() {
+        @Override
+        void checkReady() {
+            // Okay to allocate
         }
     };
 
-    private static final State CLOSED_STATE = new AbstractDefaultState() {
+    private static final State CLOSED_STATE = new DefaultState() {
         @Override
-        public boolean isReady() {
+        void checkReady() {
             throw new TransactionChainClosedException("Transaction chain has been closed");
         }
     };
 
-    private static final AtomicInteger counter = new AtomicInteger(0);
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionChainProxy.class);
+    private static final AtomicInteger CHAIN_COUNTER = new AtomicInteger();
+    private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "currentState");
 
-    private final ActorContext actorContext;
     private final String transactionChainId;
+    private final TransactionContextFactory parent;
     private volatile State currentState = IDLE_STATE;
 
-    public TransactionChainProxy(ActorContext actorContext) {
-        this.actorContext = actorContext;
-        transactionChainId = actorContext.getCurrentMemberName() + "-txn-chain-" + counter.incrementAndGet();
+    TransactionChainProxy(final TransactionContextFactory parent) {
+        super(parent.getActorContext());
+        transactionChainId = parent.getActorContext().getCurrentMemberName() + "-txn-chain-" + CHAIN_COUNTER.incrementAndGet();
+        this.parent = parent;
     }
 
     public String getTransactionChainId() {
@@ -88,22 +131,19 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        State localState = currentState;
-        checkReadyState(localState);
-
-        return new ChainedTransactionProxy(actorContext, TransactionType.READ_ONLY,
-                transactionChainId, localState.getPreviousReadyFutures());
+        currentState.checkReady();
+        return new TransactionProxy(this, TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        actorContext.acquireTxCreationPermit();
+        getActorContext().acquireTxCreationPermit();
         return allocateWriteTransaction(TransactionType.READ_WRITE);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        actorContext.acquireTxCreationPermit();
+        getActorContext().acquireTxCreationPermit();
         return allocateWriteTransaction(TransactionType.WRITE_ONLY);
     }
 
@@ -112,24 +152,106 @@ public class TransactionChainProxy implements DOMStoreTransactionChain {
         currentState = CLOSED_STATE;
 
         // Send a close transaction chain request to each and every shard
-        actorContext.broadcast(new CloseTransactionChain(transactionChainId).toSerializable());
+        getActorContext().broadcast(new CloseTransactionChain(transactionChainId).toSerializable());
+        parent.removeTransactionChain(this);
     }
 
-    private ChainedTransactionProxy allocateWriteTransaction(TransactionType type) {
+    private TransactionProxy allocateWriteTransaction(final TransactionType type) {
         State localState = currentState;
+        localState.checkReady();
 
-        checkReadyState(localState);
+        final TransactionProxy ret = new TransactionProxy(this, type);
+        currentState = new Allocated(ret.getIdentifier(), localState.previousFuture());
+        return ret;
+    }
 
-        // Pass the ready Futures from the previous Tx.
-        ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type,
-                transactionChainId, localState.getPreviousReadyFutures());
+    @Override
+    protected LocalTransactionChain factoryForShard(final String shardName, final ActorSelection shardLeader, final DataTree dataTree) {
+        final LocalTransactionChain ret = new LocalTransactionChain(this, shardLeader, dataTree);
+        LOG.debug("Allocated transaction chain {} for shard {} leader {}", ret, shardName, shardLeader);
+        return ret;
+    }
 
-        currentState = new Allocated(txProxy);
+    @Override
+    protected DataTree dataTreeForFactory(final LocalTransactionChain factory) {
+        return factory.getDataTree();
+    }
+
+    /**
+     * This method is overridden to ensure the previous Tx's ready operations complete
+     * before we initiate the next Tx in the chain to avoid creation failures if the
+     * previous Tx's ready operations haven't completed yet.
+     */
+    @Override
+    protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName) {
+        // Read current state atomically
+        final State localState = currentState;
+
+        // There are no outstanding futures, shortcut
+        final Future<?> previous = localState.previousFuture();
+        if (previous == null) {
+            return parent.findPrimaryShard(shardName);
+        }
+
+        LOG.debug("Waiting for ready futures for on chain {}", getTransactionChainId());
+
+        // Add a callback for completion of the combined Futures.
+        final Promise<PrimaryShardInfo> returnPromise = akka.dispatch.Futures.promise();
+
+        final OnComplete onComplete = new OnComplete() {
+            @Override
+            public void onComplete(final Throwable failure, final Object notUsed) {
+                if (failure != null) {
+                    // A Ready Future failed so fail the returned Promise.
+                    returnPromise.failure(failure);
+                } else {
+                    LOG.debug("Previous Tx readied - proceeding to FindPrimaryShard on chain {}",
+                            getTransactionChainId());
+
+                    // Send the FindPrimaryShard message and use the resulting Future to complete the
+                    // returned Promise.
+                    returnPromise.completeWith(parent.findPrimaryShard(shardName));
+                }
+            }
+        };
 
-        return txProxy;
+        previous.onComplete(onComplete, getActorContext().getClientDispatcher());
+        return returnPromise.future();
     }
 
-    private void checkReadyState(State state) {
-        Preconditions.checkState(state.isReady(), "Previous transaction is not ready yet");
+    @Override
+    protected <T> void onTransactionReady(final TransactionIdentifier transaction, final Collection<Future<T>> cohortFutures) {
+        final State localState = currentState;
+        Preconditions.checkState(localState instanceof Allocated, "Readying transaction %s while state is %s", transaction, localState);
+        final TransactionIdentifier currentTx = ((Allocated)localState).getIdentifier();
+        Preconditions.checkState(transaction.equals(currentTx), "Readying transaction %s while %s is allocated", transaction, currentTx);
+
+        // Transaction ready and we are not waiting for futures -- go to idle
+        if (cohortFutures.isEmpty()) {
+            currentState = IDLE_STATE;
+            return;
+        }
+
+        // Combine the ready Futures into 1
+        final Future<Iterable<T>> combined = akka.dispatch.Futures.sequence(
+                cohortFutures, getActorContext().getClientDispatcher());
+
+        // Record the we have outstanding futures
+        final State newState = new Submitted(transaction, combined);
+        currentState = newState;
+
+        // Attach a completion reset, but only if we do not allocate a transaction
+        // in-between
+        combined.onComplete(new OnComplete<Iterable<T>>() {
+            @Override
+            public void onComplete(final Throwable arg0, final Iterable<T> arg1) {
+                STATE_UPDATER.compareAndSet(TransactionChainProxy.this, newState, IDLE_STATE);
+            }
+        }, getActorContext().getClientDispatcher());
+    }
+
+    @Override
+    protected TransactionIdentifier nextIdentifier() {
+        return TransactionIdentifier.create(getMemberName(), TX_COUNTER.getAndIncrement(), transactionChainId);
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextCleanup.java
new file mode 100644 (file)
index 0000000..8998f5c
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PhantomReference that closes remote transactions for a TransactionContext when it's
+ * garbage collected. This is used for read-only transactions as they're not explicitly closed
+ * by clients. So the only way to detect that a transaction is no longer in use and it's safe
+ * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
+ * but TransactionProxy instances should generally be short-lived enough to avoid being moved
+ * to the old generation space and thus should be cleaned up in a timely manner as the GC
+ * runs on the young generation (eden, swap1...) space much more frequently.
+ */
+final class TransactionContextCleanup extends FinalizablePhantomReference<RemoteTransactionContextSupport> {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionContextCleanup.class);
+    /**
+     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
+     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
+     * trickery to clean up its internal thread when the bundle is unloaded.
+     */
+    private static final FinalizableReferenceQueue QUEUE = new FinalizableReferenceQueue();
+
+    /**
+     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
+     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
+     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
+     * and thus becomes eligible for garbage collection.
+     */
+    private static final Map<TransactionContext, TransactionContextCleanup> CACHE = new ConcurrentHashMap<>();
+
+    private final TransactionContext cleanup;
+
+    private TransactionContextCleanup(RemoteTransactionContextSupport referent, TransactionContext cleanup) {
+        super(referent, QUEUE);
+        this.cleanup = cleanup;
+    }
+
+    static void track(final RemoteTransactionContextSupport referent, final TransactionContext cleanup) {
+        final TransactionContextCleanup ret = new TransactionContextCleanup(referent, cleanup);
+        CACHE.put(cleanup, ret);
+    }
+
+    @Override
+    public void finalizeReferent() {
+        LOG.trace("Cleaning up {} Tx actors {}", cleanup);
+
+        if (CACHE.remove(cleanup) != null) {
+            cleanup.closeTransaction();
+        }
+    }
+
+    static void untrack(final RemoteTransactionContext cleanup) {
+        CACHE.remove(cleanup);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextFactory.java
new file mode 100644 (file)
index 0000000..8d7ca99
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import java.util.ArrayList;
+import java.util.Collection;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import scala.concurrent.Future;
+
+/**
+ * An {@link AbstractTransactionContextFactory} which produces TransactionContext instances for single
+ * transactions (ie not chained).
+ */
+final class TransactionContextFactory extends AbstractTransactionContextFactory<LocalTransactionFactoryImpl> {
+
+    @GuardedBy("childChains")
+    private final Collection<TransactionChainProxy> childChains = new ArrayList<>();
+
+    private final ShardInfoListenerRegistration<TransactionContextFactory> reg;
+
+    private TransactionContextFactory(final ActorContext actorContext) {
+        super(actorContext);
+        this.reg = actorContext.registerShardInfoListener(this);
+    }
+
+    static TransactionContextFactory create(final ActorContext actorContext) {
+        return new TransactionContextFactory(actorContext);
+    }
+
+    @Override
+    public void close() {
+        reg.close();
+    }
+
+    @Override
+    protected TransactionIdentifier nextIdentifier() {
+        return TransactionIdentifier.create(getMemberName(), TX_COUNTER.getAndIncrement(), null);
+    }
+
+    @Override
+    protected LocalTransactionFactoryImpl factoryForShard(final String shardName, final ActorSelection shardLeader, final DataTree dataTree) {
+        return new LocalTransactionFactoryImpl(getActorContext(), shardLeader, dataTree);
+    }
+
+    @Override
+    protected Future<PrimaryShardInfo> findPrimaryShard(final String shardName) {
+        return getActorContext().findPrimaryShardAsync(shardName);
+    }
+
+    @Override
+    protected <T> void onTransactionReady(final TransactionIdentifier transaction, final Collection<Future<T>> cohortFutures) {
+        // Transactions are disconnected, this is a no-op
+    }
+
+    DOMStoreTransactionChain createTransactionChain() {
+        final TransactionChainProxy ret = new TransactionChainProxy(this);
+
+        synchronized (childChains) {
+            childChains.add(ret);
+        }
+
+        return ret;
+    }
+
+    void removeTransactionChain(final TransactionChainProxy chain) {
+        synchronized (childChains) {
+            childChains.remove(chain);
+        }
+    }
+
+    @Override
+    public void onShardInfoUpdated(final String shardName, final PrimaryShardInfo primaryShardInfo) {
+        synchronized (childChains) {
+            for (TransactionChainProxy chain : childChains) {
+                chain.onShardInfoUpdated(shardName, primaryShardInfo);
+            }
+            super.onShardInfoUpdated(shardName, primaryShardInfo);
+        }
+    }
+
+    @Override
+    protected DataTree dataTreeForFactory(final LocalTransactionFactoryImpl factory) {
+        return factory.getDataTree();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContextWrapper.java
new file mode 100644 (file)
index 0000000..137f652
--- /dev/null
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
+ * TransactionContext instance are cached until the TransactionContext instance becomes available at which
+ * time they are executed.
+ *
+ * @author Thomas Pantelis
+ */
+class TransactionContextWrapper {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
+
+    /**
+     * The list of transaction operations to execute once the TransactionContext becomes available.
+     */
+    @GuardedBy("queuedTxOperations")
+    private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
+
+    /**
+     * The resulting TransactionContext.
+     */
+    private volatile TransactionContext transactionContext;
+
+    private final TransactionIdentifier identifier;
+
+    TransactionContextWrapper(TransactionIdentifier identifier) {
+        this.identifier = identifier;
+    }
+
+    TransactionContext getTransactionContext() {
+        return transactionContext;
+    }
+
+    TransactionIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
+     */
+    private void enqueueTransactionOperation(TransactionOperation operation) {
+        boolean invokeOperation = true;
+        synchronized(queuedTxOperations) {
+            if(transactionContext == null) {
+                LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
+
+                invokeOperation = false;
+                queuedTxOperations.add(operation);
+            }
+        }
+
+        if(invokeOperation) {
+            operation.invoke(transactionContext);
+        }
+    }
+
+    void maybeExecuteTransactionOperation(final TransactionOperation op) {
+
+        if (transactionContext != null) {
+            op.invoke(transactionContext);
+        } else {
+            // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
+            // callback to be executed after the Tx is created.
+            enqueueTransactionOperation(op);
+        }
+    }
+
+    void executePriorTransactionOperations(TransactionContext localTransactionContext) {
+        while(true) {
+            // Access to queuedTxOperations and transactionContext must be protected and atomic
+            // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
+            // issues and ensure no TransactionOperation is missed and that they are processed
+            // in the order they occurred.
+
+            // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
+            // in case a TransactionOperation results in another transaction operation being
+            // queued (eg a put operation from a client read Future callback that is notified
+            // synchronously).
+            Collection<TransactionOperation> operationsBatch = null;
+            synchronized(queuedTxOperations) {
+                if(queuedTxOperations.isEmpty()) {
+                    // We're done invoking the TransactionOperations so we can now publish the
+                    // TransactionContext.
+                    transactionContext = localTransactionContext;
+                    break;
+                }
+
+                operationsBatch = new ArrayList<>(queuedTxOperations);
+                queuedTxOperations.clear();
+            }
+
+            // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
+            // A slight down-side is that we need to re-acquire the lock below but this should
+            // be negligible.
+            for(TransactionOperation oper: operationsBatch) {
+                oper.invoke(localTransactionContext);
+            }
+        }
+    }
+}
index 5081c9b..82258b4 100644 (file)
@@ -5,33 +5,28 @@
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
-
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
@@ -42,265 +37,139 @@ import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 
 /**
- * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
- * <p>
- * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
- * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
- * be created on each of those shards by the TransactionProxy
- *</p>
- * <p>
- * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
- * shards will be executed.
- * </p>
+ * A transaction potentially spanning multiple backend shards.
  */
 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
-
     private static enum TransactionState {
         OPEN,
         READY,
         CLOSED,
     }
-
-    static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
-                                                              new Mapper<Throwable, Throwable>() {
-        @Override
-        public Throwable apply(Throwable failure) {
-            return failure;
-        }
-    };
-
-    private static final AtomicLong counter = new AtomicLong();
-
     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
 
-    /**
-     * Stores the remote Tx actors for each requested data store path to be used by the
-     * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
-     * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
-     * remoteTransactionActors list so they will be visible to the thread accessing the
-     * PhantomReference.
-     */
-    List<ActorSelection> remoteTransactionActors;
-    volatile AtomicBoolean remoteTransactionActorsMB;
-
-    /**
-     * Stores the create transaction results per shard.
-     */
-    private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
-
-    private final TransactionType transactionType;
-    final ActorContext actorContext;
-    private final SchemaContext schemaContext;
+    private final Map<String, TransactionContextWrapper> txContextAdapters = new HashMap<>();
+    private final AbstractTransactionContextFactory<?> txContextFactory;
+    private final TransactionType type;
     private TransactionState state = TransactionState.OPEN;
+    private volatile OperationCompleter operationCompleter;
+    private volatile Semaphore operationLimiter;
 
-    private volatile boolean initialized;
-    private Semaphore operationLimiter;
-    private OperationCompleter operationCompleter;
+    @VisibleForTesting
+    public TransactionProxy(final AbstractTransactionContextFactory<?> txContextFactory, final TransactionType type) {
+        super(txContextFactory.nextIdentifier(), false);
+        this.txContextFactory = txContextFactory;
+        this.type = Preconditions.checkNotNull(type);
 
-    public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
-        this(actorContext, transactionType, "");
+        LOG.debug("New {} Tx - {}", type, getIdentifier());
     }
 
-    public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
-        super(createIdentifier(actorContext, transactionChainId));
-        this.actorContext = Preconditions.checkNotNull(actorContext,
-            "actorContext should not be null");
-        this.transactionType = Preconditions.checkNotNull(transactionType,
-            "transactionType should not be null");
-        this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
-            "schemaContext should not be null");
-
-        LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
-    }
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+        Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
 
-    private static TransactionIdentifier createIdentifier(ActorContext actorContext, String transactionChainId) {
-        String memberName = actorContext.getCurrentMemberName();
-        if (memberName == null) {
-            memberName = "UNKNOWN-MEMBER";
-        }
+        LOG.debug("Tx {} exists {}", getIdentifier(), path);
 
-        return TransactionIdentifier.create(memberName, counter.getAndIncrement(), transactionChainId);
-    }
+        throttleOperation();
 
-    @VisibleForTesting
-    boolean hasTransactionContext() {
-        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            if(transactionContext != null) {
-                return true;
+        final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
+        TransactionContextWrapper contextAdapter = getContextAdapter(path);
+        contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
+            @Override
+            public void invoke(TransactionContext transactionContext) {
+                transactionContext.dataExists(path, proxyFuture);
             }
-        }
-
-        return false;
-    }
+        });
 
-    private static boolean isRootPath(YangInstanceIdentifier path) {
-        return !path.getPathArguments().iterator().hasNext();
+        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
     @Override
     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
-
-        Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
-                "Read operation on write-only transaction is not allowed");
+        Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
 
         LOG.debug("Tx {} read {}", getIdentifier(), path);
 
-        final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
-
-        if(isRootPath(path)){
-            readAllData(path, proxyFuture);
+        if (YangInstanceIdentifier.EMPTY.equals(path)) {
+            return readAllData();
         } else {
             throttleOperation();
 
-            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
-                @Override
-                public void invoke(TransactionContext transactionContext) {
-                    transactionContext.readData(path, proxyFuture);
-                }
-            });
-
-        }
-
-        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
-    }
-
-    private void readAllData(final YangInstanceIdentifier path,
-                             final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
-        Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
-        List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
-
-        for(String shardName : allShardNames){
-            final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
-
-            throttleOperation();
-
-            TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
-            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
-                @Override
-                public void invoke(TransactionContext transactionContext) {
-                    transactionContext.readData(path, subProxyFuture);
-                }
-            });
-
-            futures.add(subProxyFuture);
+            return singleShardRead(shardNameFromIdentifier(path), path);
         }
-
-        final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
-
-        future.addListener(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
-                            future.get(), actorContext.getSchemaContext()));
-                } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
-                    proxyFuture.setException(e);
-                }
-            }
-        }, actorContext.getActorSystem().dispatcher());
     }
 
-    @Override
-    public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
-
-        Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
-                "Exists operation on write-only transaction is not allowed");
-
-        LOG.debug("Tx {} exists {}", getIdentifier(), path);
-
-        throttleOperation();
-
-        final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
-
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+    private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> singleShardRead(
+            final String shardName, final YangInstanceIdentifier path) {
+        final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
+        TransactionContextWrapper contextAdapter = getContextAdapter(shardName);
+        contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
-                transactionContext.dataExists(path, proxyFuture);
+                transactionContext.readData(path, proxyFuture);
             }
         });
 
         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
     }
 
-    private void checkModificationState() {
-        Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
-                "Modification operation on read-only transaction is not allowed");
-        Preconditions.checkState(state == TransactionState.OPEN,
-                "Transaction is sealed - further modifications are not allowed");
-    }
-
-    private void throttleOperation() {
-        throttleOperation(1);
-    }
+    private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readAllData() {
+        final Set<String> allShardNames = txContextFactory.getActorContext().getConfiguration().getAllShardNames();
+        final Collection<CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException>> futures = new ArrayList<>(allShardNames.size());
 
-    private void throttleOperation(int acquirePermits) {
-        if(!initialized) {
-            // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
-            operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
-            operationCompleter = new OperationCompleter(operationLimiter);
-
-            // Make sure we write this last because it's volatile and will also publish the non-volatile writes
-            // above as well so they'll be visible to other threads.
-            initialized = true;
+        for (String shardName : allShardNames) {
+            futures.add(singleShardRead(shardName, YangInstanceIdentifier.EMPTY));
         }
 
-        try {
-            if(!operationLimiter.tryAcquire(acquirePermits,
-                    actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
-                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
-            }
-        } catch (InterruptedException e) {
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
-            } else {
-                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
+        final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> listFuture = Futures.allAsList(futures);
+        final ListenableFuture<Optional<NormalizedNode<?, ?>>> aggregateFuture;
+
+        aggregateFuture = Futures.transform(listFuture, new Function<List<Optional<NormalizedNode<?, ?>>>, Optional<NormalizedNode<?, ?>>>() {
+            @Override
+            public Optional<NormalizedNode<?, ?>> apply(final List<Optional<NormalizedNode<?, ?>>> input) {
+                try {
+                    return NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.EMPTY, input, txContextFactory.getActorContext().getSchemaContext());
+                } catch (DataValidationFailedException e) {
+                    throw new IllegalArgumentException("Failed to aggregate", e);
+                }
             }
-        }
-    }
+        });
 
-    final void ensureInitializied() {
-        Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
+        return MappingCheckedFuture.create(aggregateFuture, ReadFailedException.MAPPER);
     }
 
     @Override
-    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-
+    public void delete(final YangInstanceIdentifier path) {
         checkModificationState();
 
-        LOG.debug("Tx {} write {}", getIdentifier(), path);
+        LOG.debug("Tx {} delete {}", getIdentifier(), path);
 
         throttleOperation();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+        TransactionContextWrapper contextAdapter = getContextAdapter(path);
+        contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
-                transactionContext.writeData(path, data);
+                transactionContext.deleteData(path);
             }
         });
     }
 
     @Override
     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-
         checkModificationState();
 
         LOG.debug("Tx {} merge {}", getIdentifier(), path);
 
         throttleOperation();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+        TransactionContextWrapper contextAdapter = getContextAdapter(path);
+        contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
                 transactionContext.mergeData(path, data);
@@ -309,23 +178,29 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     @Override
-    public void delete(final YangInstanceIdentifier path) {
-
+    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
         checkModificationState();
 
-        LOG.debug("Tx {} delete {}", getIdentifier(), path);
+        LOG.debug("Tx {} write {}", getIdentifier(), path);
 
         throttleOperation();
 
-        TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
-        txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+        TransactionContextWrapper contextAdapter = getContextAdapter(path);
+        contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
-                transactionContext.deleteData(path);
+                transactionContext.writeData(path, data);
             }
         });
     }
 
+    private void checkModificationState() {
+        Preconditions.checkState(type != TransactionType.READ_ONLY,
+                "Modification operation on read-only transaction is not allowed");
+        Preconditions.checkState(state == TransactionState.OPEN,
+                "Transaction is sealed - further modifications are not allowed");
+    }
+
     private boolean seal(final TransactionState newState) {
         if (state == TransactionState.OPEN) {
             state = newState;
@@ -336,59 +211,88 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
     }
 
     @Override
-    public AbstractThreePhaseCommitCohort<?> ready() {
-        Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
-                "Read-only transactions cannot be readied");
+    public final void close() {
+        if (!seal(TransactionState.CLOSED)) {
+            Preconditions.checkState(state == TransactionState.CLOSED, "Transaction %s is ready, it cannot be closed",
+                getIdentifier());
+            // Idempotent no-op as per AutoCloseable recommendation
+            return;
+        }
+
+        for (TransactionContextWrapper contextAdapter : txContextAdapters.values()) {
+            contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
+                @Override
+                public void invoke(TransactionContext transactionContext) {
+                    transactionContext.closeTransaction();
+                }
+            });
+        }
+
+
+        txContextAdapters.clear();
+    }
+
+    @Override
+    public final AbstractThreePhaseCommitCohort<?> ready() {
+        Preconditions.checkState(type != TransactionType.READ_ONLY, "Read-only transactions cannot be readied");
 
         final boolean success = seal(TransactionState.READY);
         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
 
-        LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
-                    txFutureCallbackMap.size());
-
-        if (txFutureCallbackMap.isEmpty()) {
-            TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
-            return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
+        LOG.debug("Tx {} Readying {} components for commit", getIdentifier(), txContextAdapters.size());
+
+        final AbstractThreePhaseCommitCohort<?> ret;
+        switch (txContextAdapters.size()) {
+        case 0:
+            TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(txContextFactory.getActorContext());
+            ret = NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
+            break;
+        case 1:
+            final Entry<String, TransactionContextWrapper> e = Iterables.getOnlyElement(txContextAdapters.entrySet());
+            ret = createSingleCommitCohort(e.getKey(), e.getValue());
+            break;
+        default:
+            ret = createMultiCommitCohort(txContextAdapters.entrySet());
         }
 
-        throttleOperation(txFutureCallbackMap.size());
-
-        final boolean isSingleShard = txFutureCallbackMap.size() == 1;
-        return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
+        txContextFactory.onTransactionReady(getIdentifier(), ret.getCohortFutures());
+        return ret;
     }
 
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
-        TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
+    private AbstractThreePhaseCommitCohort<?> createSingleCommitCohort(final String shardName,
+            final TransactionContextWrapper contextAdapter) {
+        throttleOperation();
 
-        LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
-                txFutureCallback.getShardName(), getTransactionChainId());
+        LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), shardName);
 
         final OperationCallback.Reference operationCallbackRef =
                 new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
-        final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
+
+        final TransactionContext transactionContext = contextAdapter.getTransactionContext();
         final Future future;
-        if (transactionContext != null) {
-            // avoid the creation of a promise and a TransactionOperation
-            future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
-        } else {
+        if (transactionContext == null) {
             final Promise promise = akka.dispatch.Futures.promise();
-            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+            contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
                 @Override
                 public void invoke(TransactionContext transactionContext) {
                     promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
                 }
             });
             future = promise.future();
+        } else {
+            // avoid the creation of a promise and a TransactionOperation
+            future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
         }
 
-        return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
+        return new SingleCommitCohortProxy(txContextFactory.getActorContext(), future, getIdentifier().toString(),
+                operationCallbackRef);
     }
 
     private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
             OperationCallback.Reference operationCallbackRef) {
-        if(transactionContext.supportsDirectCommit()) {
-            TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
+        if (transactionContext.supportsDirectCommit()) {
+            TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(
+                    txContextFactory.getActorContext());
             operationCallbackRef.set(rateLimitingCallback);
             rateLimitingCallback.run();
             return transactionContext.directCommit();
@@ -397,153 +301,106 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
         }
     }
 
-    private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
-        List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
-        for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
+    private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort(
+            final Set<Entry<String, TransactionContextWrapper>> txContextAdapterEntries) {
 
-            LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
-                        txFutureCallback.getShardName(), getTransactionChainId());
+        throttleOperation();
+        final List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txContextAdapterEntries.size());
+        for (Entry<String, TransactionContextWrapper> e : txContextAdapterEntries) {
+            LOG.debug("Tx {} Readying transaction for shard {}", getIdentifier(), e.getKey());
 
-            final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
-            final Future<ActorSelection> future;
+            TransactionContextWrapper contextAdapter = e.getValue();
+            final TransactionContext transactionContext = contextAdapter.getTransactionContext();
+            Future<ActorSelection> future;
             if (transactionContext != null) {
                 // avoid the creation of a promise and a TransactionOperation
                 future = transactionContext.readyTransaction();
             } else {
                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
-                txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
+                contextAdapter.maybeExecuteTransactionOperation(new TransactionOperation() {
                     @Override
                     public void invoke(TransactionContext transactionContext) {
                         promise.completeWith(transactionContext.readyTransaction());
                     }
                 });
+
                 future = promise.future();
             }
 
             cohortFutures.add(future);
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
+        return new ThreePhaseCommitCohortProxy(txContextFactory.getActorContext(), cohortFutures, getIdentifier().toString());
     }
 
-    @Override
-    public void close() {
-        if (!seal(TransactionState.CLOSED)) {
-            if (state == TransactionState.CLOSED) {
-                // Idempotent no-op as per AutoCloseable recommendation
-                return;
-            }
-
-            throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
-                getIdentifier()));
-        }
-
-        for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
-            txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
-                @Override
-                public void invoke(TransactionContext transactionContext) {
-                    transactionContext.closeTransaction();
-                }
-            });
-        }
-
-        txFutureCallbackMap.clear();
-
-        if(remoteTransactionActorsMB != null) {
-            remoteTransactionActors.clear();
-            remoteTransactionActorsMB.set(true);
-        }
-    }
-
-    private String shardNameFromIdentifier(YangInstanceIdentifier path){
+    private static String shardNameFromIdentifier(final YangInstanceIdentifier path) {
         return ShardStrategyFactory.getStrategy(path).findShard(path);
     }
 
-    protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
-        return actorContext.findPrimaryShardAsync(shardName);
+    private TransactionContextWrapper getContextAdapter(final YangInstanceIdentifier path) {
+        return getContextAdapter(shardNameFromIdentifier(path));
     }
 
-    final TransactionType getTransactionType() {
-        return transactionType;
-    }
+    private TransactionContextWrapper getContextAdapter(final String shardName) {
+        final TransactionContextWrapper existing = txContextAdapters.get(shardName);
+        if (existing != null) {
+            return existing;
+        }
 
-    final Semaphore getOperationLimiter() {
-        return operationLimiter;
+        final TransactionContextWrapper fresh = txContextFactory.newTransactionAdapter(this, shardName);
+        txContextAdapters.put(shardName, fresh);
+        return fresh;
     }
 
-    private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
-        String shardName = shardNameFromIdentifier(path);
-        return getOrCreateTxFutureCallback(shardName);
+    TransactionType getType() {
+        return type;
     }
 
-    private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
-        TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
-        if(txFutureCallback == null) {
-            Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
-
-            final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
+    boolean isReady() {
+        return state != TransactionState.OPEN;
+    }
 
-            txFutureCallback = newTxFutureCallback;
-            txFutureCallbackMap.put(shardName, txFutureCallback);
+    ActorContext getActorContext() {
+        return txContextFactory.getActorContext();
+    }
 
-            findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
-                @Override
-                public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
-                    if(failure != null) {
-                        newTxFutureCallback.createTransactionContext(failure, null);
-                    } else {
-                        newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
-                    }
-                }
-            }, actorContext.getClientDispatcher());
+    OperationCompleter getCompleter() {
+        OperationCompleter ret = operationCompleter;
+        if (ret == null) {
+            final Semaphore s = getLimiter();
+            ret = new OperationCompleter(s);
+            operationCompleter = ret;
         }
 
-        return txFutureCallback;
+        return ret;
     }
 
-    String getTransactionChainId() {
-        return getIdentifier().getChainId();
+    Semaphore getLimiter() {
+        Semaphore ret = operationLimiter;
+        if (ret == null) {
+            // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
+            ret = new Semaphore(getActorContext().getTransactionOutstandingOperationLimit());
+            operationLimiter = ret;
+        }
+        return ret;
     }
 
-    protected ActorContext getActorContext() {
-        return actorContext;
+    void throttleOperation() {
+        throttleOperation(1);
     }
 
-    TransactionContext createValidTransactionContext(ActorSelection transactionActor,
-            String transactionPath, short remoteTransactionVersion) {
-
-        if (transactionType == TransactionType.READ_ONLY) {
-            // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
-            // to close the remote Tx's when this instance is no longer in use and is garbage
-            // collected.
-
-            if(remoteTransactionActorsMB == null) {
-                remoteTransactionActors = Lists.newArrayList();
-                remoteTransactionActorsMB = new AtomicBoolean();
-
-                TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
+    private void throttleOperation(int acquirePermits) {
+        try {
+            if (!getLimiter().tryAcquire(acquirePermits,
+                getActorContext().getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
+                LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
+            }
+        } catch (InterruptedException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier(), e);
+            } else {
+                LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
             }
-
-            // Add the actor to the remoteTransactionActors list for access by the
-            // cleanup PhantonReference.
-            remoteTransactionActors.add(transactionActor);
-
-            // Write to the memory barrier volatile to publish the above update to the
-            // remoteTransactionActors list for thread visibility.
-            remoteTransactionActorsMB.set(true);
-        }
-
-        // TxActor is always created where the leader of the shard is.
-        // Check if TxActor is created in the same node
-        boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
-
-        if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
-            return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
-                    actorContext, isTxActorLocal, remoteTransactionVersion,
-                    operationCompleter);
-        } else {
-            return new TransactionContextImpl(transactionActor, getIdentifier(),
-                    actorContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
         }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxyCleanupPhantomReference.java
deleted file mode 100644 (file)
index 77834d9..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import akka.actor.ActorSelection;
-import com.google.common.base.FinalizablePhantomReference;
-import com.google.common.base.FinalizableReferenceQueue;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A PhantomReference that closes remote transactions for a TransactionProxy when it's
- * garbage collected. This is used for read-only transactions as they're not explicitly closed
- * by clients. So the only way to detect that a transaction is no longer in use and it's safe
- * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
- * but TransactionProxy instances should generally be short-lived enough to avoid being moved
- * to the old generation space and thus should be cleaned up in a timely manner as the GC
- * runs on the young generation (eden, swap1...) space much more frequently.
- */
-final class TransactionProxyCleanupPhantomReference
-                                       extends FinalizablePhantomReference<TransactionProxy> {
-    private static final Logger LOG = LoggerFactory.getLogger(TransactionProxyCleanupPhantomReference.class);
-    /**
-     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
-     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
-     * trickery to clean up its internal thread when the bundle is unloaded.
-     */
-    private static final FinalizableReferenceQueue phantomReferenceQueue =
-                                                                  new FinalizableReferenceQueue();
-
-    /**
-     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
-     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
-     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
-     * and thus becomes eligible for garbage collection.
-     */
-    private static final Map<TransactionProxyCleanupPhantomReference,
-                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
-                                                                        new ConcurrentHashMap<>();
-
-    private final List<ActorSelection> remoteTransactionActors;
-    private final AtomicBoolean remoteTransactionActorsMB;
-    private final ActorContext actorContext;
-    private final TransactionIdentifier identifier;
-
-    private TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
-        super(referent, phantomReferenceQueue);
-
-        // Note we need to cache the relevant fields from the TransactionProxy as we can't
-        // have a hard reference to the TransactionProxy instance itself.
-
-        remoteTransactionActors = referent.remoteTransactionActors;
-        remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
-        actorContext = referent.actorContext;
-        identifier = referent.getIdentifier();
-    }
-
-    static void track(TransactionProxy referent) {
-        final TransactionProxyCleanupPhantomReference ret = new TransactionProxyCleanupPhantomReference(referent);
-        phantomReferenceCache.put(ret, ret);
-    }
-
-    @Override
-    public void finalizeReferent() {
-        LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
-                remoteTransactionActors.size(), identifier);
-
-        phantomReferenceCache.remove(this);
-
-        // Access the memory barrier volatile to ensure all previous updates to the
-        // remoteTransactionActors list are visible to this thread.
-
-        if(remoteTransactionActorsMB.get()) {
-            for(ActorSelection actor : remoteTransactionActors) {
-                LOG.trace("Sending CloseTransaction to {}", actor);
-                actorContext.sendOperationAsync(actor, CloseTransaction.INSTANCE.toSerializable());
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionReadyReplyMapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionReadyReplyMapper.java
new file mode 100644 (file)
index 0000000..22bfbb1
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import akka.dispatch.Mapper;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * A {@link Mapper} extracting the {@link ActorSelection} pointing to the actor which
+ * is backing a particular transaction.
+ *
+ * This class is not for general consumption. It is public only to support the pre-lithium compatibility
+ * package.
+ *
+ * TODO: once we remove compatibility, make this class package-private and final.
+ */
+public class TransactionReadyReplyMapper extends Mapper<Object, ActorSelection> {
+    protected static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new Mapper<Throwable, Throwable>() {
+        @Override
+        public Throwable apply(final Throwable failure) {
+            return failure;
+        }
+    };
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionReadyReplyMapper.class);
+    private final TransactionIdentifier identifier;
+    private final ActorContext actorContext;
+
+    protected TransactionReadyReplyMapper(final ActorContext actorContext, final TransactionIdentifier identifier) {
+        this.actorContext = Preconditions.checkNotNull(actorContext);
+        this.identifier = Preconditions.checkNotNull(identifier);
+    }
+
+    protected final ActorContext getActorContext() {
+        return actorContext;
+    }
+
+    protected String extractCohortPathFrom(final ReadyTransactionReply readyTxReply) {
+        return readyTxReply.getCohortPath();
+    }
+
+    @Override
+    public final ActorSelection checkedApply(final Object serializedReadyReply) {
+        LOG.debug("Tx {} readyTransaction", identifier);
+
+        // At this point the ready operation succeeded and we need to extract the cohort
+        // actor path from the reply.
+        if (ReadyTransactionReply.isSerializedType(serializedReadyReply)) {
+            ReadyTransactionReply readyTxReply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
+            return actorContext.actorSelection(extractCohortPathFrom(readyTxReply));
+        }
+
+        // Throwing an exception here will fail the Future.
+        throw new IllegalArgumentException(String.format("%s: Invalid reply type %s",
+                identifier, serializedReadyReply.getClass()));
+    }
+
+    static Future<ActorSelection> transform(final Future<Object> readyReplyFuture, final ActorContext actorContext,
+            final TransactionIdentifier identifier) {
+        return readyReplyFuture.transform(new TransactionReadyReplyMapper(actorContext, identifier),
+            SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
+    }
+}
index 249a115..dc82565 100644 (file)
@@ -10,12 +10,11 @@ package org.opendaylight.controller.cluster.datastore.compat;
 import akka.actor.ActorSelection;
 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.OperationCompleter;
-import org.opendaylight.controller.cluster.datastore.TransactionContextImpl;
+import org.opendaylight.controller.cluster.datastore.RemoteTransactionContext;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -30,7 +29,8 @@ import scala.concurrent.Future;
  *
  * @author Thomas Pantelis
  */
-public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
+@Deprecated
+public class PreLithiumTransactionContextImpl extends RemoteTransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(PreLithiumTransactionContextImpl.class);
 
     private final String transactionPath;
@@ -69,7 +69,7 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
     }
 
     @Override
-    protected String extractCohortPathFrom(ReadyTransactionReply readyTxReply) {
+    protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
         // In base Helium we used to return the local path of the actor which represented
         // a remote ThreePhaseCommitCohort. The local path would then be converted to
         // a remote path using this resolvePath method. To maintain compatibility with
@@ -77,11 +77,11 @@ public class PreLithiumTransactionContextImpl extends TransactionContextImpl {
         // At some point in the future when upgrades from Helium are not supported
         // we could remove this code to resolvePath and just use the cohortPath as the
         // resolved cohortPath
-        if(getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-            return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath());
+        if (getRemoteTransactionVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+            return PreLithiumTransactionReadyReplyMapper.transform(readyReplyFuture, getActorContext(), getIdentifier(), transactionPath);
+        } else {
+            return super.transformReadyReply(readyReplyFuture);
         }
-
-        return readyTxReply.getCohortPath();
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionReadyReplyMapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionReadyReplyMapper.java
new file mode 100644 (file)
index 0000000..c8fab08
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.datastore.TransactionReadyReplyMapper;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import scala.concurrent.Future;
+import akka.actor.ActorSelection;
+
+/**
+ * A {@link Mapper} extracting the {@link ActorSelection} pointing to the actor which
+ * is backing a particular transaction. This class supports the Helium base release
+ * behavior.
+ */
+@Deprecated
+public final class PreLithiumTransactionReadyReplyMapper extends TransactionReadyReplyMapper {
+    private final String transactionPath;
+
+    private PreLithiumTransactionReadyReplyMapper(ActorContext actorContext, TransactionIdentifier identifier, final String transactionPath) {
+        super(actorContext, identifier);
+        this.transactionPath = Preconditions.checkNotNull(transactionPath);
+    }
+
+    @Override
+    protected String extractCohortPathFrom(final ReadyTransactionReply readyTxReply) {
+        return getActorContext().resolvePath(transactionPath, readyTxReply.getCohortPath());
+    }
+
+    public static Future<ActorSelection> transform(final Future<Object> readyReplyFuture, final ActorContext actorContext,
+        final TransactionIdentifier identifier, final String transactionPath) {
+        return readyReplyFuture.transform(new PreLithiumTransactionReadyReplyMapper(actorContext, identifier, transactionPath),
+            SAME_FAILURE_TRANSFORMER, actorContext.getClientDispatcher());
+    }
+}
index 6d952f9..f8cd18c 100644 (file)
@@ -13,14 +13,16 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 /**
  * Message notifying the shard leader to apply modifications which have been
  * prepared locally against its DataTree. This message is not directly serializable,
- * simply because the leader and sender need to be on the same system.
+ * simply because the leader and sender need to be on the same system. When it needs
+ * to be sent out to a remote system, it needs to be intercepted by {@link ReadyLocalTransactionSerializer}
+ * and turned into {@link BatchedModifications}.
  */
 public final class ReadyLocalTransaction {
     private final DataTreeModification modification;
     private final String transactionID;
     private final boolean doCommitOnReady;
 
-    public ReadyLocalTransaction(final String transactionID, DataTreeModification modification, boolean doCommitOnReady) {
+    public ReadyLocalTransaction(final String transactionID, final DataTreeModification modification, final boolean doCommitOnReady) {
         this.transactionID = Preconditions.checkNotNull(transactionID);
         this.modification = Preconditions.checkNotNull(modification);
         this.doCommitOnReady = doCommitOnReady;
index 18a8798..ad05a1c 100644 (file)
@@ -28,8 +28,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.util.concurrent.RateLimiter;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
@@ -65,7 +70,7 @@ import scala.concurrent.duration.FiniteDuration;
  * easily. An ActorContext can be freely passed around to local object instances
  * but should not be passed to actors especially remote actors
  */
-public class ActorContext {
+public class ActorContext implements RemovalListener<String, Future<PrimaryShardInfo>> {
     private static final Logger LOG = LoggerFactory.getLogger(ActorContext.class);
     private static final String DISTRIBUTED_DATA_STORE_METRIC_REGISTRY = "distributed-data-store";
     private static final String METRIC_RATE = "rate";
@@ -104,6 +109,8 @@ public class ActorContext {
     private volatile SchemaContext schemaContext;
     private volatile boolean updated;
     private final MetricRegistry metricRegistry = MetricsReporter.getInstance(DatastoreContext.METRICS_DOMAIN).getMetricsRegistry();
+    @GuardedBy("shardInfoListeners")
+    private final Collection<ShardInfoListenerRegistration<?>> shardInfoListeners = new ArrayList<>();
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
             ClusterWrapper clusterWrapper, Configuration configuration) {
@@ -146,6 +153,7 @@ public class ActorContext {
 
         primaryShardInfoCache = CacheBuilder.newBuilder()
                 .expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
+                .removalListener(this)
                 .build();
     }
 
@@ -236,6 +244,12 @@ public class ActorContext {
         ActorSelection actorSelection = actorSystem.actorSelection(primaryActorPath);
         PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.fromNullable(localShardDataTree));
         primaryShardInfoCache.put(shardName, Futures.successful(info));
+
+        synchronized (shardInfoListeners) {
+            for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
+                reg.getInstance().onShardInfoUpdated(shardName, info);
+            }
+        }
         return info;
     }
 
@@ -566,4 +580,28 @@ public class ActorContext {
     Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
         return primaryShardInfoCache;
     }
+
+    public <T extends ShardInfoListener> ShardInfoListenerRegistration<T> registerShardInfoListener(final T listener) {
+        final ShardInfoListenerRegistration<T> reg = new ShardInfoListenerRegistration<T>(listener, this);
+
+        synchronized (shardInfoListeners) {
+            shardInfoListeners.add(reg);
+        }
+        return reg;
+    }
+
+    protected void removeShardInfoListener(final ShardInfoListenerRegistration<?> registration) {
+        synchronized (shardInfoListeners) {
+            shardInfoListeners.remove(registration);
+        }
+    }
+
+    @Override
+    public void onRemoval(final RemovalNotification<String, Future<PrimaryShardInfo>> notification) {
+        synchronized (shardInfoListeners) {
+            for (ShardInfoListenerRegistration<?> reg : shardInfoListeners) {
+                reg.getInstance().onShardInfoUpdated(notification.getKey(), null);
+            }
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListener.java
new file mode 100644 (file)
index 0000000..83c5d37
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+
+/**
+ * Listener interface used to register for primary shard information changes.
+ * Implementations of this interface can be registered with {@link ActorContext}
+ * to receive notifications about shard information changes.
+ */
+public interface ShardInfoListener {
+    /**
+     * Update {@link PrimaryShardInfo} for a particular shard.
+     * @param shardName Shard name
+     * @param primaryShardInfo New {@link PrimaryShardInfo}, null if the information
+     *                         became unavailable.
+     */
+    void onShardInfoUpdated(@Nonnull String shardName, @Nullable PrimaryShardInfo primaryShardInfo);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ShardInfoListenerRegistration.java
new file mode 100644 (file)
index 0000000..3dca66d
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+
+/**
+ * Registration of a {@link ShardInfoListener} instance.
+ *
+ * @param <T> Type of listener
+ */
+public class ShardInfoListenerRegistration<T extends ShardInfoListener> extends AbstractObjectRegistration<T> {
+    private final ActorContext parent;
+
+    protected ShardInfoListenerRegistration(final T instance, final ActorContext parent) {
+        super(instance);
+        this.parent = Preconditions.checkNotNull(parent);
+    }
+
+    @Override
+    protected void removeRegistration() {
+        parent.removeShardInfoListener(this);
+    }
+}
index c22b7ac..0b2d3ce 100644 (file)
@@ -66,6 +66,8 @@ import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListener;
+import org.opendaylight.controller.cluster.datastore.utils.ShardInfoListenerRegistration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
@@ -111,6 +113,8 @@ public abstract class AbstractTransactionProxyTest {
     @Mock
     protected ActorContext mockActorContext;
 
+    protected TransactionContextFactory mockComponentFactory;
+
     private SchemaContext schemaContext;
 
     @Mock
@@ -150,6 +154,10 @@ public abstract class AbstractTransactionProxyTest {
         doReturn(mockClusterWrapper).when(mockActorContext).getClusterWrapper();
         doReturn(dataStoreContextBuilder.build()).when(mockActorContext).getDatastoreContext();
         doReturn(10).when(mockActorContext).getTransactionOutstandingOperationLimit();
+        doReturn(mock(ShardInfoListenerRegistration.class)).when(mockActorContext).registerShardInfoListener(
+                any(ShardInfoListener.class));
+
+        mockComponentFactory = TransactionContextFactory.create(mockActorContext);
 
         Timer timer = new MetricRegistry().timer("test");
         doReturn(timer).when(mockActorContext).getOperationTimer(any(String.class));
@@ -260,6 +268,7 @@ public abstract class AbstractTransactionProxyTest {
         return Futures.successful(new BatchedModificationsReply(count));
     }
 
+    @SuppressWarnings("unchecked")
     protected Future<Object> incompleteFuture() {
         return mock(Future.class);
     }
index 94cc6e5..f3d93b8 100644 (file)
@@ -2,30 +2,32 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.cluster.Cluster;
+import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
-import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -37,19 +39,38 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
+public class DistributedDataStoreIntegrationTest {
+
+    private static ActorSystem system;
 
     private final DatastoreContext.Builder datastoreContextBuilder =
             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100);
 
+    @BeforeClass
+    public static void setUpClass() throws IOException {
+        system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+        Cluster.get(system).join(member1Address);
+    }
+
+    @AfterClass
+    public static void tearDownClass() throws IOException {
+        JavaTestKit.shutdownActorSystem(system);
+        system = null;
+    }
+
+    protected ActorSystem getSystem() {
+        return system;
+    }
+
     @Test
     public void testWriteTransactionWithSingleShard() throws Exception{
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore =
                     setupDistributedDataStore("transactionIntegrationTest", "test-1");
 
@@ -65,47 +86,59 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test
     public void testWriteTransactionWithMultipleShards() throws Exception{
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore =
                     setupDistributedDataStore("testWriteTransactionWithMultipleShards", "cars-1", "people-1");
 
             DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
             assertNotNull("newWriteOnlyTransaction returned null", writeTx);
 
-            YangInstanceIdentifier nodePath1 = CarsModel.BASE_PATH;
-            NormalizedNode<?, ?> nodeToWrite1 = CarsModel.emptyContainer();
-            writeTx.write(nodePath1, nodeToWrite1);
+            writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+            writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
 
-            YangInstanceIdentifier nodePath2 = PeopleModel.BASE_PATH;
-            NormalizedNode<?, ?> nodeToWrite2 = PeopleModel.emptyContainer();
-            writeTx.write(nodePath2, nodeToWrite2);
+            doCommit(writeTx.ready());
 
-            DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+            writeTx = dataStore.newWriteOnlyTransaction();
 
-            doCommit(cohort);
+            writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+            writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
+
+            doCommit(writeTx.ready());
+
+            writeTx = dataStore.newWriteOnlyTransaction();
+
+            MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+            writeTx.write(carPath, car);
+
+            MapEntryNode person = PeopleModel.newPersonEntry("jack");
+            YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
+            writeTx.write(personPath, person);
+
+            doCommit(writeTx.ready());
 
             // Verify the data in the store
 
             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
-            Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath1).get();
+            Optional<NormalizedNode<?, ?>> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
             assertEquals("isPresent", true, optional.isPresent());
-            assertEquals("Data node", nodeToWrite1, optional.get());
+            assertEquals("Data node", car, optional.get());
 
-            optional = readTx.read(nodePath2).get();
+            optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
             assertEquals("isPresent", true, optional.isPresent());
-            assertEquals("Data node", nodeToWrite2, optional.get());
+            assertEquals("Data node", person, optional.get());
 
             cleanup(dataStore);
         }};
     }
 
     @Test
-    public void testReadWriteTransaction() throws Exception{
+    public void testReadWriteTransactionWithSingleShard() throws Exception{
         System.setProperty("shard.persistent", "true");
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore =
-                    setupDistributedDataStore("testReadWriteTransaction", "test-1");
+                    setupDistributedDataStore("testReadWriteTransactionWithSingleShard", "test-1");
 
             // 1. Create a read-write Tx
 
@@ -147,9 +180,65 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testReadWriteTransactionWithMultipleShards() throws Exception{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+            DistributedDataStore dataStore =
+                    setupDistributedDataStore("testReadWriteTransactionWithMultipleShards", "cars-1", "people-1");
+
+            DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
+            assertNotNull("newReadWriteTransaction returned null", readWriteTx);
+
+            readWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+            readWriteTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+            doCommit(readWriteTx.ready());
+
+            readWriteTx = dataStore.newReadWriteTransaction();
+
+            readWriteTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+            readWriteTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
+
+            doCommit(readWriteTx.ready());
+
+            readWriteTx = dataStore.newReadWriteTransaction();
+
+            MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+            readWriteTx.write(carPath, car);
+
+            MapEntryNode person = PeopleModel.newPersonEntry("jack");
+            YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
+            readWriteTx.write(personPath, person);
+
+            Boolean exists = readWriteTx.exists(carPath).checkedGet(5, TimeUnit.SECONDS);
+            assertEquals("exists", true, exists);
+
+            Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", car, optional.get());
+
+            doCommit(readWriteTx.ready());
+
+            // Verify the data in the store
+
+            DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+
+            optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", car, optional.get());
+
+            optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", person, optional.get());
+
+            cleanup(dataStore);
+        }};
+    }
+
     private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
             final boolean writeOnly) throws Exception {
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             String shardName = "test-1";
 
             // Setup the InMemoryJournal to block shard recovery to ensure the shard isn't
@@ -251,7 +340,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test
     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             String testName = "testTransactionReadsWithShardNotInitiallyReady";
             String shardName = "test-1";
 
@@ -324,7 +413,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test(expected=NotInitializedException.class)
     public void testTransactionCommitFailureWithShardNotInitialized() throws Throwable{
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             String testName = "testTransactionCommitFailureWithShardNotInitialized";
             String shardName = "test-1";
 
@@ -394,7 +483,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test(expected=NotInitializedException.class)
     public void testTransactionReadFailureWithShardNotInitialized() throws Throwable{
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             String testName = "testTransactionReadFailureWithShardNotInitialized";
             String shardName = "test-1";
 
@@ -466,7 +555,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
     }
 
     private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly) throws Throwable {
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             String testName = "testTransactionCommitFailureWithNoShardLeader";
             String shardName = "default";
 
@@ -548,7 +637,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
     @Test
     public void testTransactionAbort() throws Exception{
         System.setProperty("shard.persistent", "true");
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore =
                     setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
 
@@ -571,9 +660,9 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
     }
 
     @Test
-    public void testTransactionChain() throws Exception{
-        new IntegrationTestKit(getSystem()) {{
-            DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChain", "test-1");
+    public void testTransactionChainWithSingleShard() throws Exception{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+            DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard", "test-1");
 
             // 1. Create a Tx chain and write-only Tx
 
@@ -658,9 +747,74 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testTransactionChainWithMultipleShards() throws Exception{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+            DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithMultipleShards",
+                    "cars-1", "people-1");
+
+            DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+            DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+            writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+            writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+            writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+            writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
+
+            DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
+
+            DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
+
+            MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+            YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+            readWriteTx.write(carPath, car);
+
+            MapEntryNode person = PeopleModel.newPersonEntry("jack");
+            YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
+            readWriteTx.merge(personPath, person);
+
+            Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", car, optional.get());
+
+            optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", person, optional.get());
+
+            DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
+
+            writeTx = txChain.newWriteOnlyTransaction();
+
+            //writeTx.delete(personPath);
+
+            DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
+
+            doCommit(cohort1);
+            doCommit(cohort2);
+            doCommit(cohort3);
+
+            txChain.close();
+
+            DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+
+            optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
+            assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", car, optional.get());
+
+            optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
+            //assertEquals("isPresent", false, optional.isPresent());
+            assertEquals("isPresent", true, optional.isPresent());
+
+            cleanup(dataStore);
+        }};
+    }
+
     @Test
     public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore = setupDistributedDataStore(
                     "testCreateChainedTransactionsInQuickSuccession", "test-1");
 
@@ -691,7 +845,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test
     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception{
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore = setupDistributedDataStore(
                     "testCreateChainedTransactionAfterEmptyTxReadied", "test-1");
 
@@ -714,7 +868,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test
     public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore = setupDistributedDataStore(
                     "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
 
@@ -734,7 +888,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test
     public void testCreateChainedTransactionAfterClose() throws Throwable {
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore = setupDistributedDataStore(
                     "testCreateChainedTransactionAfterClose", "test-1");
 
@@ -750,7 +904,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
 
     @Test
     public void testChangeListenerRegistration() throws Exception{
-        new IntegrationTestKit(getSystem()) {{
+        new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
             DistributedDataStore dataStore =
                     setupDistributedDataStore("testChangeListenerRegistration", "test-1");
 
@@ -796,129 +950,4 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             cleanup(dataStore);
         }};
     }
-
-    class IntegrationTestKit extends ShardTestKit {
-
-        IntegrationTestKit(ActorSystem actorSystem) {
-            super(actorSystem);
-        }
-
-        DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
-            return setupDistributedDataStore(typeName, true, shardNames);
-        }
-
-        DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
-                String... shardNames) {
-            MockClusterWrapper cluster = new MockClusterWrapper();
-            Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
-            ShardStrategyFactory.setConfiguration(config);
-
-            datastoreContextBuilder.dataStoreType(typeName);
-
-            DatastoreContext datastoreContext = datastoreContextBuilder.build();
-
-            DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster,
-                    config, datastoreContext);
-
-            SchemaContext schemaContext = SchemaContextHelper.full();
-            dataStore.onGlobalContextUpdated(schemaContext);
-
-            if(waitUntilLeader) {
-                for(String shardName: shardNames) {
-                    ActorRef shard = null;
-                    for(int i = 0; i < 20 * 5 && shard == null; i++) {
-                        Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
-                        Optional<ActorRef> shardReply = dataStore.getActorContext().findLocalShard(shardName);
-                        if(shardReply.isPresent()) {
-                            shard = shardReply.get();
-                        }
-                    }
-
-                    assertNotNull("Shard was not created", shard);
-
-                    waitUntilLeader(shard);
-                }
-            }
-
-            return dataStore;
-        }
-
-        void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
-                NormalizedNode<?, ?> nodeToWrite) throws Exception {
-
-            // 1. Create a write-only Tx
-
-            DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
-            assertNotNull("newWriteOnlyTransaction returned null", writeTx);
-
-            // 2. Write some data
-
-            writeTx.write(nodePath, nodeToWrite);
-
-            // 3. Ready the Tx for commit
-
-            DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
-
-            // 4. Commit the Tx
-
-            doCommit(cohort);
-
-            // 5. Verify the data in the store
-
-            DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
-
-            Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
-            assertEquals("isPresent", true, optional.isPresent());
-            assertEquals("Data node", nodeToWrite, optional.get());
-        }
-
-        void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
-            Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
-            assertEquals("canCommit", true, canCommit);
-            cohort.preCommit().get(5, TimeUnit.SECONDS);
-            cohort.commit().get(5, TimeUnit.SECONDS);
-        }
-
-        void cleanup(DistributedDataStore dataStore) {
-            dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
-        }
-
-        void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
-                throws Exception {
-            try {
-                callable.call();
-                fail("Expected " + expType.getSimpleName());
-            } catch(Exception e) {
-                assertEquals("Exception type", expType, e.getClass());
-            }
-        }
-
-        void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
-                Class<? extends Exception> expType) throws Exception {
-            assertExceptionOnCall(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    txChain.newWriteOnlyTransaction();
-                    return null;
-                }
-            }, expType);
-
-            assertExceptionOnCall(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    txChain.newReadWriteTransaction();
-                    return null;
-                }
-            }, expType);
-
-            assertExceptionOnCall(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    txChain.newReadOnlyTransaction();
-                    return null;
-                }
-            }, expType);
-        }
-    }
-
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
new file mode 100644 (file)
index 0000000..7cef2fd
--- /dev/null
@@ -0,0 +1,327 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.cluster.Cluster;
+import akka.testkit.JavaTestKit;
+import com.google.common.base.Optional;
+import com.typesafe.config.ConfigFactory;
+import java.math.BigInteger;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
+import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+
+/**
+ * End-to-end distributed data store tests that exercise remote shards and transactions.
+ *
+ * @author Thomas Pantelis
+ */
+public class DistributedDataStoreRemotingIntegrationTest {
+
+    private static final String[] SHARD_NAMES = {"cars", "people"};
+
+    private ActorSystem leaderSystem;
+    private ActorSystem followerSystem;
+
+    private final DatastoreContext.Builder leaderDatastoreContextBuilder =
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
+
+    private final DatastoreContext.Builder followerDatastoreContextBuilder =
+            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(200);
+
+    private DistributedDataStore followerDistributedDataStore;
+    private DistributedDataStore leaderDistributedDataStore;
+    private IntegrationTestKit followerTestKit;
+    private IntegrationTestKit leaderTestKit;
+
+    @Before
+    public void setUpClass() {
+        leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
+        Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+        Cluster.get(leaderSystem).join(member1Address);
+
+        followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
+        Cluster.get(followerSystem).join(member1Address);
+    }
+
+    @After
+    public void tearDownClass() {
+        JavaTestKit.shutdownActorSystem(leaderSystem);
+        JavaTestKit.shutdownActorSystem(followerSystem);
+    }
+
+    private void initDatastores(String type) {
+        leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
+
+        String moduleShardsConfig = "module-shards-member1-and-2.conf";
+
+        followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
+        followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
+
+        leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, true, SHARD_NAMES);
+
+        leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
+    }
+
+    private void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
+        Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", true, optional.isPresent());
+
+        CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME);
+        for(NormalizedNode<?, ?> entry: entries) {
+            listBuilder.withChild((MapEntryNode) entry);
+        }
+
+        assertEquals("Car list node", listBuilder.build(), optional.get());
+    }
+
+    private void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path, NormalizedNode<?, ?> expNode)
+            throws Exception {
+        Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", true, optional.isPresent());
+        assertEquals("Data node", expNode, optional.get());
+    }
+
+    private void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
+        Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
+        assertEquals("exists", true, exists);
+    }
+
+    @Test
+    public void testWriteTransactionWithSingleShard() throws Exception {
+        String testName = "testWriteTransactionWithSingleShard";
+        initDatastores(testName);
+
+        String followerCarShardName = "member-2-shard-cars-" + testName;
+        InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
+
+        DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+
+        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
+        writeTx.merge(car1Path, car1);
+
+        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
+        YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
+        writeTx.merge(car2Path, car2);
+
+        followerTestKit.doCommit(writeTx.ready());
+
+        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+
+        // Test delete
+
+        writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+
+        writeTx.delete(car1Path);
+
+        followerTestKit.doCommit(writeTx.ready());
+
+        verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
+
+        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
+
+        // Re-instate the follower member 2 as a single-node to verify replication and recovery.
+
+        InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
+
+        JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+        JavaTestKit.shutdownActorSystem(followerSystem, null, true);
+
+        ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
+
+        DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder).
+                setupDistributedDataStore(testName, "module-shards-member2", true, SHARD_NAMES);
+
+        verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
+
+        JavaTestKit.shutdownActorSystem(newSystem);
+    }
+
+    @Test
+    public void testReadWriteTransactionWithSingleShard() throws Exception {
+        initDatastores("testReadWriteTransactionWithSingleShard");
+
+        DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+        assertNotNull("newReadWriteTransaction returned null", rwTx);
+
+        rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+
+        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        rwTx.merge(CarsModel.newCarPath("optima"), car1);
+
+        verifyCars(rwTx, car1);
+
+        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
+        YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
+        rwTx.merge(car2Path, car2);
+
+        verifyExists(rwTx, car2Path);
+
+        followerTestKit.doCommit(rwTx.ready());
+
+        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
+    }
+
+    @Test
+    public void testWriteTransactionWithMultipleShards() throws Exception {
+        initDatastores("testWriteTransactionWithMultipleShards");
+
+        DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
+        assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+        YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
+        NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
+        writeTx.write(carsPath, carsNode);
+
+        YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
+        NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
+        writeTx.write(peoplePath, peopleNode);
+
+        followerTestKit.doCommit(writeTx.ready());
+
+        DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+
+        verifyNode(readTx, carsPath, carsNode);
+        verifyNode(readTx, peoplePath, peopleNode);
+    }
+
+    @Test
+    public void testReadWriteTransactionWithMultipleShards() throws Exception {
+        initDatastores("testReadWriteTransactionWithMultipleShards");
+
+        DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+        assertNotNull("newReadWriteTransaction returned null", rwTx);
+
+        YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
+        NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
+        rwTx.write(carsPath, carsNode);
+
+        YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
+        NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
+        rwTx.write(peoplePath, peopleNode);
+
+        followerTestKit.doCommit(rwTx.ready());
+
+        DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+
+        verifyNode(readTx, carsPath, carsNode);
+        verifyNode(readTx, peoplePath, peopleNode);
+    }
+
+    @Test
+    public void testTransactionChain() throws Exception {
+        initDatastores("testTransactionChain");
+
+        DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        // Add the top-level cars container with write-only.
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        writeTx.ready();
+
+        // Verify the top-level cars container with read-only.
+
+        verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        // Perform car operations with read-write.
+
+        DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+
+        verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
+
+        rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+
+        MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
+        rwTx.write(car1Path, car1);
+
+        verifyExists(rwTx, car1Path);
+
+        verifyCars(rwTx, car1);
+
+        MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
+        rwTx.merge(CarsModel.newCarPath("sportage"), car2);
+
+        rwTx.delete(car1Path);
+
+        followerTestKit.doCommit(rwTx.ready());
+
+        txChain.close();
+
+        verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
+    }
+
+    @Test
+    public void testReadyLocalTransactionForwardedToLeader() throws Exception {
+        initDatastores("testReadyLocalTransactionForwardedToLeader");
+
+        Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
+        assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
+
+        TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
+        dataTree.setSchemaContext(SchemaContextHelper.full());
+        DataTreeModification modification = dataTree.takeSnapshot().newModification();
+
+        new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
+        new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
+
+        MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification);
+
+        String transactionID = "tx-1";
+        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true);
+
+        carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+        followerTestKit.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+
+        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
new file mode 100644 (file)
index 0000000..109e77c
--- /dev/null
@@ -0,0 +1,167 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+class IntegrationTestKit extends ShardTestKit {
+
+    DatastoreContext.Builder datastoreContextBuilder;
+
+    IntegrationTestKit(ActorSystem actorSystem, Builder datastoreContextBuilder) {
+        super(actorSystem);
+        this.datastoreContextBuilder = datastoreContextBuilder;
+    }
+
+    DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
+        return setupDistributedDataStore(typeName, true, shardNames);
+    }
+
+    DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
+            String... shardNames) {
+        return setupDistributedDataStore(typeName, "module-shards.conf", waitUntilLeader, shardNames);
+    }
+
+    DistributedDataStore setupDistributedDataStore(String typeName, String moduleShardsConfig, boolean waitUntilLeader,
+            String... shardNames) {
+        ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
+        Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
+        ShardStrategyFactory.setConfiguration(config);
+
+        datastoreContextBuilder.dataStoreType(typeName);
+
+        DatastoreContext datastoreContext = datastoreContextBuilder.build();
+
+        DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, datastoreContext);
+
+        SchemaContext schemaContext = SchemaContextHelper.full();
+        dataStore.onGlobalContextUpdated(schemaContext);
+
+        if(waitUntilLeader) {
+            waitUntilLeader(dataStore.getActorContext(), shardNames);
+        }
+
+        return dataStore;
+    }
+
+    void waitUntilLeader(ActorContext actorContext, String... shardNames) {
+        for(String shardName: shardNames) {
+            ActorRef shard = null;
+            for(int i = 0; i < 20 * 5 && shard == null; i++) {
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+                Optional<ActorRef> shardReply = actorContext.findLocalShard(shardName);
+                if(shardReply.isPresent()) {
+                    shard = shardReply.get();
+                }
+            }
+
+            assertNotNull("Shard was not created", shard);
+
+            waitUntilLeader(shard);
+        }
+    }
+
+    void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
+            NormalizedNode<?, ?> nodeToWrite) throws Exception {
+
+        // 1. Create a write-only Tx
+
+        DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
+        assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+        // 2. Write some data
+
+        writeTx.write(nodePath, nodeToWrite);
+
+        // 3. Ready the Tx for commit
+
+        DOMStoreThreePhaseCommitCohort cohort = writeTx.ready();
+
+        // 4. Commit the Tx
+
+        doCommit(cohort);
+
+        // 5. Verify the data in the store
+
+        DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+
+        Optional<NormalizedNode<?, ?>> optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", true, optional.isPresent());
+        assertEquals("Data node", nodeToWrite, optional.get());
+    }
+
+    void doCommit(final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
+        Boolean canCommit = cohort.canCommit().get(7, TimeUnit.SECONDS);
+        assertEquals("canCommit", true, canCommit);
+        cohort.preCommit().get(5, TimeUnit.SECONDS);
+        cohort.commit().get(5, TimeUnit.SECONDS);
+    }
+
+    void cleanup(DistributedDataStore dataStore) {
+        if(dataStore != null) {
+            dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
+        }
+    }
+
+    void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
+            throws Exception {
+        try {
+            callable.call();
+            fail("Expected " + expType.getSimpleName());
+        } catch(Exception e) {
+            assertEquals("Exception type", expType, e.getClass());
+        }
+    }
+
+    void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
+            Class<? extends Exception> expType) throws Exception {
+        assertExceptionOnCall(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                txChain.newWriteOnlyTransaction();
+                return null;
+            }
+        }, expType);
+
+        assertExceptionOnCall(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                txChain.newReadWriteTransaction();
+                return null;
+            }
+        }, expType);
+
+        assertExceptionOnCall(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                txChain.newReadOnlyTransaction();
+                return null;
+            }
+        }, expType);
+    }
+}
\ No newline at end of file
index 76f299b..11ca195 100644 (file)
@@ -16,7 +16,6 @@ import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
@@ -32,7 +31,6 @@ import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.DefaultShardStrategy;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
@@ -48,7 +46,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     @Test
     public void testNewReadOnlyTransaction() throws Exception {
 
-     DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadOnlyTransaction();
+     DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadOnlyTransaction();
          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
 
     }
@@ -56,7 +54,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     @SuppressWarnings("resource")
     @Test
     public void testNewReadWriteTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newReadWriteTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newReadWriteTransaction();
         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
 
     }
@@ -64,29 +62,29 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     @SuppressWarnings("resource")
     @Test
     public void testNewWriteOnlyTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(mockActorContext).newWriteOnlyTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(mockComponentFactory).newWriteOnlyTransaction();
         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
 
     }
 
     @Test
     public void testClose() throws Exception {
-        new TransactionChainProxy(mockActorContext).close();
+        new TransactionChainProxy(mockComponentFactory).close();
 
         verify(mockActorContext, times(1)).broadcast(anyObject());
     }
 
     @Test
     public void testTransactionChainsHaveUniqueId(){
-        TransactionChainProxy one = new TransactionChainProxy(mock(ActorContext.class));
-        TransactionChainProxy two = new TransactionChainProxy(mock(ActorContext.class));
+        TransactionChainProxy one = new TransactionChainProxy(mockComponentFactory);
+        TransactionChainProxy two = new TransactionChainProxy(mockComponentFactory);
 
         Assert.assertNotEquals(one.getTransactionChainId(), two.getTransactionChainId());
     }
 
     @Test
     public void testRateLimitingUsedInReadWriteTxCreation(){
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         txChainProxy.newReadWriteTransaction();
 
@@ -95,7 +93,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testRateLimitingUsedInWriteOnlyTxCreation(){
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         txChainProxy.newWriteOnlyTransaction();
 
@@ -105,7 +103,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
     @Test
     public void testRateLimitingNotUsedInReadOnlyTxCreation(){
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         txChainProxy.newReadOnlyTransaction();
 
@@ -120,7 +118,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
     public void testChainedWriteOnlyTransactions() throws Exception {
         dataStoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
 
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         ActorRef txActorRef1 = setupActorContextWithoutInitialCreateTransaction(getSystem());
 
@@ -186,7 +184,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
      */
     @Test
     public void testChainedReadWriteTransactions() throws Exception {
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         ActorRef txActorRef1 = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE);
 
@@ -257,7 +255,7 @@ public class TransactionChainProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModifications(actorRef, 1);
 
-        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockActorContext);
+        TransactionChainProxy txChainProxy = new TransactionChainProxy(mockComponentFactory);
 
         DOMStoreWriteTransaction writeTx1 = txChainProxy.newWriteOnlyTransaction();
 
index d37a9db..6cf6315 100644 (file)
@@ -78,7 +78,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     public void testRead() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
@@ -107,7 +107,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
     }
@@ -119,7 +119,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
     }
@@ -138,7 +138,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(Futures.failed(exToThrow)).when(mockActorContext).executeOperationAsync(
                 any(ActorSelection.class), any());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
     }
@@ -179,7 +179,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(readSerializedDataReply(expectedNode)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
 
@@ -199,7 +199,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test(expected=IllegalStateException.class)
     public void testReadPreConditionCheck() {
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
         transactionProxy.read(TestModel.TEST_PATH);
     }
 
@@ -216,7 +216,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
             eq(getSystem().actorSelection(actorRef.path())), eqCreateTransaction(memberName, READ_ONLY));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
     }
@@ -225,7 +225,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     public void testExists() throws Exception {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         doReturn(dataExistsSerializedReply(false)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
@@ -259,8 +259,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(Futures.successful(new Object())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
     }
@@ -272,7 +271,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(Futures.failed(new TestException())).when(mockActorContext).
                 executeOperationAsync(eq(actorSelection(actorRef)), eqSerializedDataExists());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
     }
@@ -288,7 +287,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -306,7 +305,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test(expected=IllegalStateException.class)
     public void testExistsPreConditionCheck() {
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
         transactionProxy.exists(TestModel.TEST_PATH);
     }
 
@@ -319,7 +318,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModifications(actorRef, 1);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -342,7 +341,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         final NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
-        final TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        final TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         final CountDownLatch readComplete = new CountDownLatch(1);
         final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
@@ -382,13 +381,13 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
     @Test(expected=IllegalStateException.class)
     public void testWritePreConditionCheck() {
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
     }
 
     @Test(expected=IllegalStateException.class)
     public void testWriteAfterReadyPreConditionCheck() {
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.ready();
 
@@ -404,7 +403,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModifications(actorRef, 1);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -418,7 +417,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModifications(actorRef, 1);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -436,7 +435,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModifications(actorRef, 1);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -464,7 +463,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModificationsReady(actorRef, true);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -494,7 +493,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModificationsReady(actorRef, true);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -519,7 +518,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         expectBatchedModificationsReady(actorRef1);
         expectBatchedModificationsReady(actorRef2);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -542,7 +541,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModificationsReady(actorRef, true);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -571,7 +570,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModificationsReady(actorRef, true);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -603,7 +602,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectFailedBatchedModifications(actorRef);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -617,7 +616,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     private void testWriteOnlyTxWithFindPrimaryShardFailure(Exception toThrow) throws Exception {
         doReturn(Futures.failed(toThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -661,7 +660,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModificationsReady(actorRef2);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.write(TestModel.JUNK_PATH, ImmutableNodes.containerNode(TestModel.JUNK_QNAME));
         transactionProxy.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -677,8 +676,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
     @Test
     public void testGetIdentifier() {
         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                TransactionType.READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         Object id = transactionProxy.getIdentifier();
         assertNotNull("getIdentifier returned null", id);
@@ -692,7 +690,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedReadData());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -720,7 +718,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         setupActorContextWithInitialCreateTransaction(getSystem(), READ_ONLY);
         doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         // negative test case with null as the reply
         doReturn(readDataReply(null)).when(mockActorContext).executeOperationAsync(
@@ -759,7 +757,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         expectBatchedModificationsReady(actorRef, true);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
@@ -811,7 +809,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(true).when(mockActorContext).isPathLocal(actorPath);
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         long start = System.nanoTime();
 
@@ -860,7 +858,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(true).when(mockActorContext).isPathLocal(anyString());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         long start = System.nanoTime();
 
@@ -1198,7 +1196,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         YangInstanceIdentifier deletePath1 = TestModel.TEST_PATH;
         YangInstanceIdentifier deletePath2 = TestModel.OUTER_LIST_PATH;
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, type);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, type);
 
         transactionProxy.write(writePath1, writeNode1);
         transactionProxy.write(writePath2, writeNode2);
@@ -1276,7 +1274,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
         doReturn(dataExistsSerializedReply(true)).when(mockActorContext).executeOperationAsync(
                 eq(actorSelection(actorRef)), eqSerializedDataExists());
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         transactionProxy.write(writePath1, writeNode1);
         transactionProxy.write(writePath2, writeNode2);
@@ -1352,7 +1350,7 @@ public class TransactionProxyTest extends AbstractTransactionProxyTest {
 
         doReturn(getSystem().dispatchers().defaultGlobalDispatcher()).when(mockActorContext).getClientDispatcher();
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_ONLY);
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(
                 YangInstanceIdentifier.builder().build()).get(5, TimeUnit.SECONDS);
index 5681b76..a6656b2 100644 (file)
@@ -156,7 +156,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, READ_WRITE);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, READ_WRITE);
 
         Optional<NormalizedNode<?, ?>> readOptional = transactionProxy.read(TestModel.TEST_PATH).
                 get(5, TimeUnit.SECONDS);
@@ -228,7 +228,7 @@ public class PreLithiumTransactionProxyTest extends AbstractTransactionProxyTest
         doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
                 eq(actorRef.path().toString()));
 
-        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, WRITE_ONLY);
+        TransactionProxy transactionProxy = new TransactionProxy(mockComponentFactory, WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, testNode);
 
index 93b552a..468e2da 100644 (file)
@@ -23,13 +23,13 @@ public class CarsModel {
     public static final QName BASE_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13",
         "cars");
 
-    public static final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME);
-
     public static final QName CARS_QNAME = QName.create(BASE_QNAME, "cars");
     public static final QName CAR_QNAME = QName.create(CARS_QNAME, "car");
     public static final QName CAR_NAME_QNAME = QName.create(CAR_QNAME, "name");
     public static final QName CAR_PRICE_QNAME = QName.create(CAR_QNAME, "price");
 
+    public static final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME);
+    public static final YangInstanceIdentifier CAR_LIST_PATH = BASE_PATH.node(CAR_QNAME);
 
     public static NormalizedNode<?, ?> create(){
 
@@ -69,4 +69,17 @@ public class CarsModel {
             .build();
     }
 
+    public static NormalizedNode<?, ?> newCarMapNode() {
+        return ImmutableNodes.mapNodeBuilder(CAR_QNAME).build();
+    }
+
+    public static MapEntryNode newCarEntry(String name, BigInteger price) {
+        return ImmutableNodes.mapEntryBuilder(CAR_QNAME, CAR_NAME_QNAME, name)
+                .withChild(ImmutableNodes.leafNode(CAR_NAME_QNAME, name))
+                .withChild(ImmutableNodes.leafNode(CAR_PRICE_QNAME, price)).build();
+    }
+
+    public static YangInstanceIdentifier newCarPath(String name) {
+        return YangInstanceIdentifier.builder(CAR_LIST_PATH).nodeWithKey(CAR_QNAME, CAR_NAME_QNAME, name).build();
+    }
 }
index a7cb14f..fbe3df9 100644 (file)
@@ -22,13 +22,13 @@ public class PeopleModel {
     public static final QName BASE_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:people", "2014-03-13",
         "people");
 
-    public static final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME);
     public static final QName PEOPLE_QNAME = QName.create(BASE_QNAME, "people");
     public static final QName PERSON_QNAME = QName.create(PEOPLE_QNAME, "person");
     public static final QName PERSON_NAME_QNAME = QName.create(PERSON_QNAME, "name");
     public static final QName PERSON_AGE_QNAME = QName.create(PERSON_QNAME, "age");
 
-
+    public static final YangInstanceIdentifier BASE_PATH = YangInstanceIdentifier.of(BASE_QNAME);
+    public static final YangInstanceIdentifier PERSON_LIST_PATH = BASE_PATH.node(PERSON_QNAME);
 
     public static NormalizedNode<?, ?> create(){
 
@@ -69,4 +69,16 @@ public class PeopleModel {
             .build();
     }
 
+    public static NormalizedNode<?, ?> newPersonMapNode() {
+        return ImmutableNodes.mapNodeBuilder(PERSON_QNAME).build();
+    }
+
+    public static MapEntryNode newPersonEntry(String name) {
+        return ImmutableNodes.mapEntryBuilder(PERSON_QNAME, PERSON_NAME_QNAME, name)
+                .withChild(ImmutableNodes.leafNode(PERSON_NAME_QNAME, name)).build();
+    }
+
+    public static YangInstanceIdentifier newPersonPath(String name) {
+        return YangInstanceIdentifier.builder(PERSON_LIST_PATH).nodeWithKey(PERSON_QNAME, PERSON_NAME_QNAME, name).build();
+    }
 }
index 0363462..8f5550f 100644 (file)
@@ -63,10 +63,12 @@ Member1 {
       serializers {
           java = "akka.serialization.JavaSerializer"
           proto = "akka.remote.serialization.ProtobufSerializer"
+          readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
       }
 
       serialization-bindings {
           "com.google.protobuf.Message" = proto
+          "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
     }
     remote {
@@ -113,10 +115,12 @@ Member2 {
       serializers {
           java = "akka.serialization.JavaSerializer"
           proto = "akka.remote.serialization.ProtobufSerializer"
+          readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
       }
 
       serialization-bindings {
           "com.google.protobuf.Message" = proto
+          "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
       }
     }
     remote {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member1-and-2.conf
new file mode 100644 (file)
index 0000000..61c3538
--- /dev/null
@@ -0,0 +1,26 @@
+module-shards = [
+    {
+        name = "people"
+        shards = [
+            {
+                name="people"
+                replicas = [
+                    "member-1",
+                    "member-2"
+                ]
+            }
+        ]
+    },
+    {
+        name = "cars"
+        shards = [
+            {
+                name="cars"
+                replicas = [
+                    "member-1",
+                    "member-2"
+                ]
+            }
+        ]
+    }
+]
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member2.conf b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/module-shards-member2.conf
new file mode 100644 (file)
index 0000000..fd996bd
--- /dev/null
@@ -0,0 +1,24 @@
+module-shards = [
+    {
+        name = "people"
+        shards = [
+            {
+                name="people"
+                replicas = [
+                    "member-2"
+                ]
+            }
+        ]
+    },
+    {
+        name = "cars"
+        shards = [
+            {
+                name="cars"
+                replicas = [
+                    "member-2"
+                ]
+            }
+        ]
+    }
+]
\ No newline at end of file
index b7776b2..7683937 100644 (file)
@@ -120,19 +120,26 @@ public abstract class AbstractSnapshotBackedTransactionChain<T> extends Transact
 
     @Override
     public final DOMStoreReadTransaction newReadOnlyTransaction() {
+        return newReadOnlyTransaction(nextTransactionIdentifier());
+    }
+
+    protected DOMStoreReadTransaction newReadOnlyTransaction(T transactionId) {
         final Entry<State, DataTreeSnapshot> entry = getSnapshot();
-        return SnapshotBackedTransactions.newReadTransaction(nextTransactionIdentifier(), getDebugTransactions(), entry.getValue());
+        return SnapshotBackedTransactions.newReadTransaction(transactionId, getDebugTransactions(), entry.getValue());
     }
 
     @Override
     public final DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        return newReadWriteTransaction(nextTransactionIdentifier());
+    }
+
+    protected DOMStoreReadWriteTransaction newReadWriteTransaction(T transactionId) {
         Entry<State, DataTreeSnapshot> entry;
         DOMStoreReadWriteTransaction ret;
 
         do {
             entry = getSnapshot();
-            ret = new SnapshotBackedReadWriteTransaction<T>(nextTransactionIdentifier(),
-                getDebugTransactions(), entry.getValue(), this);
+            ret = new SnapshotBackedReadWriteTransaction<T>(transactionId, getDebugTransactions(), entry.getValue(), this);
         } while (!recordTransaction(entry.getKey(), ret));
 
         return ret;
@@ -140,13 +147,16 @@ public abstract class AbstractSnapshotBackedTransactionChain<T> extends Transact
 
     @Override
     public final DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        return newWriteOnlyTransaction(nextTransactionIdentifier());
+    }
+
+    protected DOMStoreWriteTransaction newWriteOnlyTransaction(T transactionId) {
         Entry<State, DataTreeSnapshot> entry;
         DOMStoreWriteTransaction ret;
 
         do {
             entry = getSnapshot();
-            ret = new SnapshotBackedWriteTransaction<T>(nextTransactionIdentifier(),
-                getDebugTransactions(), entry.getValue(), this);
+            ret = new SnapshotBackedWriteTransaction<T>(transactionId, getDebugTransactions(), entry.getValue(), this);
         } while (!recordTransaction(entry.getKey(), ret));
 
         return ret;

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.