Merge "Remove l2switch sample"
authorTony Tkacik <ttkacik@cisco.com>
Thu, 6 Nov 2014 08:20:06 +0000 (08:20 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 6 Nov 2014 08:20:06 +0000 (08:20 +0000)
34 files changed:
opendaylight/md-sal/benchmark-data-store/pom.xml
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryBrokerWriteTransactionBenchmark.java [new file with mode: 0644]
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryDatastoreWriteTransactionBenchmark.java
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryWriteTransactionBenchmark.java [new file with mode: 0644]
opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryBrokerWriteTransactionBenchmark.java [new file with mode: 0644]
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Leader.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/MessageCollectorActor.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java
opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto
opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java
opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronSubnet.java

index ac38431..7d47add 100644 (file)
@@ -40,6 +40,14 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>sal-inmemory-datastore</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.opendaylight.controller</groupId>
+      <artifactId>sal-broker-impl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
@@ -69,4 +77,4 @@ and is available at http://www.eclipse.org/legal/epl-v10.html
       </plugin>
     </plugins>
   </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryBrokerWriteTransactionBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryBrokerWriteTransactionBenchmark.java
new file mode 100644 (file)
index 0000000..fdd715e
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.benchmark;
+
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * @author Lukas Sedlak <lsedlak@cisco.com>
+ */
+public abstract class AbstractInMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryWriteTransactionBenchmark {
+
+    protected DOMDataBrokerImpl domBroker;
+
+    protected void initTestNode() throws Exception {
+        final YangInstanceIdentifier testPath = YangInstanceIdentifier.builder(BenchmarkModel.TEST_PATH)
+            .build();
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        writeTx.put(LogicalDatastoreType.OPERATIONAL, testPath, provideOuterListNode());
+
+        writeTx.submit().get();
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write100KSingleNodeWithOneInnerItemInOneCommitBenchmark() throws Exception {
+
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_100K; ++outerListKey) {
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_100K_PATHS[outerListKey], OUTER_LIST_ONE_ITEM_INNER_LIST[outerListKey]);
+        }
+
+        writeTx.submit().get();
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write100KSingleNodeWithOneInnerItemInCommitPerWriteBenchmark() throws Exception {
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_100K; ++outerListKey) {
+            DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_100K_PATHS[outerListKey], OUTER_LIST_ONE_ITEM_INNER_LIST[outerListKey]);
+
+            writeTx.submit().get();
+        }
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write50KSingleNodeWithTwoInnerItemsInOneCommitBenchmark() throws Exception {
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_50K; ++outerListKey) {
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_50K_PATHS[outerListKey], OUTER_LIST_TWO_ITEM_INNER_LIST[outerListKey]);
+        }
+
+        writeTx.submit().get();
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write50KSingleNodeWithTwoInnerItemsInCommitPerWriteBenchmark() throws Exception {
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_50K; ++outerListKey) {
+            DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_50K_PATHS[outerListKey], OUTER_LIST_TWO_ITEM_INNER_LIST[outerListKey]);
+            writeTx.submit().get();
+        }
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write10KSingleNodeWithTenInnerItemsInOneCommitBenchmark() throws Exception {
+        DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_10K; ++outerListKey) {
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_10K_PATHS[outerListKey], OUTER_LIST_TEN_ITEM_INNER_LIST[outerListKey]);
+        }
+        writeTx.submit().get();
+    }
+
+    @Benchmark
+    @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+    public void write10KSingleNodeWithTenInnerItemsInCommitPerWriteBenchmark() throws Exception {
+        for (int outerListKey = 0; outerListKey < OUTER_LIST_10K; ++outerListKey) {
+            DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+            writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_10K_PATHS[outerListKey], OUTER_LIST_TEN_ITEM_INNER_LIST[outerListKey]);
+            writeTx.submit().get();
+        }
+    }
+}
index aa5ef61..fce0642 100644 (file)
@@ -12,14 +12,6 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Warmup;
@@ -27,67 +19,10 @@ import org.openjdk.jmh.annotations.Warmup;
 /**
  * @author Lukas Sedlak <lsedlak@cisco.com>
  */
-public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark {
+public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark extends AbstractInMemoryWriteTransactionBenchmark {
 
-    private static final int WARMUP_ITERATIONS = 20;
-    private static final int MEASUREMENT_ITERATIONS = 20;
-
-    private static final int OUTER_LIST_100K = 100000;
-    private static final int OUTER_LIST_50K = 50000;
-    private static final int OUTER_LIST_10K = 10000;
-
-    private static final YangInstanceIdentifier[] OUTER_LIST_100K_PATHS = initOuterListPaths(OUTER_LIST_100K);
-    private static final YangInstanceIdentifier[] OUTER_LIST_50K_PATHS = initOuterListPaths(OUTER_LIST_50K);
-    private static final YangInstanceIdentifier[] OUTER_LIST_10K_PATHS = initOuterListPaths(OUTER_LIST_10K);
-
-    private static YangInstanceIdentifier[] initOuterListPaths(final int outerListPathsCount) {
-        final YangInstanceIdentifier[] paths = new YangInstanceIdentifier[outerListPathsCount];
-
-        for (int outerListKey = 0; outerListKey < outerListPathsCount; ++outerListKey) {
-            paths[outerListKey] = YangInstanceIdentifier.builder(BenchmarkModel.OUTER_LIST_PATH)
-                .nodeWithKey(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
-                .build();
-        }
-        return paths;
-    }
-
-    private static final MapNode ONE_ITEM_INNER_LIST = initInnerListItems(1);
-    private static final MapNode TWO_ITEM_INNER_LIST = initInnerListItems(2);
-    private static final MapNode TEN_ITEM_INNER_LIST = initInnerListItems(10);
-
-    private static MapNode initInnerListItems(final int count) {
-        final CollectionNodeBuilder<MapEntryNode, MapNode> mapEntryBuilder = ImmutableNodes
-            .mapNodeBuilder(BenchmarkModel.INNER_LIST_QNAME);
-
-        for (int i = 1; i <= count; ++i) {
-            mapEntryBuilder
-                .withChild(ImmutableNodes.mapEntry(BenchmarkModel.INNER_LIST_QNAME, BenchmarkModel.NAME_QNAME, i));
-        }
-        return mapEntryBuilder.build();
-    }
-
-    private static final NormalizedNode<?, ?>[] OUTER_LIST_ONE_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_100K, ONE_ITEM_INNER_LIST);
-    private static final NormalizedNode<?, ?>[] OUTER_LIST_TWO_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_50K, TWO_ITEM_INNER_LIST);
-    private static final NormalizedNode<?, ?>[] OUTER_LIST_TEN_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_10K, TEN_ITEM_INNER_LIST);
-
-    private static NormalizedNode<?,?>[] initOuterListItems(int outerListItemsCount, MapNode innerList) {
-        final NormalizedNode<?,?>[] outerListItems = new NormalizedNode[outerListItemsCount];
-
-        for (int i = 0; i < outerListItemsCount; ++i) {
-            int outerListKey = i;
-            outerListItems[i] = ImmutableNodes.mapEntryBuilder(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
-                .withChild(innerList).build();
-        }
-        return outerListItems;
-    }
-
-    protected SchemaContext schemaContext;
     protected InMemoryDOMDataStore domStore;
 
-    abstract public void setUp() throws Exception;
-
-    abstract public void tearDown();
-
     protected void initTestNode() throws Exception {
         final YangInstanceIdentifier testPath = YangInstanceIdentifier.builder(BenchmarkModel.TEST_PATH)
             .build();
@@ -100,15 +35,6 @@ public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark {
         cohort.commit().get();
     }
 
-    private DataContainerChild<?, ?> provideOuterListNode() {
-        return ImmutableContainerNodeBuilder
-            .create()
-            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BenchmarkModel.TEST_QNAME))
-            .withChild(
-                ImmutableNodes.mapNodeBuilder(BenchmarkModel.OUTER_LIST_QNAME)
-                    .build()).build();
-    }
-
     @Benchmark
     @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
     @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryWriteTransactionBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryWriteTransactionBenchmark.java
