Merge "Fix for Bug 2290."
authorDevin Avery <devin.avery@brocade.com>
Tue, 4 Nov 2014 20:07:31 +0000 (20:07 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 4 Nov 2014 20:07:31 +0000 (20:07 +0000)
30 files changed:
opendaylight/md-sal/benchmark-data-store/pom.xml
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryBrokerWriteTransactionBenchmark.java [new file with mode: 0644]
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryDatastoreWriteTransactionBenchmark.java
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryWriteTransactionBenchmark.java [new file with mode: 0644]
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryBrokerWriteTransactionBenchmark.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java

index ac384319b8bb64c90fdf23abebf32df7a70e2de8..7d47add017802bd1e077cd97b8178e16d2757784 100644 (file)
@@ -40,6 +40,14 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-inmemory-datastore</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-broker-impl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
@@ -69,4 +77,4 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       </plugin>
     </plugins>
   </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryBrokerWriteTransactionBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryBrokerWriteTransactionBenchmark.java
new file mode 100644 (file)
index 0000000..fdd715e
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.benchmark;
+
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * @author Lukas Sedlak <lsedlak@cisco.com>
+ */
+public abstract class AbstractInMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryWriteTransactionBenchmark {
+
+    protected DOMDataBrokerImpl domBroker;
+
+    protected void initTestNode() throws Exception {
+        final YangInstanceIdentifier testPath = YangInstanceIdentifier.builder(BenchmarkModel.TEST_PATH)
+            .build();
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        writeTx.put(LogicalDatastoreType.OPERATIONAL, testPath, provideOuterListNode());
+
+        writeTx.submit().get();
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write100KSingleNodeWithOneInnerItemInOneCommitBenchmark() throws Exception {
+
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_100K; ++outerListKey) {
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_100K_PATHS[outerListKey], OUTER_LIST_ONE_ITEM_INNER_LIST[outerListKey]);
+        }
+
+        writeTx.submit().get();
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write100KSingleNodeWithOneInnerItemInCommitPerWriteBenchmark() throws Exception {
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_100K; ++outerListKey) {
+            DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_100K_PATHS[outerListKey], OUTER_LIST_ONE_ITEM_INNER_LIST[outerListKey]);
+
+            writeTx.submit().get();
+        }
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write50KSingleNodeWithTwoInnerItemsInOneCommitBenchmark() throws Exception {
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_50K; ++outerListKey) {
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_50K_PATHS[outerListKey], OUTER_LIST_TWO_ITEM_INNER_LIST[outerListKey]);
+        }
+
+        writeTx.submit().get();
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write50KSingleNodeWithTwoInnerItemsInCommitPerWriteBenchmark() throws Exception {
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_50K; ++outerListKey) {
+            DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_50K_PATHS[outerListKey], OUTER_LIST_TWO_ITEM_INNER_LIST[outerListKey]);
+            writeTx.submit().get();
+        }
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write10KSingleNodeWithTenInnerItemsInOneCommitBenchmark() throws Exception {
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_10K; ++outerListKey) {
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_10K_PATHS[outerListKey], OUTER_LIST_TEN_ITEM_INNER_LIST[outerListKey]);
+        }
+        writeTx.submit().get();
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write10KSingleNodeWithTenInnerItemsInCommitPerWriteBenchmark() throws Exception {
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_10K; ++outerListKey) {
+            DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_10K_PATHS[outerListKey], OUTER_LIST_TEN_ITEM_INNER_LIST[outerListKey]);
+            writeTx.submit().get();
+        }
+    }
+}
index aa5ef61ce4c3965f289905641676194d656d6357..fce0642860ffaa1827fe09f399911a3b50095ab2 100644 (file)
@@ -12,14 +12,6 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Warmup;
@@ -27,67 +19,10 @@ import org.openjdk.jmh.annotations.Warmup;
 /**
  * @author Lukas Sedlak <lsedlak@cisco.com>
  */
-public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark {
+public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark extends AbstractInMemoryWriteTransactionBenchmark {
 
-    private static final int WARMUP_ITERATIONS = 20;
-    private static final int MEASUREMENT_ITERATIONS = 20;
-
-    private static final int OUTER_LIST_100K = 100000;
-    private static final int OUTER_LIST_50K = 50000;
-    private static final int OUTER_LIST_10K = 10000;
-
-    private static final YangInstanceIdentifier[] OUTER_LIST_100K_PATHS = initOuterListPaths(OUTER_LIST_100K);
-    private static final YangInstanceIdentifier[] OUTER_LIST_50K_PATHS = initOuterListPaths(OUTER_LIST_50K);
-    private static final YangInstanceIdentifier[] OUTER_LIST_10K_PATHS = initOuterListPaths(OUTER_LIST_10K);
-
-    private static YangInstanceIdentifier[] initOuterListPaths(final int outerListPathsCount) {
-        final YangInstanceIdentifier[] paths = new YangInstanceIdentifier[outerListPathsCount];
-
-        for (int outerListKey = 0; outerListKey < outerListPathsCount; ++outerListKey) {
-            paths[outerListKey] = YangInstanceIdentifier.builder(BenchmarkModel.OUTER_LIST_PATH)
-                .nodeWithKey(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
-                .build();
-        }
-        return paths;
-    }
-
-    private static final MapNode ONE_ITEM_INNER_LIST = initInnerListItems(1);
-    private static final MapNode TWO_ITEM_INNER_LIST = initInnerListItems(2);
-    private static final MapNode TEN_ITEM_INNER_LIST = initInnerListItems(10);
-
-    private static MapNode initInnerListItems(final int count) {
-        final CollectionNodeBuilder<MapEntryNode, MapNode> mapEntryBuilder = ImmutableNodes
-            .mapNodeBuilder(BenchmarkModel.INNER_LIST_QNAME);
-
-        for (int i = 1; i <= count; ++i) {
-            mapEntryBuilder
-                .withChild(ImmutableNodes.mapEntry(BenchmarkModel.INNER_LIST_QNAME, BenchmarkModel.NAME_QNAME, i));
-        }
-        return mapEntryBuilder.build();
-    }
-
-    private static final NormalizedNode<?, ?>[] OUTER_LIST_ONE_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_100K, ONE_ITEM_INNER_LIST);
-    private static final NormalizedNode<?, ?>[] OUTER_LIST_TWO_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_50K, TWO_ITEM_INNER_LIST);
-    private static final NormalizedNode<?, ?>[] OUTER_LIST_TEN_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_10K, TEN_ITEM_INNER_LIST);
-
-    private static NormalizedNode<?,?>[] initOuterListItems(int outerListItemsCount, MapNode innerList) {
-        final NormalizedNode<?,?>[] outerListItems = new NormalizedNode[outerListItemsCount];
-
-        for (int i = 0; i < outerListItemsCount; ++i) {
-            int outerListKey = i;
-            outerListItems[i] = ImmutableNodes.mapEntryBuilder(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
-                .withChild(innerList).build();
-        }
-        return outerListItems;
-    }
-
-    protected SchemaContext schemaContext;
     protected InMemoryDOMDataStore domStore;
 
-    abstract public void setUp() throws Exception;
-
-    abstract public void tearDown();
-
     protected void initTestNode() throws Exception {
         final YangInstanceIdentifier testPath = YangInstanceIdentifier.builder(BenchmarkModel.TEST_PATH)
             .build();
@@ -100,15 +35,6 @@ public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark {
         cohort.commit().get();
     }
 
-    private DataContainerChild<?, ?> provideOuterListNode() {
-        return ImmutableContainerNodeBuilder
-            .create()
-            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BenchmarkModel.TEST_QNAME))
-            .withChild(
-                ImmutableNodes.mapNodeBuilder(BenchmarkModel.OUTER_LIST_QNAME)
-                    .build()).build();
-    }
-
     @Benchmark
     @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
     @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryWriteTransactionBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryWriteTransactionBenchmark.java
new file mode 100644 (file)
index 0000000..6c256eb
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.benchmark;
+
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public abstract class AbstractInMemoryWriteTransactionBenchmark {
+    protected static final int OUTER_LIST_100K = 100000;
+    protected static final int OUTER_LIST_50K = 50000;
+    protected static final int OUTER_LIST_10K = 10000;
+
+    protected static final YangInstanceIdentifier[] OUTER_LIST_100K_PATHS = initOuterListPaths(OUTER_LIST_100K);
+    protected static final YangInstanceIdentifier[] OUTER_LIST_50K_PATHS = initOuterListPaths(OUTER_LIST_50K);
+    protected static final YangInstanceIdentifier[] OUTER_LIST_10K_PATHS = initOuterListPaths(OUTER_LIST_10K);
+
+    private static YangInstanceIdentifier[] initOuterListPaths(final int outerListPathsCount) {
+        final YangInstanceIdentifier[] paths = new YangInstanceIdentifier[outerListPathsCount];
+
+        for (int outerListKey = 0; outerListKey < outerListPathsCount; ++outerListKey) {
+            paths[outerListKey] = YangInstanceIdentifier.builder(BenchmarkModel.OUTER_LIST_PATH)
+                .nodeWithKey(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
+                .build();
+        }
+        return paths;
+    }
+
+    protected static final int WARMUP_ITERATIONS = 20;
+    protected static final int MEASUREMENT_ITERATIONS = 20;
+
+    protected static final MapNode ONE_ITEM_INNER_LIST = initInnerListItems(1);
+    protected static final MapNode TWO_ITEM_INNER_LIST = initInnerListItems(2);
+    protected static final MapNode TEN_ITEM_INNER_LIST = initInnerListItems(10);
+
+    private static MapNode initInnerListItems(final int count) {
+        final CollectionNodeBuilder<MapEntryNode, MapNode> mapEntryBuilder = ImmutableNodes
+            .mapNodeBuilder(BenchmarkModel.INNER_LIST_QNAME);
+
+        for (int i = 1; i <= count; ++i) {
+            mapEntryBuilder
+                .withChild(ImmutableNodes.mapEntry(BenchmarkModel.INNER_LIST_QNAME, BenchmarkModel.NAME_QNAME, i));
+        }
+        return mapEntryBuilder.build();
+    }
+
+    protected static final NormalizedNode<?, ?>[] OUTER_LIST_ONE_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_100K, ONE_ITEM_INNER_LIST);
+    protected static final NormalizedNode<?, ?>[] OUTER_LIST_TWO_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_50K, TWO_ITEM_INNER_LIST);
+    protected static final NormalizedNode<?, ?>[] OUTER_LIST_TEN_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_10K, TEN_ITEM_INNER_LIST);
+
+    private static NormalizedNode<?,?>[] initOuterListItems(final int outerListItemsCount, final MapNode innerList) {
+        final NormalizedNode<?,?>[] outerListItems = new NormalizedNode[outerListItemsCount];
+
+        for (int i = 0; i < outerListItemsCount; ++i) {
+            int outerListKey = i;
+            outerListItems[i] = ImmutableNodes.mapEntryBuilder(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
+                .withChild(innerList).build();
+        }
+        return outerListItems;
+    }
+
+    protected SchemaContext schemaContext;
+    abstract public void setUp() throws Exception;
+    abstract public void tearDown();
+
+    protected static DataContainerChild<?, ?> provideOuterListNode() {
+        return ImmutableContainerNodeBuilder
+            .create()
+            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BenchmarkModel.TEST_QNAME))
+            .withChild(
+                ImmutableNodes.mapNodeBuilder(BenchmarkModel.OUTER_LIST_QNAME)
+                    .build()).build();
+    }
+}
diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryBrokerWriteTransactionBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryBrokerWriteTransactionBenchmark.java
new file mode 100644 (file)
index 0000000..a46a6d1
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.benchmark;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+public class InMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryBrokerWriteTransactionBenchmark {
+    private ListeningExecutorService executor;
+
+    @Setup(Level.Trial)
+    @Override
+    public void setUp() throws Exception {
+        ListeningExecutorService dsExec = MoreExecutors.sameThreadExecutor();
+        executor = MoreExecutors.listeningDecorator(
+            MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)Executors.newFixedThreadPool(1), 1L, TimeUnit.SECONDS));
+
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", dsExec,
+            MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", dsExec,
+            MoreExecutors.sameThreadExecutor());
+        Map<LogicalDatastoreType, DOMStore> datastores = ImmutableMap.of(
+            LogicalDatastoreType.OPERATIONAL, (DOMStore)operStore,
+            LogicalDatastoreType.CONFIGURATION, configStore);
+
+        domBroker = new DOMDataBrokerImpl(datastores, executor);
+        schemaContext = BenchmarkModel.createTestContext();
+        configStore.onGlobalContextUpdated(schemaContext);
+        operStore.onGlobalContextUpdated(schemaContext);
+        initTestNode();
+    }
+
+    @Override
+    public void tearDown() {
+        domBroker.close();
+        executor.shutdown();
+    }
+}
index 69af77c7496b26ed02033d97fd8572025814cc9f..fd9912244a620c6bc060cb465dd0da51449ce47d 100644 (file)
@@ -358,6 +358,11 @@ public class RaftActorTest extends AbstractActorTest {
                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
 
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                // Wait for akka's recovery to complete so it doesn't interfere.
+                mockRaftActor.waitForRecoveryComplete();
+
                 ByteString snapshotBytes  = fromObject(Arrays.asList(
                         new MockRaftActorContext.MockPayload("A"),
                         new MockRaftActorContext.MockPayload("B"),
@@ -367,8 +372,6 @@ public class RaftActorTest extends AbstractActorTest {
                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
 
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
                 CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
@@ -402,8 +405,6 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
 
-                mockRaftActor.waitForRecoveryComplete();
-
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
 
             }};
@@ -430,6 +431,9 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                // Wait for akka's recovery to complete so it doesn't interfere.
+                mockRaftActor.waitForRecoveryComplete();
+
                 ByteString snapshotBytes  = fromObject(Arrays.asList(
                         new MockRaftActorContext.MockPayload("A"),
                         new MockRaftActorContext.MockPayload("B"),
@@ -470,8 +474,6 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
 
-                mockRaftActor.waitForRecoveryComplete();
-
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
             }};
     }
index e43b44582d77b5526234e301535d368a2f83fede..2a79a5b8275ca8a547b39268dbc8764aa529def6 100644 (file)
@@ -11,17 +11,17 @@ public final class ThreePhaseCommitCohortMessages {
   public interface CanCommitTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     boolean hasTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     java.lang.String getTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
@@ -122,17 +122,17 @@ public final class ThreePhaseCommitCohortMessages {
     }
 
     private int bitField0_;
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     public static final int TRANSACTIONID_FIELD_NUMBER = 1;
     private java.lang.Object transactionId_;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public boolean hasTransactionId() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public java.lang.String getTransactionId() {
       java.lang.Object ref = transactionId_;
@@ -149,7 +149,7 @@ public final class ThreePhaseCommitCohortMessages {
       }
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public com.google.protobuf.ByteString
         getTransactionIdBytes() {
@@ -173,10 +173,6 @@ public final class ThreePhaseCommitCohortMessages {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
-      if (!hasTransactionId()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -376,10 +372,6 @@ public final class ThreePhaseCommitCohortMessages {
       }
 
       public final boolean isInitialized() {
-        if (!hasTransactionId()) {
-
-          return false;
-        }
         return true;
       }
 
@@ -402,16 +394,16 @@ public final class ThreePhaseCommitCohortMessages {
       }
       private int bitField0_;
 
-      // required string transactionId = 1;
+      // optional string transactionId = 1;
       private java.lang.Object transactionId_ = "";
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public boolean hasTransactionId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public java.lang.String getTransactionId() {
         java.lang.Object ref = transactionId_;
@@ -425,7 +417,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public com.google.protobuf.ByteString
           getTransactionIdBytes() {
@@ -441,7 +433,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionId(
           java.lang.String value) {
@@ -454,7 +446,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder clearTransactionId() {
         bitField0_ = (bitField0_ & ~0x00000001);
@@ -463,7 +455,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionIdBytes(
           com.google.protobuf.ByteString value) {
@@ -894,17 +886,17 @@ public final class ThreePhaseCommitCohortMessages {
   public interface AbortTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     boolean hasTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     java.lang.String getTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
@@ -1005,17 +997,17 @@ public final class ThreePhaseCommitCohortMessages {
     }
 
     private int bitField0_;
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     public static final int TRANSACTIONID_FIELD_NUMBER = 1;
     private java.lang.Object transactionId_;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public boolean hasTransactionId() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public java.lang.String getTransactionId() {
       java.lang.Object ref = transactionId_;
@@ -1032,7 +1024,7 @@ public final class ThreePhaseCommitCohortMessages {
       }
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public com.google.protobuf.ByteString
         getTransactionIdBytes() {
@@ -1056,10 +1048,6 @@ public final class ThreePhaseCommitCohortMessages {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
-      if (!hasTransactionId()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -1259,10 +1247,6 @@ public final class ThreePhaseCommitCohortMessages {
       }
 
       public final boolean isInitialized() {
-        if (!hasTransactionId()) {
-
-          return false;
-        }
         return true;
       }
 
@@ -1285,16 +1269,16 @@ public final class ThreePhaseCommitCohortMessages {
       }
       private int bitField0_;
 
-      // required string transactionId = 1;
+      // optional string transactionId = 1;
       private java.lang.Object transactionId_ = "";
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public boolean hasTransactionId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public java.lang.String getTransactionId() {
         java.lang.Object ref = transactionId_;
@@ -1308,7 +1292,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public com.google.protobuf.ByteString
           getTransactionIdBytes() {
@@ -1324,7 +1308,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionId(
           java.lang.String value) {
@@ -1337,7 +1321,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder clearTransactionId() {
         bitField0_ = (bitField0_ & ~0x00000001);
@@ -1346,7 +1330,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionIdBytes(
           com.google.protobuf.ByteString value) {
@@ -1682,17 +1666,17 @@ public final class ThreePhaseCommitCohortMessages {
   public interface CommitTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     boolean hasTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     java.lang.String getTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
@@ -1793,17 +1777,17 @@ public final class ThreePhaseCommitCohortMessages {
     }
 
     private int bitField0_;
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     public static final int TRANSACTIONID_FIELD_NUMBER = 1;
     private java.lang.Object transactionId_;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public boolean hasTransactionId() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public java.lang.String getTransactionId() {
       java.lang.Object ref = transactionId_;
@@ -1820,7 +1804,7 @@ public final class ThreePhaseCommitCohortMessages {
       }
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public com.google.protobuf.ByteString
         getTransactionIdBytes() {
@@ -1844,10 +1828,6 @@ public final class ThreePhaseCommitCohortMessages {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
-      if (!hasTransactionId()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -2047,10 +2027,6 @@ public final class ThreePhaseCommitCohortMessages {
       }
 
       public final boolean isInitialized() {
-        if (!hasTransactionId()) {
-
-          return false;
-        }
         return true;
       }
 
@@ -2073,16 +2049,16 @@ public final class ThreePhaseCommitCohortMessages {
       }
       private int bitField0_;
 
-      // required string transactionId = 1;
+      // optional string transactionId = 1;
       private java.lang.Object transactionId_ = "";
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public boolean hasTransactionId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public java.lang.String getTransactionId() {
         java.lang.Object ref = transactionId_;
@@ -2096,7 +2072,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public com.google.protobuf.ByteString
           getTransactionIdBytes() {
@@ -2112,7 +2088,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionId(
           java.lang.String value) {
@@ -2125,7 +2101,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder clearTransactionId() {
         bitField0_ = (bitField0_ & ~0x00000001);
@@ -2134,7 +2110,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionIdBytes(
           com.google.protobuf.ByteString value) {
@@ -3136,11 +3112,11 @@ public final class ThreePhaseCommitCohortMessages {
     java.lang.String[] descriptorData = {
       "\n\014Cohort.proto\022!org.opendaylight.control" +
       "ler.mdsal\"-\n\024CanCommitTransaction\022\025\n\rtra" +
-      "nsactionId\030\001 \002(\t\".\n\031CanCommitTransaction" +
+      "nsactionId\030\001 \001(\t\".\n\031CanCommitTransaction" +
       "Reply\022\021\n\tcanCommit\030\001 \002(\010\")\n\020AbortTransac" +
-      "tion\022\025\n\rtransactionId\030\001 \002(\t\"\027\n\025AbortTran" +
+      "tion\022\025\n\rtransactionId\030\001 \001(\t\"\027\n\025AbortTran" +
       "sactionReply\"*\n\021CommitTransaction\022\025\n\rtra" +
-      "nsactionId\030\001 \002(\t\"\030\n\026CommitTransactionRep" +
+      "nsactionId\030\001 \001(\t\"\030\n\026CommitTransactionRep" +
       "ly\"\026\n\024PreCommitTransaction\"\033\n\031PreCommitT" +
       "ransactionReplyBZ\n8org.opendaylight.cont" +
       "roller.protobuff.messages.cohort3pcB\036Thr",
index 96a39bddd376a9a777be0c056693cae8a892925d..3a1cfaa443a8e735965ed3b8d92f9037ee7ead36 100644 (file)
@@ -668,6 +668,16 @@ public final class ShardTransactionMessages {
      */
     com.google.protobuf.ByteString
         getTransactionChainIdBytes();
+
+    // optional int32 messageVersion = 4;
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    boolean hasMessageVersion();
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    int getMessageVersion();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransaction}
@@ -735,6 +745,11 @@ public final class ShardTransactionMessages {
               transactionChainId_ = input.readBytes();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              messageVersion_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -877,10 +892,27 @@ public final class ShardTransactionMessages {
       }
     }
 
+    // optional int32 messageVersion = 4;
+    public static final int MESSAGEVERSION_FIELD_NUMBER = 4;
+    private int messageVersion_;
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    public boolean hasMessageVersion() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    public int getMessageVersion() {
+      return messageVersion_;
+    }
+
     private void initFields() {
       transactionId_ = "";
       transactionType_ = 0;
       transactionChainId_ = "";
+      messageVersion_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -911,6 +943,9 @@ public final class ShardTransactionMessages {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(3, getTransactionChainIdBytes());
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt32(4, messageVersion_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -932,6 +967,10 @@ public final class ShardTransactionMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(3, getTransactionChainIdBytes());
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(4, messageVersion_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1054,6 +1093,8 @@ public final class ShardTransactionMessages {
         bitField0_ = (bitField0_ & ~0x00000002);
         transactionChainId_ = "";
         bitField0_ = (bitField0_ & ~0x00000004);
+        messageVersion_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -1094,6 +1135,10 @@ public final class ShardTransactionMessages {
           to_bitField0_ |= 0x00000004;
         }
         result.transactionChainId_ = transactionChainId_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.messageVersion_ = messageVersion_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1123,6 +1168,9 @@ public final class ShardTransactionMessages {
           transactionChainId_ = other.transactionChainId_;
           onChanged();
         }
+        if (other.hasMessageVersion()) {
+          setMessageVersion(other.getMessageVersion());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1339,6 +1387,39 @@ public final class ShardTransactionMessages {
         return this;
       }
 
+      // optional int32 messageVersion = 4;
+      private int messageVersion_ ;
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public boolean hasMessageVersion() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public int getMessageVersion() {
+        return messageVersion_;
+      }
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public Builder setMessageVersion(int value) {
+        bitField0_ |= 0x00000008;
+        messageVersion_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public Builder clearMessageVersion() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        messageVersion_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransaction)
     }
 
@@ -1382,6 +1463,16 @@ public final class ShardTransactionMessages {
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
+
+    // optional int32 messageVersion = 3;
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    boolean hasMessageVersion();
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    int getMessageVersion();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionReply}
@@ -1444,6 +1535,11 @@ public final class ShardTransactionMessages {
               transactionId_ = input.readBytes();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              messageVersion_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1570,9 +1666,26 @@ public final class ShardTransactionMessages {
       }
     }
 
+    // optional int32 messageVersion = 3;
+    public static final int MESSAGEVERSION_FIELD_NUMBER = 3;
+    private int messageVersion_;
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    public boolean hasMessageVersion() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    public int getMessageVersion() {
+      return messageVersion_;
+    }
+
     private void initFields() {
       transactionActorPath_ = "";
       transactionId_ = "";
+      messageVersion_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1600,6 +1713,9 @@ public final class ShardTransactionMessages {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeBytes(2, getTransactionIdBytes());
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt32(3, messageVersion_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1617,6 +1733,10 @@ public final class ShardTransactionMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(2, getTransactionIdBytes());
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(3, messageVersion_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1737,6 +1857,8 @@ public final class ShardTransactionMessages {
         bitField0_ = (bitField0_ & ~0x00000001);
         transactionId_ = "";
         bitField0_ = (bitField0_ & ~0x00000002);
+        messageVersion_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -1773,6 +1895,10 @@ public final class ShardTransactionMessages {
           to_bitField0_ |= 0x00000002;
         }
         result.transactionId_ = transactionId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.messageVersion_ = messageVersion_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1799,6 +1925,9 @@ public final class ShardTransactionMessages {
           transactionId_ = other.transactionId_;
           onChanged();
         }
+        if (other.hasMessageVersion()) {
+          setMessageVersion(other.getMessageVersion());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1982,6 +2111,39 @@ public final class ShardTransactionMessages {
         return this;
       }
 
+      // optional int32 messageVersion = 3;
+      private int messageVersion_ ;
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public boolean hasMessageVersion() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public int getMessageVersion() {
+        return messageVersion_;
+      }
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public Builder setMessageVersion(int value) {
+        bitField0_ |= 0x00000004;
+        messageVersion_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public Builder clearMessageVersion() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        messageVersion_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransactionReply)
     }
 
@@ -7753,37 +7915,38 @@ public final class ShardTransactionMessages {
     java.lang.String[] descriptorData = {
       "\n\026ShardTransaction.proto\022!org.opendaylig" +
       "ht.controller.mdsal\032\014Common.proto\"\022\n\020Clo" +
-      "seTransaction\"\027\n\025CloseTransactionReply\"_" +
+      "seTransaction\"\027\n\025CloseTransactionReply\"w" +
       "\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" +
       "(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" +
-      "onChainId\030\003 \001(\t\"M\n\026CreateTransactionRepl" +
-      "y\022\034\n\024transactionActorPath\030\001 \002(\t\022\025\n\rtrans" +
-      "actionId\030\002 \002(\t\"\022\n\020ReadyTransaction\"*\n\025Re" +
-      "adyTransactionReply\022\021\n\tactorPath\030\001 \002(\t\"l" +
-      "\n\nDeleteData\022^\n\037instanceIdentifierPathAr",
+      "onChainId\030\003 \001(\t\022\026\n\016messageVersion\030\004 \001(\005\"" +
+      "e\n\026CreateTransactionReply\022\034\n\024transaction" +
+      "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\022\026" +
+      "\n\016messageVersion\030\003 \001(\005\"\022\n\020ReadyTransacti" +
+      "on\"*\n\025ReadyTransactionReply\022\021\n\tactorPath",
+      "\030\001 \002(\t\"l\n\nDeleteData\022^\n\037instanceIdentifi" +
+      "erPathArguments\030\001 \002(\01325.org.opendaylight" +
+      ".controller.mdsal.InstanceIdentifier\"\021\n\017" +
+      "DeleteDataReply\"j\n\010ReadData\022^\n\037instanceI" +
+      "dentifierPathArguments\030\001 \002(\01325.org.opend" +
+      "aylight.controller.mdsal.InstanceIdentif" +
+      "ier\"P\n\rReadDataReply\022?\n\016normalizedNode\030\001" +
+      " \001(\0132\'.org.opendaylight.controller.mdsal" +
+      ".Node\"\254\001\n\tWriteData\022^\n\037instanceIdentifie" +
+      "rPathArguments\030\001 \002(\01325.org.opendaylight.",
+      "controller.mdsal.InstanceIdentifier\022?\n\016n" +
+      "ormalizedNode\030\002 \002(\0132\'.org.opendaylight.c" +
+      "ontroller.mdsal.Node\"\020\n\016WriteDataReply\"\254" +
+      "\001\n\tMergeData\022^\n\037instanceIdentifierPathAr" +
       "guments\030\001 \002(\01325.org.opendaylight.control" +
-      "ler.mdsal.InstanceIdentifier\"\021\n\017DeleteDa" +
-      "taReply\"j\n\010ReadData\022^\n\037instanceIdentifie" +
-      "rPathArguments\030\001 \002(\01325.org.opendaylight." +
-      "controller.mdsal.InstanceIdentifier\"P\n\rR" +
-      "eadDataReply\022?\n\016normalizedNode\030\001 \001(\0132\'.o" +
-      "rg.opendaylight.controller.mdsal.Node\"\254\001" +
-      "\n\tWriteData\022^\n\037instanceIdentifierPathArg" +
-      "uments\030\001 \002(\01325.org.opendaylight.controll" +
-      "er.mdsal.InstanceIdentifier\022?\n\016normalize",
-      "dNode\030\002 \002(\0132\'.org.opendaylight.controlle" +
-      "r.mdsal.Node\"\020\n\016WriteDataReply\"\254\001\n\tMerge" +
-      "Data\022^\n\037instanceIdentifierPathArguments\030" +
-      "\001 \002(\01325.org.opendaylight.controller.mdsa" +
-      "l.InstanceIdentifier\022?\n\016normalizedNode\030\002" +
-      " \002(\0132\'.org.opendaylight.controller.mdsal" +
-      ".Node\"\020\n\016MergeDataReply\"l\n\nDataExists\022^\n" +
-      "\037instanceIdentifierPathArguments\030\001 \002(\01325" +
-      ".org.opendaylight.controller.mdsal.Insta" +
-      "nceIdentifier\"!\n\017DataExistsReply\022\016\n\006exis",
-      "ts\030\001 \002(\010BV\n:org.opendaylight.controller." +
-      "protobuff.messages.transactionB\030ShardTra" +
-      "nsactionMessages"
+      "ler.mdsal.InstanceIdentifier\022?\n\016normaliz" +
+      "edNode\030\002 \002(\0132\'.org.opendaylight.controll" +
+      "er.mdsal.Node\"\020\n\016MergeDataReply\"l\n\nDataE" +
+      "xists\022^\n\037instanceIdentifierPathArguments" +
+      "\030\001 \002(\01325.org.opendaylight.controller.mds",
+      "al.InstanceIdentifier\"!\n\017DataExistsReply" +
+      "\022\016\n\006exists\030\001 \002(\010BV\n:org.opendaylight.con" +
+      "troller.protobuff.messages.transactionB\030" +
+      "ShardTransactionMessages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -7807,13 +7970,13 @@ public final class ShardTransactionMessages {
           internal_static_org_opendaylight_controller_mdsal_CreateTransaction_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CreateTransaction_descriptor,
-              new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", });
+              new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", "MessageVersion", });
           internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor,
-              new java.lang.String[] { "TransactionActorPath", "TransactionId", });
+              new java.lang.String[] { "TransactionActorPath", "TransactionId", "MessageVersion", });
           internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_descriptor =
             getDescriptor().getMessageTypes().get(4);
           internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_fieldAccessorTable = new
index 49c6cd07a866199415116349041a9defe039d18c..222f68ab4f8f0572f20abde916d4b61bb6f25390 100644 (file)
@@ -5,7 +5,7 @@ option java_outer_classname = "ThreePhaseCommitCohortMessages";
 
 
 message CanCommitTransaction{
-  required string transactionId = 1;
+  optional string transactionId = 1;
 }
 
 message CanCommitTransactionReply{
@@ -14,7 +14,7 @@ message CanCommitTransactionReply{
 }
 
 message AbortTransaction{
-  required string transactionId = 1;
+  optional string transactionId = 1;
 }
 
 message AbortTransactionReply {
@@ -22,7 +22,7 @@ message AbortTransactionReply {
 }
 
 message CommitTransaction{
-  required string transactionId = 1;
+  optional string transactionId = 1;
 }
 
 message CommitTransactionReply{
index 26581478d97a1449ff88a07383e9a88c554aa9a5..c5e4ee45c037df8d3f03c6c3b2e13015c8317ef6 100644 (file)
@@ -16,12 +16,13 @@ message CreateTransaction{
   required string transactionId = 1;
   required int32  transactionType =2;
   optional string transactionChainId = 3;
+  optional int32 messageVersion = 4;
 }
 
 message CreateTransactionReply{
-required string transactionActorPath = 1;
-required string transactionId = 2;
-
+  required string transactionActorPath = 1;
+  required string transactionId = 2;
+  optional int32 messageVersion = 3;
 }
 
 message ReadyTransaction{
@@ -29,7 +30,7 @@ message ReadyTransaction{
 }
 
 message ReadyTransactionReply{
-required string actorPath = 1;
+  required string actorPath = 1;
 }
 
 message DeleteData {
index 7d67e0856fe69da8721cd7173c63ec79d294a83e..5ea9b30c63e7d3f9b44e9800eddb2b5e42f08657 100644 (file)
@@ -31,6 +31,7 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
+import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
@@ -374,7 +375,8 @@ public class Shard extends RaftActor {
     }
 
     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
-        LOG.debug("Readying transaction {}", ready.getTransactionID());
+        LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
+                ready.getTxnClientVersion());
 
         // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
         // commitCoordinator in preparation for the subsequent three phase commit initiated by
@@ -382,12 +384,22 @@ public class Shard extends RaftActor {
         commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
                 ready.getModification());
 
-        // Return our actor path as we'll handle the three phase commit.
-        ReadyTransactionReply readyTransactionReply =
-            new ReadyTransactionReply(Serialization.serializedActorPath(self()));
-        getSender().tell(
-            ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
-            getSelf());
+        // Return our actor path as we'll handle the three phase commit, except if the Tx client
+        // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
+        // node. In that case, the subsequent 3-phase commit messages won't contain the
+        // transactionId so to maintain backwards compatibility, we create a separate cohort actor
+        // to provide the compatible behavior.
+        ActorRef replyActorPath = self();
+        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+            LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+            replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                    ready.getTransactionID()));
+        }
+
+        ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
+                Serialization.serializedActorPath(replyActorPath));
+        getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+                readyTransactionReply, getSelf());
     }
 
     private void handleAbortTransaction(AbortTransaction abort) {
@@ -465,10 +477,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    private ActorRef createTypedTransactionActor(
-        int transactionType,
-        ShardTransactionIdentifier transactionId,
-        String transactionChainId ) {
+    private ActorRef createTypedTransactionActor(int transactionType,
+            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -492,7 +502,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
                         schemaContext,datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
 
         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 
@@ -501,7 +512,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
                         schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
 
 
         } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
@@ -511,7 +523,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
                         schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -521,10 +534,12 @@ public class Shard extends RaftActor {
 
     private void createTransaction(CreateTransaction createTransaction) {
         createTransaction(createTransaction.getTransactionType(),
-            createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+            createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+            createTransaction.getVersion());
     }
 
-    private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
+    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
+            String transactionChainId, int clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -533,8 +548,8 @@ public class Shard extends RaftActor {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Creating transaction : {} ", transactionId);
         }
-        ActorRef transactionActor =
-            createTypedTransactionActor(transactionType, transactionId, transactionChainId);
+        ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
+                transactionChainId, clientVersion);
 
         getSender()
             .tell(new CreateTransactionReply(
@@ -599,7 +614,7 @@ public class Shard extends RaftActor {
         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                     listenerRegistration.path());
 
-        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
     }
 
     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
@@ -765,7 +780,8 @@ public class Shard extends RaftActor {
             // so that this actor does not get block building the snapshot
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
-                "createSnapshot" + ++createSnapshotTransactionCounter, "");
+                "createSnapshot" + ++createSnapshotTransactionCounter, "",
+                CreateTransaction.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
index d12e9997bb29175c7d6c414efdc72b339b706932..9d8f57252a2fc43bd316f213c9f8b68f4f4c453f 100644 (file)
@@ -26,8 +26,9 @@ public class ShardReadTransaction extends ShardTransaction {
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
-        super(shardActor, schemaContext, shardStats, transactionID);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+            int txnClientVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
         this.transaction = transaction;
     }
 
index b1fd02d2172cc4c28679c6e4ae86cd31cf2a2657..e558677ebbaba5598d1ca84875a6402764887f34 100644 (file)
@@ -26,8 +26,9 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
     private final DOMStoreReadWriteTransaction transaction;
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
-        super(transaction, shardActor, schemaContext, shardStats, transactionID);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+            int txnClientVersion) {
+        super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
         this.transaction = transaction;
     }
 
index 32de47f451d9ff53f301339975357440e651bea5..59bb4bfd77892e1a915563b3d435150de27c4ca1 100644 (file)
@@ -57,26 +57,29 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  */
 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
 
+    protected static final boolean SERIALIZED_REPLY = true;
+
     private final ActorRef shardActor;
     private final SchemaContext schemaContext;
     private final ShardStats shardStats;
     private final String transactionID;
-    protected static final boolean SERIALIZED_REPLY = true;
+    private final int txnClientVersion;
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-            ShardStats shardStats, String transactionID) {
+            ShardStats shardStats, String transactionID, int txnClientVersion) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
         this.shardStats = shardStats;
         this.transactionID = transactionID;
+        this.txnClientVersion = txnClientVersion;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
-            String transactionID) {
+            String transactionID, int txnClientVersion) {
         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
-           datastoreContext, shardStats, transactionID));
+           datastoreContext, shardStats, transactionID, txnClientVersion));
     }
 
     protected abstract DOMStoreTransaction getDOMStoreTransaction();
@@ -93,6 +96,10 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return schemaContext;
     }
 
+    protected int getTxnClientVersion() {
+        return txnClientVersion;
+    }
+
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
@@ -169,16 +176,18 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
+        final int txnClientVersion;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
                 SchemaContext schemaContext, DatastoreContext datastoreContext,
-                ShardStats shardStats, String transactionID) {
+                ShardStats shardStats, String transactionID, int txnClientVersion) {
             this.transaction = transaction;
             this.shardActor = shardActor;
             this.shardStats = shardStats;
             this.schemaContext = schemaContext;
             this.datastoreContext = datastoreContext;
             this.transactionID = transactionID;
+            this.txnClientVersion = txnClientVersion;
         }
 
         @Override
@@ -186,13 +195,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
             ShardTransaction tx;
             if(transaction instanceof DOMStoreReadWriteTransaction) {
                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID);
+                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
             } else if(transaction instanceof DOMStoreReadTransaction) {
                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        schemaContext, shardStats, transactionID);
+                        schemaContext, shardStats, transactionID, txnClientVersion);
             } else {
                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID);
+                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index 943a82f6f9f03565369062bdd8f7779bea5921f0..78c6a558f4f291bca277c2793676f01fc6686cc7 100644 (file)
@@ -63,19 +63,22 @@ public class ShardTransactionChain extends AbstractUntypedActor {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId()), transactionName);
+                            createTransaction.getTransactionId(),
+                            createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId()), transactionName);
+                            createTransaction.getTransactionId(),
+                            createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId()), transactionName);
+                            createTransaction.getTransactionId(),
+                            createTransaction.getVersion()), transactionName);
         } else {
             throw new IllegalArgumentException (
                     "CreateTransaction message has unidentified transaction type=" +
index b0eaf98d59c9ccd2d1eda1f6d782736db238290e..44f2c7bd0a2794715d09cb29bb7a5ce42b352344 100644 (file)
@@ -42,8 +42,9 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
-        super(shardActor, schemaContext, shardStats, transactionID);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+            int txnClientVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
         this.transaction = transaction;
     }
 
@@ -144,8 +145,8 @@ public class ShardWriteTransaction extends ShardTransaction {
 
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
-        getShardActor().forward(new ForwardedReadyTransaction(transactionID, cohort, modification,
-                returnSerialized), getContext());
+        getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(),
+                cohort, modification, returnSerialized), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
         getSelf().tell(PoisonPill.getInstance(), getSelf());
index 239207a60ab58cbbbefc03ee150b9cc250230d86..ffb1ab7c55064ecf7683e1857800b256c9c82a1d 100644 (file)
@@ -49,6 +49,8 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
 
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
@@ -668,8 +669,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
 
-            transactionContext = new TransactionContextImpl(transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal);
+            transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+                actorContext, schemaContext, isTxActorLocal, reply.getVersion());
         }
     }
 
@@ -712,17 +713,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         private final ActorContext actorContext;
         private final SchemaContext schemaContext;
+        private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
+        private final int remoteTransactionVersion;
 
-        private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+        private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal) {
+                boolean isTxActorLocal, int remoteTransactionVersion) {
             super(identifier);
+            this.transactionPath = transactionPath;
             this.actor = actor;
             this.actorContext = actorContext;
             this.schemaContext = schemaContext;
             this.isTxActorLocal = isTxActorLocal;
+            this.remoteTransactionVersion = remoteTransactionVersion;
         }
 
         private ActorSelection getActor() {
@@ -783,7 +788,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                     } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                        return actorContext.actorSelection(reply.getCohortPath());
+                        String cohortPath = reply.getCohortPath();
+
+                        // In Helium we used to return the local path of the actor which represented
+                        // a remote ThreePhaseCommitCohort. The local path would then be converted to
+                        // a remote path using this resolvePath method. To maintain compatibility with
+                        // a Helium node we need to continue to do this conversion.
+                        // At some point in the future when upgrades from Helium are not supported
+                        // we could remove this code to resolvePath and just use the cohortPath as the
+                        // resolved cohortPath
+                        if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+                            cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
+                        }
+
+                        return actorContext.actorSelection(cohortPath);
 
                     } else {
                         // Throwing an exception here will fail the Future.
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..30ab97c
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+
+/**
+ * An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit
+ * messages don't contain the transactionId. This actor just forwards a new message containing the
+ * transactionId to the parent Shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor {
+
+    private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
+
+    private final String transactionId;
+
+    private BackwardsCompatibleThreePhaseCommitCohort(String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CanCommitTransaction");
+
+            getContext().parent().forward(new CanCommitTransaction(transactionId).toSerializable(),
+                    getContext());
+        } else if(message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort PreCommitTransaction");
+
+            // The Shard doesn't need the PreCommitTransaction message so just return the reply here.
+            getSender().tell(new PreCommitTransactionReply().toSerializable(), self());
+        } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CommitTransaction");
+
+            getContext().parent().forward(new CommitTransaction(transactionId).toSerializable(),
+                    getContext());
+
+            // We're done now - we can self-destruct
+            self().tell(PoisonPill.getInstance(), self());
+        } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort AbortTransaction");
+
+            getContext().parent().forward(new AbortTransaction(transactionId).toSerializable(),
+                    getContext());
+            self().tell(PoisonPill.getInstance(), self());
+        }
+    }
+
+    public static Props props(String transactionId) {
+        return Props.create(new BackwardsCompatibleThreePhaseCommitCohortCreator(transactionId));
+    }
+
+    private static class BackwardsCompatibleThreePhaseCommitCohortCreator
+                                  implements Creator<BackwardsCompatibleThreePhaseCommitCohort> {
+        private static final long serialVersionUID = 1L;
+
+        private final String transactionId;
+
+        BackwardsCompatibleThreePhaseCommitCohortCreator(String transactionId) {
+            this.transactionId = transactionId;
+        }
+
+        @Override
+        public BackwardsCompatibleThreePhaseCommitCohort create() throws Exception {
+            return new BackwardsCompatibleThreePhaseCommitCohort(transactionId);
+        }
+    }
+}
index 361d406ac80dda74fe33950ace971ece600b3d7a..bf82e660369c8d4f89e5cf48b22f6555f89bb65e 100644 (file)
@@ -13,24 +13,33 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 
 
 public class CreateTransaction implements SerializableMessage {
-    public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class;
+    public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.CreateTransaction.class;
+
+    public static final int HELIUM_1_VERSION = 1;
+    public static final int CURRENT_VERSION = HELIUM_1_VERSION;
+
     private final String transactionId;
     private final int transactionType;
     private final String transactionChainId;
+    private final int version;
 
     public CreateTransaction(String transactionId, int transactionType) {
         this(transactionId, transactionType, "");
     }
 
     public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
+        this(transactionId, transactionType, transactionChainId, CURRENT_VERSION);
+    }
 
+    private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
+            int version) {
         this.transactionId = transactionId;
         this.transactionType = transactionType;
         this.transactionChainId = transactionChainId;
-
+        this.version = version;
     }
 
-
     public String getTransactionId() {
         return transactionId;
     }
@@ -39,19 +48,25 @@ public class CreateTransaction implements SerializableMessage {
         return transactionType;
     }
 
+    public int getVersion() {
+        return version;
+    }
+
     @Override
     public Object toSerializable() {
         return ShardTransactionMessages.CreateTransaction.newBuilder()
             .setTransactionId(transactionId)
             .setTransactionType(transactionType)
-            .setTransactionChainId(transactionChainId).build();
+            .setTransactionChainId(transactionChainId)
+            .setMessageVersion(version).build();
     }
 
     public static CreateTransaction fromSerializable(Object message) {
         ShardTransactionMessages.CreateTransaction createTransaction =
             (ShardTransactionMessages.CreateTransaction) message;
         return new CreateTransaction(createTransaction.getTransactionId(),
-            createTransaction.getTransactionType(), createTransaction.getTransactionChainId());
+            createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
+            createTransaction.getMessageVersion());
     }
 
     public String getTransactionChainId() {
index 096d131d5a5517821ae2e228aa4c4332315a37a9..14620f15d98ffece07905a73cdd7d18222e9c023 100644 (file)
@@ -15,13 +15,21 @@ public class CreateTransactionReply implements SerializableMessage {
     public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
     private final String transactionPath;
     private final String transactionId;
+    private final int version;
 
     public CreateTransactionReply(String transactionPath,
         String transactionId) {
+        this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION);
+    }
+
+    public CreateTransactionReply(String transactionPath,
+                                  String transactionId, int version) {
         this.transactionPath = transactionPath;
         this.transactionId = transactionId;
+        this.version = version;
     }
 
+
     public String getTransactionPath() {
         return transactionPath;
     }
@@ -30,16 +38,21 @@ public class CreateTransactionReply implements SerializableMessage {
         return transactionId;
     }
 
+    public int getVersion() {
+        return version;
+    }
+
     public Object toSerializable(){
         return ShardTransactionMessages.CreateTransactionReply.newBuilder()
             .setTransactionActorPath(transactionPath)
             .setTransactionId(transactionId)
+            .setMessageVersion(version)
             .build();
     }
 
     public static CreateTransactionReply fromSerializable(Object serializable){
         ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
-        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId());
+        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion());
     }
 
 }
index 180108f2186b6efbab2977f1d54cf33811a4c638..38886c9a583c500e4fc9b86f54422c001482e777 100644 (file)
@@ -20,14 +20,16 @@ public class ForwardedReadyTransaction {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
+    private final int txnClientVersion;
 
-    public ForwardedReadyTransaction(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
-            Modification modification, boolean returnSerialized) {
+    public ForwardedReadyTransaction(String transactionID, int txnClientVersion,
+            DOMStoreThreePhaseCommitCohort cohort, Modification modification,
+            boolean returnSerialized) {
         this.transactionID = transactionID;
         this.cohort = cohort;
         this.modification = modification;
         this.returnSerialized = returnSerialized;
-
+        this.txnClientVersion = txnClientVersion;
     }
 
     public String getTransactionID() {
@@ -45,4 +47,8 @@ public class ForwardedReadyTransaction {
     public boolean isReturnSerialized() {
         return returnSerialized;
     }
+
+    public int getTxnClientVersion() {
+        return txnClientVersion;
+    }
 }
index eee489177a5d1496b78bd47a8bf873d959082e06..282e23ed3bab4458ef771b338d6bf03861536662 100644 (file)
@@ -17,7 +17,6 @@ public class ReadyTransactionReply implements SerializableMessage {
     private final String cohortPath;
 
     public ReadyTransactionReply(String cohortPath) {
-
         this.cohortPath = cohortPath;
     }
 
@@ -27,8 +26,9 @@ public class ReadyTransactionReply implements SerializableMessage {
 
     @Override
     public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
-        return ShardTransactionMessages.ReadyTransactionReply.newBuilder().
-                setActorPath(cohortPath).build();
+        return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
+                .setActorPath(cohortPath)
+                .build();
     }
 
     public static ReadyTransactionReply fromSerializable(Object serializable) {
index e409168c852a99931c82770d9cd4c88b29f9cf06..904dcdf43989bebfeb180b5f56c489cbf035621a 100644 (file)
@@ -395,4 +395,28 @@ public class ActorContext {
 
         return hostPort1.equals(hostPort2);
     }
+
+    /**
+     * @deprecated This method is present only to support backward compatibility with Helium and should not be
+     * used any further
+     *
+     *
+     * @param primaryPath
+     * @param localPathOfRemoteActor
+     * @return
+    */
+    @Deprecated
+    public String resolvePath(final String primaryPath,
+                                            final String localPathOfRemoteActor) {
+        StringBuilder builder = new StringBuilder();
+        String[] primaryPathElements = primaryPath.split("/");
+        builder.append(primaryPathElements[0]).append("//")
+            .append(primaryPathElements[1]).append(primaryPathElements[2]);
+        String[] remotePathElements = localPathOfRemoteActor.split("/");
+        for (int i = 3; i < remotePathElements.length; i++) {
+                builder.append("/").append(remotePathElements[i]);
+            }
+
+        return builder.toString();
+    }
 }
index 101a73782b498943acb14b0cb03be2dde5df1f87..d5a12c73c57c0deb4fda3be2f11aac435a428bdc 100644 (file)
@@ -4,6 +4,7 @@ import akka.actor.ActorRef;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
@@ -70,19 +71,25 @@ public class DataChangeListenerTest extends AbstractActorTest {
             final Props props = DataChangeListener.props(mockListener);
             final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
 
-            // Let the DataChangeListener know that notifications should be enabled
-            subject.tell(new EnableNotification(true), ActorRef.noSender());
+            getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
 
             subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
                     ActorRef.noSender());
 
-            getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-                    expectNoMsg();
+            // Make sure no DataChangedReply is sent to DeadLetters.
+            while(true) {
+                DeadLetter deadLetter;
+                try {
+                    deadLetter = expectMsgClass(duration("1 seconds"), DeadLetter.class);
+                } catch (AssertionError e) {
+                    // Timed out - got no DeadLetter - this is good
+                    break;
                 }
-            };
+
+                // We may get DeadLetters for other messages we don't care about.
+                Assert.assertFalse("Unexpected DataChangedReply",
+                        deadLetter.message() instanceof DataChangedReply);
+            }
         }};
     }
 }
index cd8a65844756b51820c5f6fb93dfe62a53071d31..c6bb0d6b37061b8055d4065a990cc2acad21c935 100644 (file)
@@ -101,6 +101,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
 
 
 public class ShardTest extends AbstractActorTest {
@@ -611,7 +612,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -625,10 +627,12 @@ public class ShardTest extends AbstractActorTest {
 
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
@@ -792,10 +796,12 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -859,7 +865,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message.
@@ -902,7 +909,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message.
@@ -954,7 +962,8 @@ public class ShardTest extends AbstractActorTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
@@ -1018,10 +1027,12 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
@@ -1080,13 +1091,16 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // canCommit 1st Tx.
@@ -1149,10 +1163,12 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1345,7 +1361,7 @@ public class ShardTest extends AbstractActorTest {
         };
     }
 
-    private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
+    static NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
             throws ExecutionException, InterruptedException {
         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
 
index 6375e3c7fba51fa02c077ac49d01def5eed5d22a..45b00f5f31b72c3d628be7957a09804c7e6b26be 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.pattern.AskTimeoutException;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -32,8 +29,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.pattern.AskTimeoutException;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * Covers negative test cases
@@ -75,7 +76,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -104,7 +106,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -133,7 +136,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -162,7 +166,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -194,7 +199,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -231,7 +237,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -263,7 +270,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java
new file mode 100644 (file)
index 0000000..af07aee
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collections;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+
+/**
+ * Tests backwards compatibility support from Helium-1 to Helium.
+ *
+ * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the
+ * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction,
+ * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages
+ * would be sans transactionId so this test verifies the Shard handles that properly.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
+
+    @Test
+    public void testTransactionCommit() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            SchemaContext schemaContext = TestModel.createTestContext();
+            Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
+                    shardName("inventory").type("config").build(),
+                    Collections.<ShardIdentifier,String>emptyMap(),
+                    DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
+                    schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
+                    "testTransactionCommit");
+
+            waitUntilLeader(shard);
+
+            // Send CreateTransaction message with no messages version
+
+            String transactionID = "txn-1";
+            shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
+                    .setTransactionId(transactionID)
+                    .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+                    .setTransactionChainId("").build(), getRef());
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
+
+            ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
+
+            // Write data to the Tx
+
+            txActor.tell(new WriteData(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+
+            expectMsgClass(duration, WriteDataReply.class);
+
+            // Ready the Tx
+
+            txActor.tell(new ReadyTransaction().toSerializable(), getRef());
+
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
+                    duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+
+            ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
+
+            // Send the CanCommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the PreCommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the CommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            NormalizedNode<?, ?> node = ShardTest.readStore(shard, TestModel.TEST_PATH);
+            Assert.assertNotNull("Data not found in store", node);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testTransactionAbort() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            SchemaContext schemaContext = TestModel.createTestContext();
+            Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
+                    shardName("inventory").type("config").build(),
+                    Collections.<ShardIdentifier,String>emptyMap(),
+                    DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
+                    schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
+                    "testTransactionAbort");
+
+            waitUntilLeader(shard);
+
+            // Send CreateTransaction message with no messages version
+
+            String transactionID = "txn-1";
+            shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
+                    .setTransactionId(transactionID)
+                    .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+                    .setTransactionChainId("").build(), getRef());
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
+
+            ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
+
+            // Write data to the Tx
+
+            txActor.tell(new WriteData(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+
+            expectMsgClass(duration, WriteDataReply.class);
+
+            // Ready the Tx
+
+            txActor.tell(new ReadyTransaction().toSerializable(), getRef());
+
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
+                    duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+
+            ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
+
+            // Send the CanCommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the AbortTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+}
index 793df8e0ca9776f7d21c07fcbaf7788be8f562bd..c869be82d24bcbbd0cedca4b5a6e2a9001c46fad 100644 (file)
@@ -1,12 +1,11 @@
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
@@ -15,6 +14,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -39,12 +39,13 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 public class ShardTransactionTest extends AbstractActorTest {
     private static ListeningExecutorService storeExecutor =
@@ -78,12 +79,14 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
         }
@@ -114,13 +117,15 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRO"));
 
             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRW"));
@@ -150,12 +155,14 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
         }
@@ -183,12 +190,14 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
         }
@@ -228,7 +237,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
@@ -254,7 +264,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
@@ -279,7 +290,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
 
             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
@@ -301,7 +313,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
 
             watch(transaction);
@@ -318,7 +331,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
 
             watch(transaction);
@@ -339,7 +353,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
 
             watch(transaction);
@@ -355,7 +370,8 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@@ -370,7 +386,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
index 35f346f0d65fa4a5803f078dddf79871f21de34c..b77b0b65cf86edb06f370e768725c5f86cb3a439 100644 (file)
@@ -338,13 +338,15 @@ public class TransactionProxyTest {
         return getSystem().actorSelection(actorRef.path());
     }
 
-    private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+    private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
         return CreateTransactionReply.newBuilder()
             .setTransactionActorPath(actorRef.path().toString())
-            .setTransactionId("txn-1").build();
+            .setTransactionId("txn-1")
+            .setMessageVersion(transactionVersion)
+            .build();
     }
 
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
@@ -352,7 +354,7 @@ public class TransactionProxyTest {
         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
-        doReturn(Futures.successful(createTransactionReply(actorRef))).when(mockActorContext).
+        doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
 
@@ -361,6 +363,11 @@ public class TransactionProxyTest {
         return actorRef;
     }
 
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+    }
+
+
     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
             throws Throwable {
 
@@ -835,6 +842,47 @@ public class TransactionProxyTest {
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadyForwardCompatibility() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
+
+        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                WriteDataReply.SERIALIZABLE_CLASS);
+
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testReadyWithRecordingOperationFailure() throws Exception {
index 60f9a2d9dc4d9e2660137eaa77c7df491d124361..39d337e91b6005dc938e98232bbf105fc13ce220 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
@@ -17,7 +18,9 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+
 import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -182,4 +185,51 @@ public class ActorContextTest extends AbstractActorTest{
         clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/");
         assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
     }
+
+    @Test
+    public void testResolvePathForRemoteActor() {
+        ActorContext actorContext =
+                new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+                        ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
+                "akka://system/user/shardmanager/shard/transaction");
+
+        String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testResolvePathForLocalActor() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka://system/user/shardmanager/shard",
+                "akka://system/user/shardmanager/shard/transaction");
+
+        String expected = "akka://system/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testResolvePathForRemoteActorWithProperRemoteAddress() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
+                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
+
+        String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
 }