new file mode 100644 (file)
index 0000000..6c256eb
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.benchmark;
+
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public abstract class AbstractInMemoryWriteTransactionBenchmark {
+    protected static final int OUTER_LIST_100K = 100000;
+    protected static final int OUTER_LIST_50K = 50000;
+    protected static final int OUTER_LIST_10K = 10000;
+
+    protected static final YangInstanceIdentifier[] OUTER_LIST_100K_PATHS = initOuterListPaths(OUTER_LIST_100K);
+    protected static final YangInstanceIdentifier[] OUTER_LIST_50K_PATHS = initOuterListPaths(OUTER_LIST_50K);
+    protected static final YangInstanceIdentifier[] OUTER_LIST_10K_PATHS = initOuterListPaths(OUTER_LIST_10K);
+
+    private static YangInstanceIdentifier[] initOuterListPaths(final int outerListPathsCount) {
+        final YangInstanceIdentifier[] paths = new YangInstanceIdentifier[outerListPathsCount];
+
+        for (int outerListKey = 0; outerListKey < outerListPathsCount; ++outerListKey) {
+            paths[outerListKey] = YangInstanceIdentifier.builder(BenchmarkModel.OUTER_LIST_PATH)
+                .nodeWithKey(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
+                .build();
+        }
+        return paths;
+    }
+
+    protected static final int WARMUP_ITERATIONS = 20;
+    protected static final int MEASUREMENT_ITERATIONS = 20;
+
+    protected static final MapNode ONE_ITEM_INNER_LIST = initInnerListItems(1);
+    protected static final MapNode TWO_ITEM_INNER_LIST = initInnerListItems(2);
+    protected static final MapNode TEN_ITEM_INNER_LIST = initInnerListItems(10);
+
+    private static MapNode initInnerListItems(final int count) {
+        final CollectionNodeBuilder<MapEntryNode, MapNode> mapEntryBuilder = ImmutableNodes
+            .mapNodeBuilder(BenchmarkModel.INNER_LIST_QNAME);
+
+        for (int i = 1; i <= count; ++i) {
+            mapEntryBuilder
+                .withChild(ImmutableNodes.mapEntry(BenchmarkModel.INNER_LIST_QNAME, BenchmarkModel.NAME_QNAME, i));
+        }
+        return mapEntryBuilder.build();
+    }
+
+    protected static final NormalizedNode<?, ?>[] OUTER_LIST_ONE_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_100K, ONE_ITEM_INNER_LIST);
+    protected static final NormalizedNode<?, ?>[] OUTER_LIST_TWO_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_50K, TWO_ITEM_INNER_LIST);
+    protected static final NormalizedNode<?, ?>[] OUTER_LIST_TEN_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_10K, TEN_ITEM_INNER_LIST);
+
+    private static NormalizedNode<?,?>[] initOuterListItems(final int outerListItemsCount, final MapNode innerList) {
+        final NormalizedNode<?,?>[] outerListItems = new NormalizedNode[outerListItemsCount];
+
+        for (int i = 0; i < outerListItemsCount; ++i) {
+            int outerListKey = i;
+            outerListItems[i] = ImmutableNodes.mapEntryBuilder(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
+                .withChild(innerList).build();
+        }
+        return outerListItems;
+    }
+
+    protected SchemaContext schemaContext;
+    abstract public void setUp() throws Exception;
+    abstract public void tearDown();
+
+    protected static DataContainerChild<?, ?> provideOuterListNode() {
+        return ImmutableContainerNodeBuilder
+            .create()
+            .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BenchmarkModel.TEST_QNAME))
+            .withChild(
+                ImmutableNodes.mapNodeBuilder(BenchmarkModel.OUTER_LIST_QNAME)
+                    .build()).build();
+    }
+}
diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryBrokerWriteTransactionBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryBrokerWriteTransactionBenchmark.java
new file mode 100644 (file)
index 0000000..a46a6d1
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.benchmark;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+public class InMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryBrokerWriteTransactionBenchmark {
+    private ListeningExecutorService executor;
+
+    @Setup(Level.Trial)
+    @Override
+    public void setUp() throws Exception {
+        ListeningExecutorService dsExec = MoreExecutors.sameThreadExecutor();
+        executor = MoreExecutors.listeningDecorator(
+            MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)Executors.newFixedThreadPool(1), 1L, TimeUnit.SECONDS));
+
+        InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", dsExec,
+            MoreExecutors.sameThreadExecutor());
+        InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", dsExec,
+            MoreExecutors.sameThreadExecutor());
+        Map<LogicalDatastoreType, DOMStore> datastores = ImmutableMap.of(
+            LogicalDatastoreType.OPERATIONAL, (DOMStore)operStore,
+            LogicalDatastoreType.CONFIGURATION, configStore);
+
+        domBroker = new DOMDataBrokerImpl(datastores, executor);
+        schemaContext = BenchmarkModel.createTestContext();
+        configStore.onGlobalContextUpdated(schemaContext);
+        operStore.onGlobalContextUpdated(schemaContext);
+        initTestNode();
+    }
+
+    @Override
+    public void tearDown() {
+        domBroker.close();
+        executor.shutdown();
+    }
+}
index 9edba85..de74867 100644 (file)
@@ -73,7 +73,6 @@ public class Leader extends AbstractRaftActorBehavior {
     private final Set<String> followers;
 
     private Cancellable heartbeatSchedule = null;
-    private Cancellable appendEntriesSchedule = null;
     private Cancellable installSnapshotSchedule = null;
 
     private List<ClientRequestTracker> trackerList = new ArrayList<>();
index 69af77c..ca864eb 100644 (file)
@@ -5,22 +5,30 @@ import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.Terminated;
-import akka.event.Logging;
 import akka.japi.Creator;
+import akka.japi.Procedure;
+import akka.pattern.Patterns;
 import akka.persistence.RecoveryCompleted;
+import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.ByteString;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
@@ -28,6 +36,9 @@ import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
 import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayInputStream;
@@ -42,10 +53,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class RaftActorTest extends AbstractActorTest {
 
@@ -59,6 +80,7 @@ public class RaftActorTest extends AbstractActorTest {
     public static class MockRaftActor extends RaftActor {
 
         private final DataPersistenceProvider dataPersistenceProvider;
+        private final RaftActor delegate;
 
         public static final class MockRaftActorCreator implements Creator<MockRaftActor> {
             private final Map<String, String> peerAddresses;
@@ -81,14 +103,13 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
-        private final CountDownLatch applyRecoverySnapshot = new CountDownLatch(1);
-        private final CountDownLatch applyStateLatch = new CountDownLatch(1);
 
         private final List<Object> state;
 
         public MockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config, DataPersistenceProvider dataPersistenceProvider) {
             super(id, peerAddresses, config);
             state = new ArrayList<>();
+            this.delegate = mock(RaftActor.class);
             if(dataPersistenceProvider == null){
                 this.dataPersistenceProvider = new PersistentDataProvider();
             } else {
@@ -104,10 +125,6 @@ public class RaftActorTest extends AbstractActorTest {
             }
         }
 
-        public CountDownLatch getApplyRecoverySnapshotLatch(){
-            return applyRecoverySnapshot;
-        }
-
         public List<Object> getState() {
             return state;
         }
@@ -124,9 +141,13 @@ public class RaftActorTest extends AbstractActorTest {
 
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
-            applyStateLatch.countDown();
+            delegate.applyState(clientActor, identifier, data);
+            LOG.info("applyState called");
         }
 
+
+
+
         @Override
         protected void startLogRecoveryBatch(int maxBatchSize) {
         }
@@ -142,12 +163,13 @@ public class RaftActorTest extends AbstractActorTest {
 
         @Override
         protected void onRecoveryComplete() {
+            delegate.onRecoveryComplete();
             recoveryComplete.countDown();
         }
 
         @Override
         protected void applyRecoverySnapshot(ByteString snapshot) {
-            applyRecoverySnapshot.countDown();
+            delegate.applyRecoverySnapshot(snapshot);
             try {
                 Object data = toObject(snapshot);
                 System.out.println("!!!!!applyRecoverySnapshot: "+data);
@@ -160,12 +182,15 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override protected void createSnapshot() {
+            delegate.createSnapshot();
         }
 
         @Override protected void applySnapshot(ByteString snapshot) {
+            delegate.applySnapshot(snapshot);
         }
 
         @Override protected void onStateChanged() {
+            delegate.onStateChanged();
         }
 
         @Override
@@ -215,46 +240,64 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
 
-        public boolean waitForStartup(){
+        public ActorRef getRaftActor() {
+            return raftActor;
+        }
+
+        public boolean waitForLogMessage(final Class logEventClass, String message){
             // Wait for a specific log message to show up
             return
-                new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
+                new JavaTestKit.EventFilter<Boolean>(logEventClass
                 ) {
                     @Override
                     protected Boolean run() {
                         return true;
                     }
                 }.from(raftActor.path().toString())
-                    .message("Switching from behavior Candidate to Leader")
+                    .message(message)
                     .occurrences(1).exec();
 
 
         }
 
-        public void findLeader(final String expectedLeader){
-            raftActor.tell(new FindLeader(), getRef());
-
-            FindLeaderReply reply = expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
-            assertEquals("getLeaderActor", expectedLeader, reply.getLeaderActor());
+        protected void waitUntilLeader(){
+            waitUntilLeader(raftActor);
         }
 
-        public ActorRef getRaftActor() {
-            return raftActor;
+        protected void waitUntilLeader(ActorRef actorRef) {
+            FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
+            for(int i = 0; i < 20 * 5; i++) {
+                Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
+                try {
+                    FindLeaderReply resp = (FindLeaderReply) Await.result(future, duration);
+                    if(resp.getLeaderActor() != null) {
+                        return;
+                    }
+                } catch(TimeoutException e) {
+                } catch(Exception e) {
+                    System.err.println("FindLeader threw ex");
+                    e.printStackTrace();
+                }
+
+
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+
+            Assert.fail("Leader not found for actorRef " + actorRef.path());
         }
+
     }
 
 
     @Test
     public void testConstruction() {
-        boolean started = new RaftActorTestKit(getSystem(), "testConstruction").waitForStartup();
-        assertEquals(true, started);
+        new RaftActorTestKit(getSystem(), "testConstruction").waitUntilLeader();
     }
 
     @Test
     public void testFindLeaderWhenLeaderIsSelf(){
         RaftActorTestKit kit = new RaftActorTestKit(getSystem(), "testFindLeader");
-        kit.waitForStartup();
-        kit.findLeader(kit.getRaftActor().path().toString());
+        kit.waitUntilLeader();
     }
 
     @Test
@@ -358,6 +401,11 @@ public class RaftActorTest extends AbstractActorTest {
                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
 
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                // Wait for akka's recovery to complete so it doesn't interfere.
+                mockRaftActor.waitForRecoveryComplete();
+
                 ByteString snapshotBytes  = fromObject(Arrays.asList(
                         new MockRaftActorContext.MockPayload("A"),
                         new MockRaftActorContext.MockPayload("B"),
@@ -367,13 +415,9 @@ public class RaftActorTest extends AbstractActorTest {
                 Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
                         Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
 
-                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
-
-                assertEquals("apply recovery snapshot", true, applyRecoverySnapshotLatch.await(5, TimeUnit.SECONDS));
+                verify(mockRaftActor.delegate).applyRecoverySnapshot(eq(snapshotBytes));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -402,8 +446,6 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
 
-                mockRaftActor.waitForRecoveryComplete();
-
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
 
             }};
@@ -430,6 +472,9 @@ public class RaftActorTest extends AbstractActorTest {
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
+                // Wait for akka's recovery to complete so it doesn't interfere.
+                mockRaftActor.waitForRecoveryComplete();
+
                 ByteString snapshotBytes  = fromObject(Arrays.asList(
                         new MockRaftActorContext.MockPayload("A"),
                         new MockRaftActorContext.MockPayload("B"),
@@ -441,9 +486,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
 
-                CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
-
-                assertEquals("apply recovery snapshot", false, applyRecoverySnapshotLatch.await(1, TimeUnit.SECONDS));
+                verify(mockRaftActor.delegate, times(0)).applyRecoverySnapshot(any(ByteString.class));
 
                 mockRaftActor.onReceiveRecover(new ReplicatedLogImplEntry(0, 1, new MockRaftActorContext.MockPayload("A")));
 
@@ -470,8 +513,6 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
 
-                mockRaftActor.waitForRecoveryComplete();
-
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
             }};
     }
@@ -516,18 +557,19 @@ public class RaftActorTest extends AbstractActorTest {
 
                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-                CountDownLatch persistLatch = new CountDownLatch(1);
-                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
-                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
-                mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class)));
+                MockRaftActorContext.MockReplicatedLogEntry logEntry = new MockRaftActorContext.MockReplicatedLogEntry(10, 10, mock(Payload.class));
 
-                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+                mockRaftActor.getRaftActorContext().getReplicatedLog().appendAndPersist(logEntry);
+
+                verify(dataPersistenceProvider).persist(eq(logEntry), any(Procedure.class));
 
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
 
@@ -545,12 +587,10 @@ public class RaftActorTest extends AbstractActorTest {
 
                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-                CountDownLatch persistLatch = new CountDownLatch(2);
-                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
-                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -558,7 +598,8 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.getRaftActorContext().getReplicatedLog().removeFromAndPersist(0);
 
-                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+                verify(dataPersistenceProvider, times(2)).persist(anyObject(), any(Procedure.class));
+
 
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
 
@@ -576,18 +617,16 @@ public class RaftActorTest extends AbstractActorTest {
 
                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-                CountDownLatch persistLatch = new CountDownLatch(1);
-                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-                dataPersistenceProviderMonitor.setPersistLatch(persistLatch);
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
-                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
                 mockRaftActor.onReceiveCommand(new ApplyLogEntries(10));
 
-                assertEquals("Persist called", true, persistLatch.await(5, TimeUnit.SECONDS));
+                verify(dataPersistenceProvider, times(1)).persist(anyObject(), any(Procedure.class));
 
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
 
@@ -605,12 +644,10 @@ public class RaftActorTest extends AbstractActorTest {
 
                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-                CountDownLatch persistLatch = new CountDownLatch(1);
-                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-                dataPersistenceProviderMonitor.setSaveSnapshotLatch(persistLatch);
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
 
                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
-                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
 
                 MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
 
@@ -624,7 +661,7 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
 
-                assertEquals("Save Snapshot called", true, persistLatch.await(5, TimeUnit.SECONDS));
+                verify(dataPersistenceProvider).saveSnapshot(anyObject());
 
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
 
@@ -642,11 +679,148 @@ public class RaftActorTest extends AbstractActorTest {
 
                 config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
 
-                CountDownLatch deleteMessagesLatch = new CountDownLatch(1);
-                CountDownLatch deleteSnapshotsLatch = new CountDownLatch(1);
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,0, mock(Payload.class)));
+                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,1, mock(Payload.class)));
+                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,2, mock(Payload.class)));
+                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,3, mock(Payload.class)));
+                mockRaftActor.getReplicatedLog().append(new MockRaftActorContext.MockReplicatedLogEntry(1,4, mock(Payload.class)));
+
+                ByteString snapshotBytes = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+                mockRaftActor.onReceiveCommand(new CaptureSnapshot(-1, 1, 2, 1));
+
+                verify(mockRaftActor.delegate).createSnapshot();
+
+                mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
+
+                mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
+
+                verify(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+
+                verify(dataPersistenceProvider).deleteMessages(100);
+
+                assertNotNull("Snapshot should not be null", mockRaftActor.getReplicatedLog().getSnapshot());
+
+                assertEquals(2, mockRaftActor.getReplicatedLog().size());
+
+                assertNotNull(mockRaftActor.getReplicatedLog().get(3));
+                assertNotNull(mockRaftActor.getReplicatedLog().get(4));
+
+                // Index 2 will not be in the log because it was removed due to snapshotting
+                assertNull(mockRaftActor.getReplicatedLog().get(2));
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testApplyState() throws Exception {
+
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testApplyState";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProvider), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
+                        new MockRaftActorContext.MockPayload("F"));
+
+                mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, "apply-state", entry));
+
+                verify(mockRaftActor.delegate).applyState(eq(mockActorRef), eq("apply-state"), anyObject());
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+
+
+    }
+
+    @Test
+    public void testApplySnapshot() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testApplySnapshot";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
+                DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
+
+                TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
+                        Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
+
+                MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+                ReplicatedLog oldReplicatedLog = mockRaftActor.getReplicatedLog();
+
+                oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,0,mock(Payload.class)));
+                oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,1,mock(Payload.class)));
+                oldReplicatedLog.append(new MockRaftActorContext.MockReplicatedLogEntry(1,2,mock(Payload.class)));
+
+                ByteString snapshotBytes = fromObject(Arrays.asList(
+                        new MockRaftActorContext.MockPayload("A"),
+                        new MockRaftActorContext.MockPayload("B"),
+                        new MockRaftActorContext.MockPayload("C"),
+                        new MockRaftActorContext.MockPayload("D")));
+
+                Snapshot snapshot = mock(Snapshot.class);
+
+                doReturn(snapshotBytes.toByteArray()).when(snapshot).getState();
+
+                doReturn(3L).when(snapshot).getLastAppliedIndex();
+
+                mockRaftActor.onReceiveCommand(new ApplySnapshot(snapshot));
+
+                verify(mockRaftActor.delegate).applySnapshot(eq(snapshotBytes));
+
+                assertTrue("The replicatedLog should have changed", oldReplicatedLog != mockRaftActor.getReplicatedLog());
+
+                assertEquals("lastApplied should be same as in the snapshot", (Long) 3L, mockRaftActor.getLastApplied());
+
+                assertEquals(0, mockRaftActor.getReplicatedLog().size());
+
+                mockActorRef.tell(PoisonPill.getInstance(), getRef());
+
+            }
+        };
+    }
+
+    @Test
+    public void testSaveSnapshotFailure() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                String persistenceId = "testSaveSnapshotFailure";
+
+                DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+
+                config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
+
                 DataPersistenceProviderMonitor dataPersistenceProviderMonitor = new DataPersistenceProviderMonitor();
-                dataPersistenceProviderMonitor.setDeleteMessagesLatch(deleteMessagesLatch);
-                dataPersistenceProviderMonitor.setDeleteSnapshotsLatch(deleteSnapshotsLatch);
 
                 TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
                         Collections.EMPTY_MAP, Optional.<ConfigParams>of(config), dataPersistenceProviderMonitor), persistenceId);
@@ -663,11 +837,13 @@ public class RaftActorTest extends AbstractActorTest {
 
                 mockRaftActor.onReceiveCommand(new CaptureSnapshotReply(snapshotBytes));
 
-                mockRaftActor.onReceiveCommand(new SaveSnapshotSuccess(new SnapshotMetadata("foo", 100, 100)));
+                mockRaftActor.onReceiveCommand(new SaveSnapshotFailure(new SnapshotMetadata("foobar", 10L, 1234L),
+                        new Exception()));
 
-                assertEquals("Delete Messages called", true, deleteMessagesLatch.await(5, TimeUnit.SECONDS));
+                assertEquals("Snapshot index should not have advanced because save snapshot failed", -1,
+                        mockRaftActor.getReplicatedLog().getSnapshotIndex());
 
-                assertEquals("Delete Snapshots called", true, deleteSnapshotsLatch.await(5, TimeUnit.SECONDS));
+                assertNull("Snapshot should be null", mockRaftActor.getReplicatedLog().getSnapshot());
 
                 mockActorRef.tell(PoisonPill.getInstance(), getRef());
 
index 48543d7..030b4b2 100644 (file)
@@ -12,8 +12,10 @@ import org.opendaylight.controller.cluster.raft.FollowerLogInformation;
 import org.opendaylight.controller.cluster.raft.FollowerLogInformationImpl;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.SerializationUtils;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
@@ -22,6 +24,7 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
 import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages;
@@ -31,6 +34,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -673,6 +677,145 @@ public class LeaderTest extends AbstractRaftActorBehaviorTest {
         }};
     }
 
+    @Test
+    public void testHandleAppendEntriesReplyFailure(){
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                ActorRef followerActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                Map<String, String> peerAddresses = new HashMap();
+                peerAddresses.put("follower-1",
+                    followerActor.path().toString());
+
+                leaderActorContext.setPeerAddresses(peerAddresses);
+
+                Leader leader = new Leader(leaderActorContext);
+
+                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+            }};
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplySuccess() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                ActorRef followerActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                leaderActorContext.setReplicatedLog(
+                    new MockRaftActorContext.MockReplicatedLogBuilder().createEntries(0, 3, 1).build());
+
+                Map<String, String> peerAddresses = new HashMap();
+                peerAddresses.put("follower-1",
+                    followerActor.path().toString());
+
+                leaderActorContext.setPeerAddresses(peerAddresses);
+                leaderActorContext.setCommitIndex(1);
+                leaderActorContext.setLastApplied(1);
+                leaderActorContext.getTermInformation().update(1, "leader");
+
+                Leader leader = new Leader(leaderActorContext);
+
+                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, true, 2, 1);
+
+                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(followerActor, reply);
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+                assertEquals(2, leaderActorContext.getCommitIndex());
+
+                ApplyLogEntries applyLogEntries =
+                    (ApplyLogEntries) MessageCollectorActor.getFirstMatching(leaderActor,
+                        ApplyLogEntries.class);
+
+                assertNotNull(applyLogEntries);
+
+                assertEquals(2, leaderActorContext.getLastApplied());
+
+                assertEquals(2, applyLogEntries.getToIndex());
+
+                List<Object> applyStateList = MessageCollectorActor.getAllMatching(leaderActor,
+                    ApplyState.class);
+
+                assertEquals(1,applyStateList.size());
+
+                ApplyState applyState = (ApplyState) applyStateList.get(0);
+
+                assertEquals(2, applyState.getReplicatedLogEntry().getIndex());
+
+            }};
+    }
+
+    @Test
+    public void testHandleAppendEntriesReplyUnknownFollower(){
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                Leader leader = new Leader(leaderActorContext);
+
+                AppendEntriesReply reply = new AppendEntriesReply("follower-1", 1, false, 10, 1);
+
+                RaftActorBehavior raftActorBehavior = leader.handleAppendEntriesReply(getRef(), reply);
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+            }};
+    }
+
+    @Test
+    public void testHandleRequestVoteReply(){
+        new JavaTestKit(getSystem()) {
+            {
+
+                ActorRef leaderActor =
+                    getSystem().actorOf(Props.create(MessageCollectorActor.class));
+
+                MockRaftActorContext leaderActorContext =
+                    new MockRaftActorContext("leader", getSystem(), leaderActor);
+
+                Leader leader = new Leader(leaderActorContext);
+
+                RaftActorBehavior raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, true));
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+                raftActorBehavior = leader.handleRequestVoteReply(getRef(), new RequestVoteReply(1, false));
+
+                assertEquals(RaftState.Leader, raftActorBehavior.state());
+
+
+            }};
+
+    }
+
     private static class LeaderTestKit extends JavaTestKit {
 
         private LeaderTestKit(ActorSystem actorSystem) {
index 5892845..3e2e4a0 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.collect.Lists;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -65,4 +66,18 @@ public class MessageCollectorActor extends UntypedActor {
         return null;
     }
 
+    public static List<Object> getAllMatching(ActorRef actor, Class clazz) throws Exception {
+        List<Object> allMessages = getAllMessages(actor);
+
+        List<Object> output = Lists.newArrayList();
+
+        for(Object message : allMessages){
+            if(message.getClass().equals(clazz)){
+                output.add(message);
+            }
+        }
+
+        return output;
+    }
+
 }
index e43b445..2a79a5b 100644 (file)
@@ -11,17 +11,17 @@ public final class ThreePhaseCommitCohortMessages {
   public interface CanCommitTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     boolean hasTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     java.lang.String getTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
@@ -122,17 +122,17 @@ public final class ThreePhaseCommitCohortMessages {
     }
 
     private int bitField0_;
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     public static final int TRANSACTIONID_FIELD_NUMBER = 1;
     private java.lang.Object transactionId_;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public boolean hasTransactionId() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public java.lang.String getTransactionId() {
       java.lang.Object ref = transactionId_;
@@ -149,7 +149,7 @@ public final class ThreePhaseCommitCohortMessages {
       }
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public com.google.protobuf.ByteString
         getTransactionIdBytes() {
@@ -173,10 +173,6 @@ public final class ThreePhaseCommitCohortMessages {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
-      if (!hasTransactionId()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -376,10 +372,6 @@ public final class ThreePhaseCommitCohortMessages {
       }
 
       public final boolean isInitialized() {
-        if (!hasTransactionId()) {
-
-          return false;
-        }
         return true;
       }
 
@@ -402,16 +394,16 @@ public final class ThreePhaseCommitCohortMessages {
       }
       private int bitField0_;
 
-      // required string transactionId = 1;
+      // optional string transactionId = 1;
       private java.lang.Object transactionId_ = "";
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public boolean hasTransactionId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public java.lang.String getTransactionId() {
         java.lang.Object ref = transactionId_;
@@ -425,7 +417,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public com.google.protobuf.ByteString
           getTransactionIdBytes() {
@@ -441,7 +433,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionId(
           java.lang.String value) {
@@ -454,7 +446,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder clearTransactionId() {
         bitField0_ = (bitField0_ & ~0x00000001);
@@ -463,7 +455,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionIdBytes(
           com.google.protobuf.ByteString value) {
@@ -894,17 +886,17 @@ public final class ThreePhaseCommitCohortMessages {
   public interface AbortTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     boolean hasTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     java.lang.String getTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
@@ -1005,17 +997,17 @@ public final class ThreePhaseCommitCohortMessages {
     }
 
     private int bitField0_;
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     public static final int TRANSACTIONID_FIELD_NUMBER = 1;
     private java.lang.Object transactionId_;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public boolean hasTransactionId() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public java.lang.String getTransactionId() {
       java.lang.Object ref = transactionId_;
@@ -1032,7 +1024,7 @@ public final class ThreePhaseCommitCohortMessages {
       }
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public com.google.protobuf.ByteString
         getTransactionIdBytes() {
@@ -1056,10 +1048,6 @@ public final class ThreePhaseCommitCohortMessages {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
-      if (!hasTransactionId()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -1259,10 +1247,6 @@ public final class ThreePhaseCommitCohortMessages {
       }
 
       public final boolean isInitialized() {
-        if (!hasTransactionId()) {
-
-          return false;
-        }
         return true;
       }
 
@@ -1285,16 +1269,16 @@ public final class ThreePhaseCommitCohortMessages {
       }
       private int bitField0_;
 
-      // required string transactionId = 1;
+      // optional string transactionId = 1;
       private java.lang.Object transactionId_ = "";
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public boolean hasTransactionId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public java.lang.String getTransactionId() {
         java.lang.Object ref = transactionId_;
@@ -1308,7 +1292,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public com.google.protobuf.ByteString
           getTransactionIdBytes() {
@@ -1324,7 +1308,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionId(
           java.lang.String value) {
@@ -1337,7 +1321,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder clearTransactionId() {
         bitField0_ = (bitField0_ & ~0x00000001);
@@ -1346,7 +1330,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionIdBytes(
           com.google.protobuf.ByteString value) {
@@ -1682,17 +1666,17 @@ public final class ThreePhaseCommitCohortMessages {
   public interface CommitTransactionOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     boolean hasTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     java.lang.String getTransactionId();
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
@@ -1793,17 +1777,17 @@ public final class ThreePhaseCommitCohortMessages {
     }
 
     private int bitField0_;
-    // required string transactionId = 1;
+    // optional string transactionId = 1;
     public static final int TRANSACTIONID_FIELD_NUMBER = 1;
     private java.lang.Object transactionId_;
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public boolean hasTransactionId() {
       return ((bitField0_ & 0x00000001) == 0x00000001);
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public java.lang.String getTransactionId() {
       java.lang.Object ref = transactionId_;
@@ -1820,7 +1804,7 @@ public final class ThreePhaseCommitCohortMessages {
       }
     }
     /**
-     * <code>required string transactionId = 1;</code>
+     * <code>optional string transactionId = 1;</code>
      */
     public com.google.protobuf.ByteString
         getTransactionIdBytes() {
@@ -1844,10 +1828,6 @@ public final class ThreePhaseCommitCohortMessages {
       byte isInitialized = memoizedIsInitialized;
       if (isInitialized != -1) return isInitialized == 1;
 
-      if (!hasTransactionId()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -2047,10 +2027,6 @@ public final class ThreePhaseCommitCohortMessages {
       }
 
       public final boolean isInitialized() {
-        if (!hasTransactionId()) {
-
-          return false;
-        }
         return true;
       }
 
@@ -2073,16 +2049,16 @@ public final class ThreePhaseCommitCohortMessages {
       }
       private int bitField0_;
 
-      // required string transactionId = 1;
+      // optional string transactionId = 1;
       private java.lang.Object transactionId_ = "";
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public boolean hasTransactionId() {
         return ((bitField0_ & 0x00000001) == 0x00000001);
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public java.lang.String getTransactionId() {
         java.lang.Object ref = transactionId_;
@@ -2096,7 +2072,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public com.google.protobuf.ByteString
           getTransactionIdBytes() {
@@ -2112,7 +2088,7 @@ public final class ThreePhaseCommitCohortMessages {
         }
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionId(
           java.lang.String value) {
@@ -2125,7 +2101,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder clearTransactionId() {
         bitField0_ = (bitField0_ & ~0x00000001);
@@ -2134,7 +2110,7 @@ public final class ThreePhaseCommitCohortMessages {
         return this;
       }
       /**
-       * <code>required string transactionId = 1;</code>
+       * <code>optional string transactionId = 1;</code>
        */
       public Builder setTransactionIdBytes(
           com.google.protobuf.ByteString value) {
@@ -3136,11 +3112,11 @@ public final class ThreePhaseCommitCohortMessages {
     java.lang.String[] descriptorData = {
       "\n\014Cohort.proto\022!org.opendaylight.control" +
       "ler.mdsal\"-\n\024CanCommitTransaction\022\025\n\rtra" +
-      "nsactionId\030\001 \002(\t\".\n\031CanCommitTransaction" +
+      "nsactionId\030\001 \001(\t\".\n\031CanCommitTransaction" +
       "Reply\022\021\n\tcanCommit\030\001 \002(\010\")\n\020AbortTransac" +
-      "tion\022\025\n\rtransactionId\030\001 \002(\t\"\027\n\025AbortTran" +
+      "tion\022\025\n\rtransactionId\030\001 \001(\t\"\027\n\025AbortTran" +
       "sactionReply\"*\n\021CommitTransaction\022\025\n\rtra" +
-      "nsactionId\030\001 \002(\t\"\030\n\026CommitTransactionRep" +
+      "nsactionId\030\001 \001(\t\"\030\n\026CommitTransactionRep" +
       "ly\"\026\n\024PreCommitTransaction\"\033\n\031PreCommitT" +
       "ransactionReplyBZ\n8org.opendaylight.cont" +
       "roller.protobuff.messages.cohort3pcB\036Thr",
index 96a39bd..3a1cfaa 100644 (file)
@@ -668,6 +668,16 @@ public final class ShardTransactionMessages {
      */
     com.google.protobuf.ByteString
         getTransactionChainIdBytes();
+
+    // optional int32 messageVersion = 4;
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    boolean hasMessageVersion();
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    int getMessageVersion();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransaction}
@@ -735,6 +745,11 @@ public final class ShardTransactionMessages {
               transactionChainId_ = input.readBytes();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              messageVersion_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -877,10 +892,27 @@ public final class ShardTransactionMessages {
       }
     }
 
+    // optional int32 messageVersion = 4;
+    public static final int MESSAGEVERSION_FIELD_NUMBER = 4;
+    private int messageVersion_;
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    public boolean hasMessageVersion() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    /**
+     * <code>optional int32 messageVersion = 4;</code>
+     */
+    public int getMessageVersion() {
+      return messageVersion_;
+    }
+
     private void initFields() {
       transactionId_ = "";
       transactionType_ = 0;
       transactionChainId_ = "";
+      messageVersion_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -911,6 +943,9 @@ public final class ShardTransactionMessages {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(3, getTransactionChainIdBytes());
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeInt32(4, messageVersion_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -932,6 +967,10 @@ public final class ShardTransactionMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(3, getTransactionChainIdBytes());
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(4, messageVersion_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1054,6 +1093,8 @@ public final class ShardTransactionMessages {
         bitField0_ = (bitField0_ & ~0x00000002);
         transactionChainId_ = "";
         bitField0_ = (bitField0_ & ~0x00000004);
+        messageVersion_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -1094,6 +1135,10 @@ public final class ShardTransactionMessages {
           to_bitField0_ |= 0x00000004;
         }
         result.transactionChainId_ = transactionChainId_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.messageVersion_ = messageVersion_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1123,6 +1168,9 @@ public final class ShardTransactionMessages {
           transactionChainId_ = other.transactionChainId_;
           onChanged();
         }
+        if (other.hasMessageVersion()) {
+          setMessageVersion(other.getMessageVersion());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1339,6 +1387,39 @@ public final class ShardTransactionMessages {
         return this;
       }
 
+      // optional int32 messageVersion = 4;
+      private int messageVersion_ ;
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public boolean hasMessageVersion() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public int getMessageVersion() {
+        return messageVersion_;
+      }
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public Builder setMessageVersion(int value) {
+        bitField0_ |= 0x00000008;
+        messageVersion_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 messageVersion = 4;</code>
+       */
+      public Builder clearMessageVersion() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        messageVersion_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransaction)
     }
 
@@ -1382,6 +1463,16 @@ public final class ShardTransactionMessages {
      */
     com.google.protobuf.ByteString
         getTransactionIdBytes();
+
+    // optional int32 messageVersion = 3;
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    boolean hasMessageVersion();
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    int getMessageVersion();
   }
   /**
    * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionReply}
@@ -1444,6 +1535,11 @@ public final class ShardTransactionMessages {
               transactionId_ = input.readBytes();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              messageVersion_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -1570,9 +1666,26 @@ public final class ShardTransactionMessages {
       }
     }
 
+    // optional int32 messageVersion = 3;
+    public static final int MESSAGEVERSION_FIELD_NUMBER = 3;
+    private int messageVersion_;
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    public boolean hasMessageVersion() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional int32 messageVersion = 3;</code>
+     */
+    public int getMessageVersion() {
+      return messageVersion_;
+    }
+
     private void initFields() {
       transactionActorPath_ = "";
       transactionId_ = "";
+      messageVersion_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -1600,6 +1713,9 @@ public final class ShardTransactionMessages {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeBytes(2, getTransactionIdBytes());
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeInt32(3, messageVersion_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -1617,6 +1733,10 @@ public final class ShardTransactionMessages {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(2, getTransactionIdBytes());
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(3, messageVersion_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1737,6 +1857,8 @@ public final class ShardTransactionMessages {
         bitField0_ = (bitField0_ & ~0x00000001);
         transactionId_ = "";
         bitField0_ = (bitField0_ & ~0x00000002);
+        messageVersion_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -1773,6 +1895,10 @@ public final class ShardTransactionMessages {
           to_bitField0_ |= 0x00000002;
         }
         result.transactionId_ = transactionId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.messageVersion_ = messageVersion_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1799,6 +1925,9 @@ public final class ShardTransactionMessages {
           transactionId_ = other.transactionId_;
           onChanged();
         }
+        if (other.hasMessageVersion()) {
+          setMessageVersion(other.getMessageVersion());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1982,6 +2111,39 @@ public final class ShardTransactionMessages {
         return this;
       }
 
+      // optional int32 messageVersion = 3;
+      private int messageVersion_ ;
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public boolean hasMessageVersion() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public int getMessageVersion() {
+        return messageVersion_;
+      }
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public Builder setMessageVersion(int value) {
+        bitField0_ |= 0x00000004;
+        messageVersion_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 messageVersion = 3;</code>
+       */
+      public Builder clearMessageVersion() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        messageVersion_ = 0;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransactionReply)
     }
 
@@ -7753,37 +7915,38 @@ public final class ShardTransactionMessages {
     java.lang.String[] descriptorData = {
       "\n\026ShardTransaction.proto\022!org.opendaylig" +
       "ht.controller.mdsal\032\014Common.proto\"\022\n\020Clo" +
-      "seTransaction\"\027\n\025CloseTransactionReply\"_" +
+      "seTransaction\"\027\n\025CloseTransactionReply\"w" +
       "\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" +
       "(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" +
-      "onChainId\030\003 \001(\t\"M\n\026CreateTransactionRepl" +
-      "y\022\034\n\024transactionActorPath\030\001 \002(\t\022\025\n\rtrans" +
-      "actionId\030\002 \002(\t\"\022\n\020ReadyTransaction\"*\n\025Re" +
-      "adyTransactionReply\022\021\n\tactorPath\030\001 \002(\t\"l" +
-      "\n\nDeleteData\022^\n\037instanceIdentifierPathAr",
+      "onChainId\030\003 \001(\t\022\026\n\016messageVersion\030\004 \001(\005\"" +
+      "e\n\026CreateTransactionReply\022\034\n\024transaction" +
+      "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\022\026" +
+      "\n\016messageVersion\030\003 \001(\005\"\022\n\020ReadyTransacti" +
+      "on\"*\n\025ReadyTransactionReply\022\021\n\tactorPath",
+      "\030\001 \002(\t\"l\n\nDeleteData\022^\n\037instanceIdentifi" +
+      "erPathArguments\030\001 \002(\01325.org.opendaylight" +
+      ".controller.mdsal.InstanceIdentifier\"\021\n\017" +
+      "DeleteDataReply\"j\n\010ReadData\022^\n\037instanceI" +
+      "dentifierPathArguments\030\001 \002(\01325.org.opend" +
+      "aylight.controller.mdsal.InstanceIdentif" +
+      "ier\"P\n\rReadDataReply\022?\n\016normalizedNode\030\001" +
+      " \001(\0132\'.org.opendaylight.controller.mdsal" +
+      ".Node\"\254\001\n\tWriteData\022^\n\037instanceIdentifie" +
+      "rPathArguments\030\001 \002(\01325.org.opendaylight.",
+      "controller.mdsal.InstanceIdentifier\022?\n\016n" +
+      "ormalizedNode\030\002 \002(\0132\'.org.opendaylight.c" +
+      "ontroller.mdsal.Node\"\020\n\016WriteDataReply\"\254" +
+      "\001\n\tMergeData\022^\n\037instanceIdentifierPathAr" +
       "guments\030\001 \002(\01325.org.opendaylight.control" +
-      "ler.mdsal.InstanceIdentifier\"\021\n\017DeleteDa" +
-      "taReply\"j\n\010ReadData\022^\n\037instanceIdentifie" +
-      "rPathArguments\030\001 \002(\01325.org.opendaylight." +
-      "controller.mdsal.InstanceIdentifier\"P\n\rR" +
-      "eadDataReply\022?\n\016normalizedNode\030\001 \001(\0132\'.o" +
-      "rg.opendaylight.controller.mdsal.Node\"\254\001" +
-      "\n\tWriteData\022^\n\037instanceIdentifierPathArg" +
-      "uments\030\001 \002(\01325.org.opendaylight.controll" +
-      "er.mdsal.InstanceIdentifier\022?\n\016normalize",
-      "dNode\030\002 \002(\0132\'.org.opendaylight.controlle" +
-      "r.mdsal.Node\"\020\n\016WriteDataReply\"\254\001\n\tMerge" +
-      "Data\022^\n\037instanceIdentifierPathArguments\030" +
-      "\001 \002(\01325.org.opendaylight.controller.mdsa" +
-      "l.InstanceIdentifier\022?\n\016normalizedNode\030\002" +
-      " \002(\0132\'.org.opendaylight.controller.mdsal" +
-      ".Node\"\020\n\016MergeDataReply\"l\n\nDataExists\022^\n" +
-      "\037instanceIdentifierPathArguments\030\001 \002(\01325" +
-      ".org.opendaylight.controller.mdsal.Insta" +
-      "nceIdentifier\"!\n\017DataExistsReply\022\016\n\006exis",
-      "ts\030\001 \002(\010BV\n:org.opendaylight.controller." +
-      "protobuff.messages.transactionB\030ShardTra" +
-      "nsactionMessages"
+      "ler.mdsal.InstanceIdentifier\022?\n\016normaliz" +
+      "edNode\030\002 \002(\0132\'.org.opendaylight.controll" +
+      "er.mdsal.Node\"\020\n\016MergeDataReply\"l\n\nDataE" +
+      "xists\022^\n\037instanceIdentifierPathArguments" +
+      "\030\001 \002(\01325.org.opendaylight.controller.mds",
+      "al.InstanceIdentifier\"!\n\017DataExistsReply" +
+      "\022\016\n\006exists\030\001 \002(\010BV\n:org.opendaylight.con" +
+      "troller.protobuff.messages.transactionB\030" +
+      "ShardTransactionMessages"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -7807,13 +7970,13 @@ public final class ShardTransactionMessages {
           internal_static_org_opendaylight_controller_mdsal_CreateTransaction_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CreateTransaction_descriptor,
-              new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", });
+              new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", "MessageVersion", });
           internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor =
             getDescriptor().getMessageTypes().get(3);
           internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor,
-              new java.lang.String[] { "TransactionActorPath", "TransactionId", });
+              new java.lang.String[] { "TransactionActorPath", "TransactionId", "MessageVersion", });
           internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_descriptor =
             getDescriptor().getMessageTypes().get(4);
           internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_fieldAccessorTable = new
index 49c6cd0..222f68a 100644 (file)
@@ -5,7 +5,7 @@ option java_outer_classname = "ThreePhaseCommitCohortMessages";
 
 
 message CanCommitTransaction{
-  required string transactionId = 1;
+  optional string transactionId = 1;
 }
 
 message CanCommitTransactionReply{
@@ -14,7 +14,7 @@ message CanCommitTransactionReply{
 }
 
 message AbortTransaction{
-  required string transactionId = 1;
+  optional string transactionId = 1;
 }
 
 message AbortTransactionReply {
@@ -22,7 +22,7 @@ message AbortTransactionReply {
 }
 
 message CommitTransaction{
-  required string transactionId = 1;
+  optional string transactionId = 1;
 }
 
 message CommitTransactionReply{
index 2658147..c5e4ee4 100644 (file)
@@ -16,12 +16,13 @@ message CreateTransaction{
   required string transactionId = 1;
   required int32  transactionType =2;
   optional string transactionChainId = 3;
+  optional int32 messageVersion = 4;
 }
 
 message CreateTransactionReply{
-required string transactionActorPath = 1;
-required string transactionId = 2;
-
+  required string transactionActorPath = 1;
+  required string transactionId = 2;
+  optional int32 messageVersion = 3;
 }
 
 message ReadyTransaction{
@@ -29,7 +30,7 @@ message ReadyTransaction{
 }
 
 message ReadyTransactionReply{
-required string actorPath = 1;
+  required string actorPath = 1;
 }
 
 message DeleteData {
index 7d67e08..5ea9b30 100644 (file)
@@ -31,6 +31,7 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
+import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
@@ -374,7 +375,8 @@ public class Shard extends RaftActor {
     }
 
     private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
-        LOG.debug("Readying transaction {}", ready.getTransactionID());
+        LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
+                ready.getTxnClientVersion());
 
         // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
         // commitCoordinator in preparation for the subsequent three phase commit initiated by
@@ -382,12 +384,22 @@ public class Shard extends RaftActor {
         commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
                 ready.getModification());
 
-        // Return our actor path as we'll handle the three phase commit.
-        ReadyTransactionReply readyTransactionReply =
-            new ReadyTransactionReply(Serialization.serializedActorPath(self()));
-        getSender().tell(
-            ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
-            getSelf());
+        // Return our actor path as we'll handle the three phase commit, except if the Tx client
+        // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
+        // node. In that case, the subsequent 3-phase commit messages won't contain the
+        // transactionId so to maintain backwards compatibility, we create a separate cohort actor
+        // to provide the compatible behavior.
+        ActorRef replyActorPath = self();
+        if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+            LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+            replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+                    ready.getTransactionID()));
+        }
+
+        ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
+                Serialization.serializedActorPath(replyActorPath));
+        getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+                readyTransactionReply, getSelf());
     }
 
     private void handleAbortTransaction(AbortTransaction abort) {
@@ -465,10 +477,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    private ActorRef createTypedTransactionActor(
-        int transactionType,
-        ShardTransactionIdentifier transactionId,
-        String transactionChainId ) {
+    private ActorRef createTypedTransactionActor(int transactionType,
+            ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
 
         DOMStoreTransactionFactory factory = store;
 
@@ -492,7 +502,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
                         schemaContext,datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
 
         } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
 
@@ -501,7 +512,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
                         schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
 
 
         } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
@@ -511,7 +523,8 @@ public class Shard extends RaftActor {
             return getContext().actorOf(
                 ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
                         schemaContext, datastoreContext, shardMBean,
-                        transactionId.getRemoteTransactionId()), transactionId.toString());
+                        transactionId.getRemoteTransactionId(), clientVersion),
+                        transactionId.toString());
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -521,10 +534,12 @@ public class Shard extends RaftActor {
 
     private void createTransaction(CreateTransaction createTransaction) {
         createTransaction(createTransaction.getTransactionType(),
-            createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+            createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+            createTransaction.getVersion());
     }
 
-    private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
+    private ActorRef createTransaction(int transactionType, String remoteTransactionId,
+            String transactionChainId, int clientVersion) {
 
         ShardTransactionIdentifier transactionId =
             ShardTransactionIdentifier.builder()
@@ -533,8 +548,8 @@ public class Shard extends RaftActor {
         if(LOG.isDebugEnabled()) {
             LOG.debug("Creating transaction : {} ", transactionId);
         }
-        ActorRef transactionActor =
-            createTypedTransactionActor(transactionType, transactionId, transactionChainId);
+        ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
+                transactionChainId, clientVersion);
 
         getSender()
             .tell(new CreateTransactionReply(
@@ -599,7 +614,7 @@ public class Shard extends RaftActor {
         LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
                     listenerRegistration.path());
 
-        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+        getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
     }
 
     private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
@@ -765,7 +780,8 @@ public class Shard extends RaftActor {
             // so that this actor does not get block building the snapshot
             createSnapshotTransaction = createTransaction(
                 TransactionProxy.TransactionType.READ_ONLY.ordinal(),
-                "createSnapshot" + ++createSnapshotTransactionCounter, "");
+                "createSnapshot" + ++createSnapshotTransactionCounter, "",
+                CreateTransaction.CURRENT_VERSION);
 
             createSnapshotTransaction.tell(
                 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
index d12e999..9d8f572 100644 (file)
@@ -26,8 +26,9 @@ public class ShardReadTransaction extends ShardTransaction {
     private final DOMStoreReadTransaction transaction;
 
     public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
-        super(shardActor, schemaContext, shardStats, transactionID);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+            int txnClientVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
         this.transaction = transaction;
     }
 
index b1fd02d..e558677 100644 (file)
@@ -26,8 +26,9 @@ public class ShardReadWriteTransaction extends ShardWriteTransaction {
     private final DOMStoreReadWriteTransaction transaction;
 
     public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
-        super(transaction, shardActor, schemaContext, shardStats, transactionID);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+            int txnClientVersion) {
+        super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
         this.transaction = transaction;
     }
 
index 32de47f..59bb4bf 100644 (file)
@@ -57,26 +57,29 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  */
 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
 
+    protected static final boolean SERIALIZED_REPLY = true;
+
     private final ActorRef shardActor;
     private final SchemaContext schemaContext;
     private final ShardStats shardStats;
     private final String transactionID;
-    protected static final boolean SERIALIZED_REPLY = true;
+    private final int txnClientVersion;
 
     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
-            ShardStats shardStats, String transactionID) {
+            ShardStats shardStats, String transactionID, int txnClientVersion) {
         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
         this.shardActor = shardActor;
         this.schemaContext = schemaContext;
         this.shardStats = shardStats;
         this.transactionID = transactionID;
+        this.txnClientVersion = txnClientVersion;
     }
 
     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
             SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
-            String transactionID) {
+            String transactionID, int txnClientVersion) {
         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
-           datastoreContext, shardStats, transactionID));
+           datastoreContext, shardStats, transactionID, txnClientVersion));
     }
 
     protected abstract DOMStoreTransaction getDOMStoreTransaction();
@@ -93,6 +96,10 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         return schemaContext;
     }
 
+    protected int getTxnClientVersion() {
+        return txnClientVersion;
+    }
+
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
@@ -169,16 +176,18 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
         final DatastoreContext datastoreContext;
         final ShardStats shardStats;
         final String transactionID;
+        final int txnClientVersion;
 
         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
                 SchemaContext schemaContext, DatastoreContext datastoreContext,
-                ShardStats shardStats, String transactionID) {
+                ShardStats shardStats, String transactionID, int txnClientVersion) {
             this.transaction = transaction;
             this.shardActor = shardActor;
             this.shardStats = shardStats;
             this.schemaContext = schemaContext;
             this.datastoreContext = datastoreContext;
             this.transactionID = transactionID;
+            this.txnClientVersion = txnClientVersion;
         }
 
         @Override
@@ -186,13 +195,13 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
             ShardTransaction tx;
             if(transaction instanceof DOMStoreReadWriteTransaction) {
                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID);
+                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
             } else if(transaction instanceof DOMStoreReadTransaction) {
                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
-                        schemaContext, shardStats, transactionID);
+                        schemaContext, shardStats, transactionID, txnClientVersion);
             } else {
                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
-                        shardActor, schemaContext, shardStats, transactionID);
+                        shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
             }
 
             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
index 943a82f..78c6a55 100644 (file)
@@ -63,19 +63,22 @@ public class ShardTransactionChain extends AbstractUntypedActor {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId()), transactionName);
+                            createTransaction.getTransactionId(),
+                            createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId()), transactionName);
+                            createTransaction.getTransactionId(),
+                            createTransaction.getVersion()), transactionName);
         } else if (createTransaction.getTransactionType() ==
                 TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
             return getContext().actorOf(
                     ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
                             schemaContext, datastoreContext, shardStats,
-                            createTransaction.getTransactionId()), transactionName);
+                            createTransaction.getTransactionId(),
+                            createTransaction.getVersion()), transactionName);
         } else {
             throw new IllegalArgumentException (
                     "CreateTransaction message has unidentified transaction type=" +
index b0eaf98..44f2c7b 100644 (file)
@@ -42,8 +42,9 @@ public class ShardWriteTransaction extends ShardTransaction {
     private final DOMStoreWriteTransaction transaction;
 
     public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
-            SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
-        super(shardActor, schemaContext, shardStats, transactionID);
+            SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+            int txnClientVersion) {
+        super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
         this.transaction = transaction;
     }
 
@@ -144,8 +145,8 @@ public class ShardWriteTransaction extends ShardTransaction {
 
         DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
 
-        getShardActor().forward(new ForwardedReadyTransaction(transactionID, cohort, modification,
-                returnSerialized), getContext());
+        getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(),
+                cohort, modification, returnSerialized), getContext());
 
         // The shard will handle the commit from here so we're no longer needed - self-destruct.
         getSelf().tell(PoisonPill.getInstance(), getSelf());
index 239207a..ffb1ab7 100644 (file)
@@ -49,6 +49,8 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,7 +58,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
 
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
@@ -668,8 +669,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             // Check if TxActor is created in the same node
             boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
 
-            transactionContext = new TransactionContextImpl(transactionActor, identifier,
-                actorContext, schemaContext, isTxActorLocal);
+            transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+                actorContext, schemaContext, isTxActorLocal, reply.getVersion());
         }
     }
 
@@ -712,17 +713,21 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
         private final ActorContext actorContext;
         private final SchemaContext schemaContext;
+        private final String transactionPath;
         private final ActorSelection actor;
         private final boolean isTxActorLocal;
+        private final int remoteTransactionVersion;
 
-        private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+        private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
                 ActorContext actorContext, SchemaContext schemaContext,
-                boolean isTxActorLocal) {
+                boolean isTxActorLocal, int remoteTransactionVersion) {
             super(identifier);
+            this.transactionPath = transactionPath;
             this.actor = actor;
             this.actorContext = actorContext;
             this.schemaContext = schemaContext;
             this.isTxActorLocal = isTxActorLocal;
+            this.remoteTransactionVersion = remoteTransactionVersion;
         }
 
         private ActorSelection getActor() {
@@ -783,7 +788,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                     } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
-                        return actorContext.actorSelection(reply.getCohortPath());
+                        String cohortPath = reply.getCohortPath();
+
+                        // In Helium we used to return the local path of the actor which represented
+                        // a remote ThreePhaseCommitCohort. The local path would then be converted to
+                        // a remote path using this resolvePath method. To maintain compatibility with
+                        // a Helium node we need to continue to do this conversion.
+                        // At some point in the future when upgrades from Helium are not supported
+                        // we could remove this code to resolvePath and just use the cohortPath as the
+                        // resolved cohortPath
+                        if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+                            cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
+                        }
+
+                        return actorContext.actorSelection(cohortPath);
 
                     } else {
                         // Throwing an exception here will fail the Future.
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/BackwardsCompatibleThreePhaseCommitCohort.java
new file mode 100644 (file)
index 0000000..30ab97c
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+
+/**
+ * An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit
+ * messages don't contain the transactionId. This actor just forwards a new message containing the
+ * transactionId to the parent Shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor {
+
+    private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
+
+    private final String transactionId;
+
+    private BackwardsCompatibleThreePhaseCommitCohort(String transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CanCommitTransaction");
+
+            getContext().parent().forward(new CanCommitTransaction(transactionId).toSerializable(),
+                    getContext());
+        } else if(message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort PreCommitTransaction");
+
+            // The Shard doesn't need the PreCommitTransaction message so just return the reply here.
+            getSender().tell(new PreCommitTransactionReply().toSerializable(), self());
+        } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CommitTransaction");
+
+            getContext().parent().forward(new CommitTransaction(transactionId).toSerializable(),
+                    getContext());
+
+            // We're done now - we can self-destruct
+            self().tell(PoisonPill.getInstance(), self());
+        } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+            LOG.debug("BackwardsCompatibleThreePhaseCommitCohort AbortTransaction");
+
+            getContext().parent().forward(new AbortTransaction(transactionId).toSerializable(),
+                    getContext());
+            self().tell(PoisonPill.getInstance(), self());
+        }
+    }
+
+    public static Props props(String transactionId) {
+        return Props.create(new BackwardsCompatibleThreePhaseCommitCohortCreator(transactionId));
+    }
+
+    private static class BackwardsCompatibleThreePhaseCommitCohortCreator
+                                  implements Creator<BackwardsCompatibleThreePhaseCommitCohort> {
+        private static final long serialVersionUID = 1L;
+
+        private final String transactionId;
+
+        BackwardsCompatibleThreePhaseCommitCohortCreator(String transactionId) {
+            this.transactionId = transactionId;
+        }
+
+        @Override
+        public BackwardsCompatibleThreePhaseCommitCohort create() throws Exception {
+            return new BackwardsCompatibleThreePhaseCommitCohort(transactionId);
+        }
+    }
+}
index 361d406..bf82e66 100644 (file)
@@ -13,24 +13,33 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti
 
 
 public class CreateTransaction implements SerializableMessage {
-    public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class;
+    public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
+            ShardTransactionMessages.CreateTransaction.class;
+
+    public static final int HELIUM_1_VERSION = 1;
+    public static final int CURRENT_VERSION = HELIUM_1_VERSION;
+
     private final String transactionId;
     private final int transactionType;
     private final String transactionChainId;
+    private final int version;
 
     public CreateTransaction(String transactionId, int transactionType) {
         this(transactionId, transactionType, "");
     }
 
     public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
+        this(transactionId, transactionType, transactionChainId, CURRENT_VERSION);
+    }
 
+    private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
+            int version) {
         this.transactionId = transactionId;
         this.transactionType = transactionType;
         this.transactionChainId = transactionChainId;
-
+        this.version = version;
     }
 
-
     public String getTransactionId() {
         return transactionId;
     }
@@ -39,19 +48,25 @@ public class CreateTransaction implements SerializableMessage {
         return transactionType;
     }
 
+    public int getVersion() {
+        return version;
+    }
+
     @Override
     public Object toSerializable() {
         return ShardTransactionMessages.CreateTransaction.newBuilder()
             .setTransactionId(transactionId)
             .setTransactionType(transactionType)
-            .setTransactionChainId(transactionChainId).build();
+            .setTransactionChainId(transactionChainId)
+            .setMessageVersion(version).build();
     }
 
     public static CreateTransaction fromSerializable(Object message) {
         ShardTransactionMessages.CreateTransaction createTransaction =
             (ShardTransactionMessages.CreateTransaction) message;
         return new CreateTransaction(createTransaction.getTransactionId(),
-            createTransaction.getTransactionType(), createTransaction.getTransactionChainId());
+            createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
+            createTransaction.getMessageVersion());
     }
 
     public String getTransactionChainId() {
index 096d131..14620f1 100644 (file)
@@ -15,13 +15,21 @@ public class CreateTransactionReply implements SerializableMessage {
     public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
     private final String transactionPath;
     private final String transactionId;
+    private final int version;
 
     public CreateTransactionReply(String transactionPath,
         String transactionId) {
+        this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION);
+    }
+
+    public CreateTransactionReply(String transactionPath,
+                                  String transactionId, int version) {
         this.transactionPath = transactionPath;
         this.transactionId = transactionId;
+        this.version = version;
     }
 
+
     public String getTransactionPath() {
         return transactionPath;
     }
@@ -30,16 +38,21 @@ public class CreateTransactionReply implements SerializableMessage {
         return transactionId;
     }
 
+    public int getVersion() {
+        return version;
+    }
+
     public Object toSerializable(){
         return ShardTransactionMessages.CreateTransactionReply.newBuilder()
             .setTransactionActorPath(transactionPath)
             .setTransactionId(transactionId)
+            .setMessageVersion(version)
             .build();
     }
 
     public static CreateTransactionReply fromSerializable(Object serializable){
         ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
-        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId());
+        return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion());
     }
 
 }
index 180108f..38886c9 100644 (file)
@@ -20,14 +20,16 @@ public class ForwardedReadyTransaction {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final Modification modification;
     private final boolean returnSerialized;
+    private final int txnClientVersion;
 
-    public ForwardedReadyTransaction(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
-            Modification modification, boolean returnSerialized) {
+    public ForwardedReadyTransaction(String transactionID, int txnClientVersion,
+            DOMStoreThreePhaseCommitCohort cohort, Modification modification,
+            boolean returnSerialized) {
         this.transactionID = transactionID;
         this.cohort = cohort;
         this.modification = modification;
         this.returnSerialized = returnSerialized;
-
+        this.txnClientVersion = txnClientVersion;
     }
 
     public String getTransactionID() {
@@ -45,4 +47,8 @@ public class ForwardedReadyTransaction {
     public boolean isReturnSerialized() {
         return returnSerialized;
     }
+
+    public int getTxnClientVersion() {
+        return txnClientVersion;
+    }
 }
index eee4891..282e23e 100644 (file)
@@ -17,7 +17,6 @@ public class ReadyTransactionReply implements SerializableMessage {
     private final String cohortPath;
 
     public ReadyTransactionReply(String cohortPath) {
-
         this.cohortPath = cohortPath;
     }
 
@@ -27,8 +26,9 @@ public class ReadyTransactionReply implements SerializableMessage {
 
     @Override
     public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
-        return ShardTransactionMessages.ReadyTransactionReply.newBuilder().
-                setActorPath(cohortPath).build();
+        return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
+                .setActorPath(cohortPath)
+                .build();
     }
 
     public static ReadyTransactionReply fromSerializable(Object serializable) {
index e409168..904dcdf 100644 (file)
@@ -395,4 +395,28 @@ public class ActorContext {
 
         return hostPort1.equals(hostPort2);
     }
+
+    /**
+     * @deprecated This method is present only to support backward compatibility with Helium and should not be
+     * used any further
+     *
+     *
+     * @param primaryPath
+     * @param localPathOfRemoteActor
+     * @return
+    */
+    @Deprecated
+    public String resolvePath(final String primaryPath,
+                                            final String localPathOfRemoteActor) {
+        StringBuilder builder = new StringBuilder();
+        String[] primaryPathElements = primaryPath.split("/");
+        builder.append(primaryPathElements[0]).append("//")
+            .append(primaryPathElements[1]).append(primaryPathElements[2]);
+        String[] remotePathElements = localPathOfRemoteActor.split("/");
+        for (int i = 3; i < remotePathElements.length; i++) {
+                builder.append("/").append(remotePathElements[i]);
+            }
+
+        return builder.toString();
+    }
 }
index 101a737..d5a12c7 100644 (file)
@@ -4,6 +4,7 @@ import akka.actor.ActorRef;
 import akka.actor.DeadLetter;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
@@ -70,19 +71,25 @@ public class DataChangeListenerTest extends AbstractActorTest {
             final Props props = DataChangeListener.props(mockListener);
             final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
 
-            // Let the DataChangeListener know that notifications should be enabled
-            subject.tell(new EnableNotification(true), ActorRef.noSender());
+            getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
 
             subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
                     ActorRef.noSender());
 
-            getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
-            new Within(duration("1 seconds")) {
-                @Override
-                protected void run() {
-                    expectNoMsg();
+            // Make sure no DataChangedReply is sent to DeadLetters.
+            while(true) {
+                DeadLetter deadLetter;
+                try {
+                    deadLetter = expectMsgClass(duration("1 seconds"), DeadLetter.class);
+                } catch (AssertionError e) {
+                    // Timed out - got no DeadLetter - this is good
+                    break;
                 }
-            };
+
+                // We may get DeadLetters for other messages we don't care about.
+                Assert.assertFalse("Unexpected DataChangedReply",
+                        deadLetter.message() instanceof DataChangedReply);
+            }
         }};
     }
 }
index cd8a658..c6bb0d6 100644 (file)
@@ -101,6 +101,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
 
 
 public class ShardTest extends AbstractActorTest {
@@ -611,7 +612,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
@@ -625,10 +627,12 @@ public class ShardTest extends AbstractActorTest {
 
             // Send the ForwardedReadyTransaction for the next 2 Tx's.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
@@ -792,10 +796,12 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -859,7 +865,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message.
@@ -902,7 +909,8 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message.
@@ -954,7 +962,8 @@ public class ShardTest extends AbstractActorTest {
                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
                     modification, preCommit);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+                    cohort, modification, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
@@ -1018,10 +1027,12 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // canCommit 1st Tx. We don't send the commit so it should timeout.
@@ -1080,13 +1091,16 @@ public class ShardTest extends AbstractActorTest {
 
             // Ready the Tx's
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // canCommit 1st Tx.
@@ -1149,10 +1163,12 @@ public class ShardTest extends AbstractActorTest {
             // Simulate the ForwardedReadyTransaction messages that would be sent
             // by the ShardTransaction.
 
-            shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
-            shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true), getRef());
             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
 
             // Send the CanCommitTransaction message for the first Tx.
@@ -1345,7 +1361,7 @@ public class ShardTest extends AbstractActorTest {
         };
     }
 
-    private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
+    static NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
             throws ExecutionException, InterruptedException {
         DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
 
index 6375e3c..45b00f5 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.pattern.AskTimeoutException;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -32,8 +29,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.pattern.AskTimeoutException;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * Covers negative test cases
@@ -75,7 +76,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -104,7 +106,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -133,7 +136,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -162,7 +166,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -194,7 +199,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -231,7 +237,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -263,7 +270,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java
new file mode 100644 (file)
index 0000000..af07aee
--- /dev/null
@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collections;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+
+/**
+ * Tests backwards compatibility support from Helium-1 to Helium.
+ *
+ * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the
+ * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction,
+ * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages
+ * would be sans transactionId so this test verifies the Shard handles that properly.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
+
+    @Test
+    public void testTransactionCommit() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            SchemaContext schemaContext = TestModel.createTestContext();
+            Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
+                    shardName("inventory").type("config").build(),
+                    Collections.<ShardIdentifier,String>emptyMap(),
+                    DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
+                    schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
+                    "testTransactionCommit");
+
+            waitUntilLeader(shard);
+
+            // Send CreateTransaction message with no messages version
+
+            String transactionID = "txn-1";
+            shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
+                    .setTransactionId(transactionID)
+                    .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+                    .setTransactionChainId("").build(), getRef());
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
+
+            ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
+
+            // Write data to the Tx
+
+            txActor.tell(new WriteData(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+
+            expectMsgClass(duration, WriteDataReply.class);
+
+            // Ready the Tx
+
+            txActor.tell(new ReadyTransaction().toSerializable(), getRef());
+
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
+                    duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+
+            ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
+
+            // Send the CanCommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the PreCommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the CommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            NormalizedNode<?, ?> node = ShardTest.readStore(shard, TestModel.TEST_PATH);
+            Assert.assertNotNull("Data not found in store", node);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testTransactionAbort() throws Exception {
+        new ShardTestKit(getSystem()) {{
+            SchemaContext schemaContext = TestModel.createTestContext();
+            Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
+                    shardName("inventory").type("config").build(),
+                    Collections.<ShardIdentifier,String>emptyMap(),
+                    DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
+                    schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
+
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
+                    "testTransactionAbort");
+
+            waitUntilLeader(shard);
+
+            // Send CreateTransaction message with no messages version
+
+            String transactionID = "txn-1";
+            shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
+                    .setTransactionId(transactionID)
+                    .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+                    .setTransactionChainId("").build(), getRef());
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
+
+            ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
+
+            // Write data to the Tx
+
+            txActor.tell(new WriteData(TestModel.TEST_PATH,
+                    ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+
+            expectMsgClass(duration, WriteDataReply.class);
+
+            // Ready the Tx
+
+            txActor.tell(new ReadyTransaction().toSerializable(), getRef());
+
+            ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
+                    duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+
+            ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
+
+            // Send the CanCommitTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Send the AbortTransaction message with no transactionId.
+
+            cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(),
+                    getRef());
+
+            expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+}
index 793df8e..c869be8 100644 (file)
@@ -1,12 +1,11 @@
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
@@ -15,6 +14,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -39,12 +39,13 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 public class ShardTransactionTest extends AbstractActorTest {
     private static ListeningExecutorService storeExecutor =
@@ -78,12 +79,14 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
         }
@@ -114,13 +117,15 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRO"));
 
             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
                     props, "testReadDataWhenDataNotFoundRW"));
@@ -150,12 +155,14 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
         }
@@ -183,12 +190,14 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
 
             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
 
             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
         }
@@ -228,7 +237,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
 
             transaction.tell(new WriteData(TestModel.TEST_PATH,
@@ -254,7 +264,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
 
             transaction.tell(new MergeData(TestModel.TEST_PATH,
@@ -279,7 +290,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
 
             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
@@ -301,7 +313,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
 
             watch(transaction);
@@ -318,7 +331,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
 
             watch(transaction);
@@ -339,7 +353,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
 
             watch(transaction);
@@ -355,7 +370,8 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
         final ActorRef shard = createShard();
         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                testSchemaContext, datastoreContext, shardStats, "txn");
+                testSchemaContext, datastoreContext, shardStats, "txn",
+                CreateTransaction.CURRENT_VERSION);
         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
 
         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
@@ -370,7 +386,8 @@ public class ShardTransactionTest extends AbstractActorTest {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = createShard();
             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                    testSchemaContext, datastoreContext, shardStats, "txn");
+                    testSchemaContext, datastoreContext, shardStats, "txn",
+                    CreateTransaction.CURRENT_VERSION);
             final ActorRef transaction =
                 getSystem().actorOf(props, "testShardTransactionInactivity");
 
index 35f346f..b77b0b6 100644 (file)
@@ -338,13 +338,15 @@ public class TransactionProxyTest {
         return getSystem().actorSelection(actorRef.path());
     }
 
-    private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+    private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
         return CreateTransactionReply.newBuilder()
             .setTransactionActorPath(actorRef.path().toString())
-            .setTransactionId("txn-1").build();
+            .setTransactionId("txn-1")
+            .setMessageVersion(transactionVersion)
+            .build();
     }
 
-    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
         ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
         doReturn(actorSystem.actorSelection(actorRef.path())).
                 when(mockActorContext).actorSelection(actorRef.path().toString());
@@ -352,7 +354,7 @@ public class TransactionProxyTest {
         doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
                 when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
 
-        doReturn(Futures.successful(createTransactionReply(actorRef))).when(mockActorContext).
+        doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
                 executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
                         eqCreateTransaction(memberName, type));
 
@@ -361,6 +363,11 @@ public class TransactionProxyTest {
         return actorRef;
     }
 
+    private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+        return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+    }
+
+
     private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
             throws Throwable {
 
@@ -835,6 +842,47 @@ public class TransactionProxyTest {
         verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testReadyForwardCompatibility() throws Exception {
+        ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+
+        NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+        doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedReadData());
+
+        doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+        doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+                eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+        doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+
+        TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+                READ_WRITE);
+
+        transactionProxy.read(TestModel.TEST_PATH);
+
+        transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+                WriteDataReply.SERIALIZABLE_CLASS);
+
+        verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+        verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+                eq(actorRef.path().toString()));
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testReadyWithRecordingOperationFailure() throws Exception {
index 60f9a2d..39d337e 100644 (file)
@@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.japi.Creator;
@@ -17,7 +18,9 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+
 import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -182,4 +185,51 @@ public class ActorContextTest extends AbstractActorTest{
         clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/");
         assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
     }
+
+    @Test
+    public void testResolvePathForRemoteActor() {
+        ActorContext actorContext =
+                new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+                        ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
+                "akka://system/user/shardmanager/shard/transaction");
+
+        String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testResolvePathForLocalActor() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka://system/user/shardmanager/shard",
+                "akka://system/user/shardmanager/shard/transaction");
+
+        String expected = "akka://system/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testResolvePathForRemoteActorWithProperRemoteAddress() {
+        ActorContext actorContext =
+                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+                        mock(Configuration.class));
+
+        String actual = actorContext.resolvePath(
+                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
+                "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
+
+        String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
+
+        assertEquals(expected, actual);
+    }
+
 }
index 1f10b39..b0d9bf4 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.controller.networkconfig.neutron;
 
 import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.Inet6Address;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -255,16 +257,45 @@ public class NeutronSubnet extends ConfigurationObject implements Serializable,
      * a new subnet)
      */
     public boolean isValidCIDR() {
-        try {
-            SubnetUtils util = new SubnetUtils(cidr);
-            SubnetInfo info = util.getInfo();
-            if (!info.getNetworkAddress().equals(info.getAddress())) {
+        // fix for Bug 2290 - need to wrap the existing test as
+        // IPv4 because SubnetUtils doesn't support IPv6
+        if (ipVersion == 4) {
+            try {
+                SubnetUtils util = new SubnetUtils(cidr);
+                SubnetInfo info = util.getInfo();
+                if (!info.getNetworkAddress().equals(info.getAddress())) {
+                    return false;
+                }
+            } catch (Exception e) {
                 return false;
             }
-        } catch (Exception e) {
-            return false;
+            return true;
         }
-        return true;
+        if (ipVersion == 6) {
+            // fix for Bug2290 - this is custom code because no classes
+            // with ODL-friendly licenses have been found
+            // extract address (in front of /) and length (after /)
+            String[] parts = cidr.split("/");
+            if (parts.length != 2) {
+                return false;
+            }
+            try {
+                int length = Integer.parseInt(parts[1]);
+                //TODO?: limit check on length
+                // convert to byte array
+                byte[] addrBytes = ((Inet6Address) InetAddress.getByName(parts[0])).getAddress();
+                int i;
+                for (i=length; i<128; i++) { // offset is to ensure proper comparison
+                    if (((((int) addrBytes[i/8]) & 0x000000FF) & (1 << (7-(i%8)))) != 0) {
+                        return(false);
+                    }
+                }
+                return(true);
+            } catch (Exception e) {
+                return(false);
+            }
+        }
+        return false;
     }
 
     /* test to see if the gateway IP specified overlaps with specified