From: Devin Avery Date: Tue, 4 Nov 2014 20:07:31 +0000 (+0000) Subject: Merge "Fix for Bug 2290." X-Git-Tag: release/lithium~876^2~2^2~6 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f5a373c5378af41f62a2c36ced4046fbdb77e00b;hp=2b99f613b5f2a63bd0579ea6fd47a0db4f5a457e Merge "Fix for Bug 2290." --- diff --git a/opendaylight/md-sal/benchmark-data-store/pom.xml b/opendaylight/md-sal/benchmark-data-store/pom.xml index ac384319b8..7d47add017 100644 --- a/opendaylight/md-sal/benchmark-data-store/pom.xml +++ b/opendaylight/md-sal/benchmark-data-store/pom.xml @@ -40,6 +40,14 @@ and is available at http://www.eclipse.org/legal/epl-v10.html org.opendaylight.controller sal-inmemory-datastore + + org.opendaylight.controller + sal-broker-impl + + + org.slf4j + slf4j-simple + @@ -69,4 +77,4 @@ and is available at http://www.eclipse.org/legal/epl-v10.html - \ No newline at end of file + diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryBrokerWriteTransactionBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryBrokerWriteTransactionBenchmark.java new file mode 100644 index 0000000000..fdd715e54f --- /dev/null +++ b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryBrokerWriteTransactionBenchmark.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.store.benchmark; + +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Warmup; + +/** + * @author Lukas Sedlak + */ +public abstract class AbstractInMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryWriteTransactionBenchmark { + + protected DOMDataBrokerImpl domBroker; + + protected void initTestNode() throws Exception { + final YangInstanceIdentifier testPath = YangInstanceIdentifier.builder(BenchmarkModel.TEST_PATH) + .build(); + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + writeTx.put(LogicalDatastoreType.OPERATIONAL, testPath, provideOuterListNode()); + + writeTx.submit().get(); + } + + @Benchmark + @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + public void write100KSingleNodeWithOneInnerItemInOneCommitBenchmark() throws Exception { + + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + for (int outerListKey = 0; outerListKey < OUTER_LIST_100K; ++outerListKey) { + writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_100K_PATHS[outerListKey], OUTER_LIST_ONE_ITEM_INNER_LIST[outerListKey]); + } + + writeTx.submit().get(); + } + + @Benchmark + @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + public void write100KSingleNodeWithOneInnerItemInCommitPerWriteBenchmark() throws Exception { + for (int outerListKey = 0; outerListKey < OUTER_LIST_100K; ++outerListKey) { + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_100K_PATHS[outerListKey], OUTER_LIST_ONE_ITEM_INNER_LIST[outerListKey]); + + writeTx.submit().get(); + } + } + + @Benchmark + @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + public void write50KSingleNodeWithTwoInnerItemsInOneCommitBenchmark() throws Exception { + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + for (int outerListKey = 0; outerListKey < OUTER_LIST_50K; ++outerListKey) { + writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_50K_PATHS[outerListKey], OUTER_LIST_TWO_ITEM_INNER_LIST[outerListKey]); + } + + writeTx.submit().get(); + } + + @Benchmark + @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + public void write50KSingleNodeWithTwoInnerItemsInCommitPerWriteBenchmark() throws Exception { + for (int outerListKey = 0; outerListKey < OUTER_LIST_50K; ++outerListKey) { + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_50K_PATHS[outerListKey], OUTER_LIST_TWO_ITEM_INNER_LIST[outerListKey]); + writeTx.submit().get(); + } + } + + @Benchmark + @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + public void write10KSingleNodeWithTenInnerItemsInOneCommitBenchmark() throws Exception { + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + for (int outerListKey = 0; outerListKey < OUTER_LIST_10K; ++outerListKey) { + writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_10K_PATHS[outerListKey], OUTER_LIST_TEN_ITEM_INNER_LIST[outerListKey]); + } + writeTx.submit().get(); + } + + @Benchmark + @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) + public void write10KSingleNodeWithTenInnerItemsInCommitPerWriteBenchmark() throws Exception { + for (int outerListKey = 0; outerListKey < OUTER_LIST_10K; ++outerListKey) { + DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction(); + writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_10K_PATHS[outerListKey], OUTER_LIST_TEN_ITEM_INNER_LIST[outerListKey]); + writeTx.submit().get(); + } + } +} diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryDatastoreWriteTransactionBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryDatastoreWriteTransactionBenchmark.java index aa5ef61ce4..fce0642860 100644 --- a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryDatastoreWriteTransactionBenchmark.java +++ b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryDatastoreWriteTransactionBenchmark.java @@ -12,14 +12,6 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; -import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; -import org.opendaylight.yangtools.yang.data.api.schema.MapNode; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; -import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; -import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Warmup; @@ -27,67 +19,10 @@ import org.openjdk.jmh.annotations.Warmup; /** * @author Lukas Sedlak */ -public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark { +public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark extends AbstractInMemoryWriteTransactionBenchmark { - private static final int WARMUP_ITERATIONS = 20; - private static final int MEASUREMENT_ITERATIONS = 20; - - private static final int OUTER_LIST_100K = 100000; - private static final int OUTER_LIST_50K = 50000; - private static final int OUTER_LIST_10K = 10000; - - private static final YangInstanceIdentifier[] OUTER_LIST_100K_PATHS = initOuterListPaths(OUTER_LIST_100K); - private static final YangInstanceIdentifier[] OUTER_LIST_50K_PATHS = initOuterListPaths(OUTER_LIST_50K); - private static final YangInstanceIdentifier[] OUTER_LIST_10K_PATHS = initOuterListPaths(OUTER_LIST_10K); - - private static YangInstanceIdentifier[] initOuterListPaths(final int outerListPathsCount) { - final YangInstanceIdentifier[] paths = new YangInstanceIdentifier[outerListPathsCount]; - - for (int outerListKey = 0; outerListKey < outerListPathsCount; ++outerListKey) { - paths[outerListKey] = YangInstanceIdentifier.builder(BenchmarkModel.OUTER_LIST_PATH) - .nodeWithKey(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey) - .build(); - } - return paths; - } - - private static final MapNode ONE_ITEM_INNER_LIST = initInnerListItems(1); - private static final MapNode TWO_ITEM_INNER_LIST = initInnerListItems(2); - private static final MapNode TEN_ITEM_INNER_LIST = initInnerListItems(10); - - private static MapNode initInnerListItems(final int count) { - final CollectionNodeBuilder mapEntryBuilder = ImmutableNodes - .mapNodeBuilder(BenchmarkModel.INNER_LIST_QNAME); - - for (int i = 1; i <= count; ++i) { - mapEntryBuilder - .withChild(ImmutableNodes.mapEntry(BenchmarkModel.INNER_LIST_QNAME, BenchmarkModel.NAME_QNAME, i)); - } - return mapEntryBuilder.build(); - } - - private static final NormalizedNode[] OUTER_LIST_ONE_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_100K, ONE_ITEM_INNER_LIST); - private static final NormalizedNode[] OUTER_LIST_TWO_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_50K, TWO_ITEM_INNER_LIST); - private static final NormalizedNode[] OUTER_LIST_TEN_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_10K, TEN_ITEM_INNER_LIST); - - private static NormalizedNode[] initOuterListItems(int outerListItemsCount, MapNode innerList) { - final NormalizedNode[] outerListItems = new NormalizedNode[outerListItemsCount]; - - for (int i = 0; i < outerListItemsCount; ++i) { - int outerListKey = i; - outerListItems[i] = ImmutableNodes.mapEntryBuilder(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey) - .withChild(innerList).build(); - } - return outerListItems; - } - - protected SchemaContext schemaContext; protected InMemoryDOMDataStore domStore; - abstract public void setUp() throws Exception; - - abstract public void tearDown(); - protected void initTestNode() throws Exception { final YangInstanceIdentifier testPath = YangInstanceIdentifier.builder(BenchmarkModel.TEST_PATH) .build(); @@ -100,15 +35,6 @@ public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark { cohort.commit().get(); } - private DataContainerChild provideOuterListNode() { - return ImmutableContainerNodeBuilder - .create() - .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BenchmarkModel.TEST_QNAME)) - .withChild( - ImmutableNodes.mapNodeBuilder(BenchmarkModel.OUTER_LIST_QNAME) - .build()).build(); - } - @Benchmark @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS) diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryWriteTransactionBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryWriteTransactionBenchmark.java new file mode 100644 index 0000000000..6c256eb7af --- /dev/null +++ b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/AbstractInMemoryWriteTransactionBenchmark.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.store.benchmark; + +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public abstract class AbstractInMemoryWriteTransactionBenchmark { + protected static final int OUTER_LIST_100K = 100000; + protected static final int OUTER_LIST_50K = 50000; + protected static final int OUTER_LIST_10K = 10000; + + protected static final YangInstanceIdentifier[] OUTER_LIST_100K_PATHS = initOuterListPaths(OUTER_LIST_100K); + protected static final YangInstanceIdentifier[] OUTER_LIST_50K_PATHS = initOuterListPaths(OUTER_LIST_50K); + protected static final YangInstanceIdentifier[] OUTER_LIST_10K_PATHS = initOuterListPaths(OUTER_LIST_10K); + + private static YangInstanceIdentifier[] initOuterListPaths(final int outerListPathsCount) { + final YangInstanceIdentifier[] paths = new YangInstanceIdentifier[outerListPathsCount]; + + for (int outerListKey = 0; outerListKey < outerListPathsCount; ++outerListKey) { + paths[outerListKey] = YangInstanceIdentifier.builder(BenchmarkModel.OUTER_LIST_PATH) + .nodeWithKey(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey) + .build(); + } + return paths; + } + + protected static final int WARMUP_ITERATIONS = 20; + protected static final int MEASUREMENT_ITERATIONS = 20; + + protected static final MapNode ONE_ITEM_INNER_LIST = initInnerListItems(1); + protected static final MapNode TWO_ITEM_INNER_LIST = initInnerListItems(2); + protected static final MapNode TEN_ITEM_INNER_LIST = initInnerListItems(10); + + private static MapNode initInnerListItems(final int count) { + final CollectionNodeBuilder mapEntryBuilder = ImmutableNodes + .mapNodeBuilder(BenchmarkModel.INNER_LIST_QNAME); + + for (int i = 1; i <= count; ++i) { + mapEntryBuilder + .withChild(ImmutableNodes.mapEntry(BenchmarkModel.INNER_LIST_QNAME, BenchmarkModel.NAME_QNAME, i)); + } + return mapEntryBuilder.build(); + } + + protected static final NormalizedNode[] OUTER_LIST_ONE_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_100K, ONE_ITEM_INNER_LIST); + protected static final NormalizedNode[] OUTER_LIST_TWO_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_50K, TWO_ITEM_INNER_LIST); + protected static final NormalizedNode[] OUTER_LIST_TEN_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_10K, TEN_ITEM_INNER_LIST); + + private static NormalizedNode[] initOuterListItems(final int outerListItemsCount, final MapNode innerList) { + final NormalizedNode[] outerListItems = new NormalizedNode[outerListItemsCount]; + + for (int i = 0; i < outerListItemsCount; ++i) { + int outerListKey = i; + outerListItems[i] = ImmutableNodes.mapEntryBuilder(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey) + .withChild(innerList).build(); + } + return outerListItems; + } + + protected SchemaContext schemaContext; + abstract public void setUp() throws Exception; + abstract public void tearDown(); + + protected static DataContainerChild provideOuterListNode() { + return ImmutableContainerNodeBuilder + .create() + .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BenchmarkModel.TEST_QNAME)) + .withChild( + ImmutableNodes.mapNodeBuilder(BenchmarkModel.OUTER_LIST_QNAME) + .build()).build(); + } +} diff --git a/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryBrokerWriteTransactionBenchmark.java b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryBrokerWriteTransactionBenchmark.java new file mode 100644 index 0000000000..a46a6d1e79 --- /dev/null +++ b/opendaylight/md-sal/benchmark-data-store/src/main/java/org/opendaylight/controller/md/sal/dom/store/benchmark/InMemoryBrokerWriteTransactionBenchmark.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.dom.store.benchmark; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.sal.core.spi.data.DOMStore; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; + +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(1) +public class InMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryBrokerWriteTransactionBenchmark { + private ListeningExecutorService executor; + + @Setup(Level.Trial) + @Override + public void setUp() throws Exception { + ListeningExecutorService dsExec = MoreExecutors.sameThreadExecutor(); + executor = MoreExecutors.listeningDecorator( + MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)Executors.newFixedThreadPool(1), 1L, TimeUnit.SECONDS)); + + InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", dsExec, + MoreExecutors.sameThreadExecutor()); + InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", dsExec, + MoreExecutors.sameThreadExecutor()); + Map datastores = ImmutableMap.of( + LogicalDatastoreType.OPERATIONAL, (DOMStore)operStore, + LogicalDatastoreType.CONFIGURATION, configStore); + + domBroker = new DOMDataBrokerImpl(datastores, executor); + schemaContext = BenchmarkModel.createTestContext(); + configStore.onGlobalContextUpdated(schemaContext); + operStore.onGlobalContextUpdated(schemaContext); + initTestNode(); + } + + @Override + public void tearDown() { + domBroker.close(); + executor.shutdown(); + } +} diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java index 69af77c749..fd9912244a 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java @@ -358,6 +358,11 @@ public class RaftActorTest extends AbstractActorTest { TestActorRef mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId, Collections.EMPTY_MAP, Optional.of(config)), persistenceId); + MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + + // Wait for akka's recovery to complete so it doesn't interfere. + mockRaftActor.waitForRecoveryComplete(); + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), @@ -367,8 +372,6 @@ public class RaftActorTest extends AbstractActorTest { Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(), Lists.newArrayList(), 3, 1 ,3, 1); - MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); - mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot)); CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch(); @@ -402,8 +405,6 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class)); - mockRaftActor.waitForRecoveryComplete(); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); }}; @@ -430,6 +431,9 @@ public class RaftActorTest extends AbstractActorTest { MockRaftActor mockRaftActor = mockActorRef.underlyingActor(); + // Wait for akka's recovery to complete so it doesn't interfere. + mockRaftActor.waitForRecoveryComplete(); + ByteString snapshotBytes = fromObject(Arrays.asList( new MockRaftActorContext.MockPayload("A"), new MockRaftActorContext.MockPayload("B"), @@ -470,8 +474,6 @@ public class RaftActorTest extends AbstractActorTest { mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class)); - mockRaftActor.waitForRecoveryComplete(); - mockActorRef.tell(PoisonPill.getInstance(), getRef()); }}; } diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java index e43b44582d..2a79a5b827 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/cohort3pc/ThreePhaseCommitCohortMessages.java @@ -11,17 +11,17 @@ public final class ThreePhaseCommitCohortMessages { public interface CanCommitTransactionOrBuilder extends com.google.protobuf.MessageOrBuilder { - // required string transactionId = 1; + // optional string transactionId = 1; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ boolean hasTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ java.lang.String getTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ com.google.protobuf.ByteString getTransactionIdBytes(); @@ -122,17 +122,17 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; public static final int TRANSACTIONID_FIELD_NUMBER = 1; private java.lang.Object transactionId_; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -149,7 +149,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -173,10 +173,6 @@ public final class ThreePhaseCommitCohortMessages { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; - if (!hasTransactionId()) { - memoizedIsInitialized = 0; - return false; - } memoizedIsInitialized = 1; return true; } @@ -376,10 +372,6 @@ public final class ThreePhaseCommitCohortMessages { } public final boolean isInitialized() { - if (!hasTransactionId()) { - - return false; - } return true; } @@ -402,16 +394,16 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; private java.lang.Object transactionId_ = ""; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -425,7 +417,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -441,7 +433,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionId( java.lang.String value) { @@ -454,7 +446,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder clearTransactionId() { bitField0_ = (bitField0_ & ~0x00000001); @@ -463,7 +455,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionIdBytes( com.google.protobuf.ByteString value) { @@ -894,17 +886,17 @@ public final class ThreePhaseCommitCohortMessages { public interface AbortTransactionOrBuilder extends com.google.protobuf.MessageOrBuilder { - // required string transactionId = 1; + // optional string transactionId = 1; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ boolean hasTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ java.lang.String getTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ com.google.protobuf.ByteString getTransactionIdBytes(); @@ -1005,17 +997,17 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; public static final int TRANSACTIONID_FIELD_NUMBER = 1; private java.lang.Object transactionId_; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -1032,7 +1024,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -1056,10 +1048,6 @@ public final class ThreePhaseCommitCohortMessages { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; - if (!hasTransactionId()) { - memoizedIsInitialized = 0; - return false; - } memoizedIsInitialized = 1; return true; } @@ -1259,10 +1247,6 @@ public final class ThreePhaseCommitCohortMessages { } public final boolean isInitialized() { - if (!hasTransactionId()) { - - return false; - } return true; } @@ -1285,16 +1269,16 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; private java.lang.Object transactionId_ = ""; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -1308,7 +1292,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -1324,7 +1308,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionId( java.lang.String value) { @@ -1337,7 +1321,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder clearTransactionId() { bitField0_ = (bitField0_ & ~0x00000001); @@ -1346,7 +1330,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionIdBytes( com.google.protobuf.ByteString value) { @@ -1682,17 +1666,17 @@ public final class ThreePhaseCommitCohortMessages { public interface CommitTransactionOrBuilder extends com.google.protobuf.MessageOrBuilder { - // required string transactionId = 1; + // optional string transactionId = 1; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ boolean hasTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ java.lang.String getTransactionId(); /** - * required string transactionId = 1; + * optional string transactionId = 1; */ com.google.protobuf.ByteString getTransactionIdBytes(); @@ -1793,17 +1777,17 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; public static final int TRANSACTIONID_FIELD_NUMBER = 1; private java.lang.Object transactionId_; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -1820,7 +1804,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -1844,10 +1828,6 @@ public final class ThreePhaseCommitCohortMessages { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; - if (!hasTransactionId()) { - memoizedIsInitialized = 0; - return false; - } memoizedIsInitialized = 1; return true; } @@ -2047,10 +2027,6 @@ public final class ThreePhaseCommitCohortMessages { } public final boolean isInitialized() { - if (!hasTransactionId()) { - - return false; - } return true; } @@ -2073,16 +2049,16 @@ public final class ThreePhaseCommitCohortMessages { } private int bitField0_; - // required string transactionId = 1; + // optional string transactionId = 1; private java.lang.Object transactionId_ = ""; /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public boolean hasTransactionId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public java.lang.String getTransactionId() { java.lang.Object ref = transactionId_; @@ -2096,7 +2072,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public com.google.protobuf.ByteString getTransactionIdBytes() { @@ -2112,7 +2088,7 @@ public final class ThreePhaseCommitCohortMessages { } } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionId( java.lang.String value) { @@ -2125,7 +2101,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder clearTransactionId() { bitField0_ = (bitField0_ & ~0x00000001); @@ -2134,7 +2110,7 @@ public final class ThreePhaseCommitCohortMessages { return this; } /** - * required string transactionId = 1; + * optional string transactionId = 1; */ public Builder setTransactionIdBytes( com.google.protobuf.ByteString value) { @@ -3136,11 +3112,11 @@ public final class ThreePhaseCommitCohortMessages { java.lang.String[] descriptorData = { "\n\014Cohort.proto\022!org.opendaylight.control" + "ler.mdsal\"-\n\024CanCommitTransaction\022\025\n\rtra" + - "nsactionId\030\001 \002(\t\".\n\031CanCommitTransaction" + + "nsactionId\030\001 \001(\t\".\n\031CanCommitTransaction" + "Reply\022\021\n\tcanCommit\030\001 \002(\010\")\n\020AbortTransac" + - "tion\022\025\n\rtransactionId\030\001 \002(\t\"\027\n\025AbortTran" + + "tion\022\025\n\rtransactionId\030\001 \001(\t\"\027\n\025AbortTran" + "sactionReply\"*\n\021CommitTransaction\022\025\n\rtra" + - "nsactionId\030\001 \002(\t\"\030\n\026CommitTransactionRep" + + "nsactionId\030\001 \001(\t\"\030\n\026CommitTransactionRep" + "ly\"\026\n\024PreCommitTransaction\"\033\n\031PreCommitT" + "ransactionReplyBZ\n8org.opendaylight.cont" + "roller.protobuff.messages.cohort3pcB\036Thr", diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java index 96a39bddd3..3a1cfaa443 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/java/org/opendaylight/controller/protobuff/messages/transaction/ShardTransactionMessages.java @@ -668,6 +668,16 @@ public final class ShardTransactionMessages { */ com.google.protobuf.ByteString getTransactionChainIdBytes(); + + // optional int32 messageVersion = 4; + /** + * optional int32 messageVersion = 4; + */ + boolean hasMessageVersion(); + /** + * optional int32 messageVersion = 4; + */ + int getMessageVersion(); } /** * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransaction} @@ -735,6 +745,11 @@ public final class ShardTransactionMessages { transactionChainId_ = input.readBytes(); break; } + case 32: { + bitField0_ |= 0x00000008; + messageVersion_ = input.readInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -877,10 +892,27 @@ public final class ShardTransactionMessages { } } + // optional int32 messageVersion = 4; + public static final int MESSAGEVERSION_FIELD_NUMBER = 4; + private int messageVersion_; + /** + * optional int32 messageVersion = 4; + */ + public boolean hasMessageVersion() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional int32 messageVersion = 4; + */ + public int getMessageVersion() { + return messageVersion_; + } + private void initFields() { transactionId_ = ""; transactionType_ = 0; transactionChainId_ = ""; + messageVersion_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -911,6 +943,9 @@ public final class ShardTransactionMessages { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(3, getTransactionChainIdBytes()); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeInt32(4, messageVersion_); + } getUnknownFields().writeTo(output); } @@ -932,6 +967,10 @@ public final class ShardTransactionMessages { size += com.google.protobuf.CodedOutputStream .computeBytesSize(3, getTransactionChainIdBytes()); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(4, messageVersion_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1054,6 +1093,8 @@ public final class ShardTransactionMessages { bitField0_ = (bitField0_ & ~0x00000002); transactionChainId_ = ""; bitField0_ = (bitField0_ & ~0x00000004); + messageVersion_ = 0; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -1094,6 +1135,10 @@ public final class ShardTransactionMessages { to_bitField0_ |= 0x00000004; } result.transactionChainId_ = transactionChainId_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.messageVersion_ = messageVersion_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1123,6 +1168,9 @@ public final class ShardTransactionMessages { transactionChainId_ = other.transactionChainId_; onChanged(); } + if (other.hasMessageVersion()) { + setMessageVersion(other.getMessageVersion()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1339,6 +1387,39 @@ public final class ShardTransactionMessages { return this; } + // optional int32 messageVersion = 4; + private int messageVersion_ ; + /** + * optional int32 messageVersion = 4; + */ + public boolean hasMessageVersion() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional int32 messageVersion = 4; + */ + public int getMessageVersion() { + return messageVersion_; + } + /** + * optional int32 messageVersion = 4; + */ + public Builder setMessageVersion(int value) { + bitField0_ |= 0x00000008; + messageVersion_ = value; + onChanged(); + return this; + } + /** + * optional int32 messageVersion = 4; + */ + public Builder clearMessageVersion() { + bitField0_ = (bitField0_ & ~0x00000008); + messageVersion_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransaction) } @@ -1382,6 +1463,16 @@ public final class ShardTransactionMessages { */ com.google.protobuf.ByteString getTransactionIdBytes(); + + // optional int32 messageVersion = 3; + /** + * optional int32 messageVersion = 3; + */ + boolean hasMessageVersion(); + /** + * optional int32 messageVersion = 3; + */ + int getMessageVersion(); } /** * Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionReply} @@ -1444,6 +1535,11 @@ public final class ShardTransactionMessages { transactionId_ = input.readBytes(); break; } + case 24: { + bitField0_ |= 0x00000004; + messageVersion_ = input.readInt32(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -1570,9 +1666,26 @@ public final class ShardTransactionMessages { } } + // optional int32 messageVersion = 3; + public static final int MESSAGEVERSION_FIELD_NUMBER = 3; + private int messageVersion_; + /** + * optional int32 messageVersion = 3; + */ + public boolean hasMessageVersion() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int32 messageVersion = 3; + */ + public int getMessageVersion() { + return messageVersion_; + } + private void initFields() { transactionActorPath_ = ""; transactionId_ = ""; + messageVersion_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -1600,6 +1713,9 @@ public final class ShardTransactionMessages { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getTransactionIdBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt32(3, messageVersion_); + } getUnknownFields().writeTo(output); } @@ -1617,6 +1733,10 @@ public final class ShardTransactionMessages { size += com.google.protobuf.CodedOutputStream .computeBytesSize(2, getTransactionIdBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(3, messageVersion_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -1737,6 +1857,8 @@ public final class ShardTransactionMessages { bitField0_ = (bitField0_ & ~0x00000001); transactionId_ = ""; bitField0_ = (bitField0_ & ~0x00000002); + messageVersion_ = 0; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -1773,6 +1895,10 @@ public final class ShardTransactionMessages { to_bitField0_ |= 0x00000002; } result.transactionId_ = transactionId_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.messageVersion_ = messageVersion_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1799,6 +1925,9 @@ public final class ShardTransactionMessages { transactionId_ = other.transactionId_; onChanged(); } + if (other.hasMessageVersion()) { + setMessageVersion(other.getMessageVersion()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1982,6 +2111,39 @@ public final class ShardTransactionMessages { return this; } + // optional int32 messageVersion = 3; + private int messageVersion_ ; + /** + * optional int32 messageVersion = 3; + */ + public boolean hasMessageVersion() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional int32 messageVersion = 3; + */ + public int getMessageVersion() { + return messageVersion_; + } + /** + * optional int32 messageVersion = 3; + */ + public Builder setMessageVersion(int value) { + bitField0_ |= 0x00000004; + messageVersion_ = value; + onChanged(); + return this; + } + /** + * optional int32 messageVersion = 3; + */ + public Builder clearMessageVersion() { + bitField0_ = (bitField0_ & ~0x00000004); + messageVersion_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransactionReply) } @@ -7753,37 +7915,38 @@ public final class ShardTransactionMessages { java.lang.String[] descriptorData = { "\n\026ShardTransaction.proto\022!org.opendaylig" + "ht.controller.mdsal\032\014Common.proto\"\022\n\020Clo" + - "seTransaction\"\027\n\025CloseTransactionReply\"_" + + "seTransaction\"\027\n\025CloseTransactionReply\"w" + "\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" + "(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" + - "onChainId\030\003 \001(\t\"M\n\026CreateTransactionRepl" + - "y\022\034\n\024transactionActorPath\030\001 \002(\t\022\025\n\rtrans" + - "actionId\030\002 \002(\t\"\022\n\020ReadyTransaction\"*\n\025Re" + - "adyTransactionReply\022\021\n\tactorPath\030\001 \002(\t\"l" + - "\n\nDeleteData\022^\n\037instanceIdentifierPathAr", + "onChainId\030\003 \001(\t\022\026\n\016messageVersion\030\004 \001(\005\"" + + "e\n\026CreateTransactionReply\022\034\n\024transaction" + + "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\022\026" + + "\n\016messageVersion\030\003 \001(\005\"\022\n\020ReadyTransacti" + + "on\"*\n\025ReadyTransactionReply\022\021\n\tactorPath", + "\030\001 \002(\t\"l\n\nDeleteData\022^\n\037instanceIdentifi" + + "erPathArguments\030\001 \002(\01325.org.opendaylight" + + ".controller.mdsal.InstanceIdentifier\"\021\n\017" + + "DeleteDataReply\"j\n\010ReadData\022^\n\037instanceI" + + "dentifierPathArguments\030\001 \002(\01325.org.opend" + + "aylight.controller.mdsal.InstanceIdentif" + + "ier\"P\n\rReadDataReply\022?\n\016normalizedNode\030\001" + + " \001(\0132\'.org.opendaylight.controller.mdsal" + + ".Node\"\254\001\n\tWriteData\022^\n\037instanceIdentifie" + + "rPathArguments\030\001 \002(\01325.org.opendaylight.", + "controller.mdsal.InstanceIdentifier\022?\n\016n" + + "ormalizedNode\030\002 \002(\0132\'.org.opendaylight.c" + + "ontroller.mdsal.Node\"\020\n\016WriteDataReply\"\254" + + "\001\n\tMergeData\022^\n\037instanceIdentifierPathAr" + "guments\030\001 \002(\01325.org.opendaylight.control" + - "ler.mdsal.InstanceIdentifier\"\021\n\017DeleteDa" + - "taReply\"j\n\010ReadData\022^\n\037instanceIdentifie" + - "rPathArguments\030\001 \002(\01325.org.opendaylight." + - "controller.mdsal.InstanceIdentifier\"P\n\rR" + - "eadDataReply\022?\n\016normalizedNode\030\001 \001(\0132\'.o" + - "rg.opendaylight.controller.mdsal.Node\"\254\001" + - "\n\tWriteData\022^\n\037instanceIdentifierPathArg" + - "uments\030\001 \002(\01325.org.opendaylight.controll" + - "er.mdsal.InstanceIdentifier\022?\n\016normalize", - "dNode\030\002 \002(\0132\'.org.opendaylight.controlle" + - "r.mdsal.Node\"\020\n\016WriteDataReply\"\254\001\n\tMerge" + - "Data\022^\n\037instanceIdentifierPathArguments\030" + - "\001 \002(\01325.org.opendaylight.controller.mdsa" + - "l.InstanceIdentifier\022?\n\016normalizedNode\030\002" + - " \002(\0132\'.org.opendaylight.controller.mdsal" + - ".Node\"\020\n\016MergeDataReply\"l\n\nDataExists\022^\n" + - "\037instanceIdentifierPathArguments\030\001 \002(\01325" + - ".org.opendaylight.controller.mdsal.Insta" + - "nceIdentifier\"!\n\017DataExistsReply\022\016\n\006exis", - "ts\030\001 \002(\010BV\n:org.opendaylight.controller." + - "protobuff.messages.transactionB\030ShardTra" + - "nsactionMessages" + "ler.mdsal.InstanceIdentifier\022?\n\016normaliz" + + "edNode\030\002 \002(\0132\'.org.opendaylight.controll" + + "er.mdsal.Node\"\020\n\016MergeDataReply\"l\n\nDataE" + + "xists\022^\n\037instanceIdentifierPathArguments" + + "\030\001 \002(\01325.org.opendaylight.controller.mds", + "al.InstanceIdentifier\"!\n\017DataExistsReply" + + "\022\016\n\006exists\030\001 \002(\010BV\n:org.opendaylight.con" + + "troller.protobuff.messages.transactionB\030" + + "ShardTransactionMessages" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7807,13 +7970,13 @@ public final class ShardTransactionMessages { internal_static_org_opendaylight_controller_mdsal_CreateTransaction_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_opendaylight_controller_mdsal_CreateTransaction_descriptor, - new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", }); + new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", "MessageVersion", }); internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor = getDescriptor().getMessageTypes().get(3); internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor, - new java.lang.String[] { "TransactionActorPath", "TransactionId", }); + new java.lang.String[] { "TransactionActorPath", "TransactionId", "MessageVersion", }); internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_descriptor = getDescriptor().getMessageTypes().get(4); internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_fieldAccessorTable = new diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto index 49c6cd07a8..222f68ab4f 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/Cohort.proto @@ -5,7 +5,7 @@ option java_outer_classname = "ThreePhaseCommitCohortMessages"; message CanCommitTransaction{ - required string transactionId = 1; + optional string transactionId = 1; } message CanCommitTransactionReply{ @@ -14,7 +14,7 @@ message CanCommitTransactionReply{ } message AbortTransaction{ - required string transactionId = 1; + optional string transactionId = 1; } message AbortTransactionReply { @@ -22,7 +22,7 @@ message AbortTransactionReply { } message CommitTransaction{ - required string transactionId = 1; + optional string transactionId = 1; } message CommitTransactionReply{ diff --git a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto index 26581478d9..c5e4ee45c0 100644 --- a/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto +++ b/opendaylight/md-sal/sal-clustering-commons/src/main/resources/ShardTransaction.proto @@ -16,12 +16,13 @@ message CreateTransaction{ required string transactionId = 1; required int32 transactionType =2; optional string transactionChainId = 3; + optional int32 messageVersion = 4; } message CreateTransactionReply{ -required string transactionActorPath = 1; -required string transactionId = 2; - + required string transactionActorPath = 1; + required string transactionId = 2; + optional int32 messageVersion = 3; } message ReadyTransaction{ @@ -29,7 +30,7 @@ message ReadyTransaction{ } message ReadyTransactionReply{ -required string actorPath = 1; + required string actorPath = 1; } message DeleteData { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7d67e0856f..5ea9b30c63 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -31,6 +31,7 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; +import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; @@ -374,7 +375,8 @@ public class Shard extends RaftActor { } private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) { - LOG.debug("Readying transaction {}", ready.getTransactionID()); + LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(), + ready.getTxnClientVersion()); // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the // commitCoordinator in preparation for the subsequent three phase commit initiated by @@ -382,12 +384,22 @@ public class Shard extends RaftActor { commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(), ready.getModification()); - // Return our actor path as we'll handle the three phase commit. - ReadyTransactionReply readyTransactionReply = - new ReadyTransactionReply(Serialization.serializedActorPath(self())); - getSender().tell( - ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply, - getSelf()); + // Return our actor path as we'll handle the three phase commit, except if the Tx client + // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version + // node. In that case, the subsequent 3-phase commit messages won't contain the + // transactionId so to maintain backwards compatibility, we create a separate cohort actor + // to provide the compatible behavior. + ActorRef replyActorPath = self(); + if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) { + LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); + replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( + ready.getTransactionID())); + } + + ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply( + Serialization.serializedActorPath(replyActorPath)); + getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : + readyTransactionReply, getSelf()); } private void handleAbortTransaction(AbortTransaction abort) { @@ -465,10 +477,8 @@ public class Shard extends RaftActor { } } - private ActorRef createTypedTransactionActor( - int transactionType, - ShardTransactionIdentifier transactionId, - String transactionChainId ) { + private ActorRef createTypedTransactionActor(int transactionType, + ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) { DOMStoreTransactionFactory factory = store; @@ -492,7 +502,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(), schemaContext,datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { @@ -501,7 +512,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { @@ -511,7 +523,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else { throw new IllegalArgumentException( "Shard="+name + ":CreateTransaction message has unidentified transaction type=" @@ -521,10 +534,12 @@ public class Shard extends RaftActor { private void createTransaction(CreateTransaction createTransaction) { createTransaction(createTransaction.getTransactionType(), - createTransaction.getTransactionId(), createTransaction.getTransactionChainId()); + createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), + createTransaction.getVersion()); } - private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) { + private ActorRef createTransaction(int transactionType, String remoteTransactionId, + String transactionChainId, int clientVersion) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() @@ -533,8 +548,8 @@ public class Shard extends RaftActor { if(LOG.isDebugEnabled()) { LOG.debug("Creating transaction : {} ", transactionId); } - ActorRef transactionActor = - createTypedTransactionActor(transactionType, transactionId, transactionChainId); + ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId, + transactionChainId, clientVersion); getSender() .tell(new CreateTransactionReply( @@ -599,7 +614,7 @@ public class Shard extends RaftActor { LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ", listenerRegistration.path()); - getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf()); + getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); } private ListenerRegistration { + private static final long serialVersionUID = 1L; + + private final String transactionId; + + BackwardsCompatibleThreePhaseCommitCohortCreator(String transactionId) { + this.transactionId = transactionId; + } + + @Override + public BackwardsCompatibleThreePhaseCommitCohort create() throws Exception { + return new BackwardsCompatibleThreePhaseCommitCohort(transactionId); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java index 361d406ac8..bf82e66036 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java @@ -13,24 +13,33 @@ import org.opendaylight.controller.protobuff.messages.transaction.ShardTransacti public class CreateTransaction implements SerializableMessage { - public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class; + public static final Class SERIALIZABLE_CLASS = + ShardTransactionMessages.CreateTransaction.class; + + public static final int HELIUM_1_VERSION = 1; + public static final int CURRENT_VERSION = HELIUM_1_VERSION; + private final String transactionId; private final int transactionType; private final String transactionChainId; + private final int version; public CreateTransaction(String transactionId, int transactionType) { this(transactionId, transactionType, ""); } public CreateTransaction(String transactionId, int transactionType, String transactionChainId) { + this(transactionId, transactionType, transactionChainId, CURRENT_VERSION); + } + private CreateTransaction(String transactionId, int transactionType, String transactionChainId, + int version) { this.transactionId = transactionId; this.transactionType = transactionType; this.transactionChainId = transactionChainId; - + this.version = version; } - public String getTransactionId() { return transactionId; } @@ -39,19 +48,25 @@ public class CreateTransaction implements SerializableMessage { return transactionType; } + public int getVersion() { + return version; + } + @Override public Object toSerializable() { return ShardTransactionMessages.CreateTransaction.newBuilder() .setTransactionId(transactionId) .setTransactionType(transactionType) - .setTransactionChainId(transactionChainId).build(); + .setTransactionChainId(transactionChainId) + .setMessageVersion(version).build(); } public static CreateTransaction fromSerializable(Object message) { ShardTransactionMessages.CreateTransaction createTransaction = (ShardTransactionMessages.CreateTransaction) message; return new CreateTransaction(createTransaction.getTransactionId(), - createTransaction.getTransactionType(), createTransaction.getTransactionChainId()); + createTransaction.getTransactionType(), createTransaction.getTransactionChainId(), + createTransaction.getMessageVersion()); } public String getTransactionChainId() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java index 096d131d5a..14620f15d9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java @@ -15,13 +15,21 @@ public class CreateTransactionReply implements SerializableMessage { public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class; private final String transactionPath; private final String transactionId; + private final int version; public CreateTransactionReply(String transactionPath, String transactionId) { + this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION); + } + + public CreateTransactionReply(String transactionPath, + String transactionId, int version) { this.transactionPath = transactionPath; this.transactionId = transactionId; + this.version = version; } + public String getTransactionPath() { return transactionPath; } @@ -30,16 +38,21 @@ public class CreateTransactionReply implements SerializableMessage { return transactionId; } + public int getVersion() { + return version; + } + public Object toSerializable(){ return ShardTransactionMessages.CreateTransactionReply.newBuilder() .setTransactionActorPath(transactionPath) .setTransactionId(transactionId) + .setMessageVersion(version) .build(); } public static CreateTransactionReply fromSerializable(Object serializable){ ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable; - return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId()); + return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index 180108f218..38886c9a58 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -20,14 +20,16 @@ public class ForwardedReadyTransaction { private final DOMStoreThreePhaseCommitCohort cohort; private final Modification modification; private final boolean returnSerialized; + private final int txnClientVersion; - public ForwardedReadyTransaction(String transactionID, DOMStoreThreePhaseCommitCohort cohort, - Modification modification, boolean returnSerialized) { + public ForwardedReadyTransaction(String transactionID, int txnClientVersion, + DOMStoreThreePhaseCommitCohort cohort, Modification modification, + boolean returnSerialized) { this.transactionID = transactionID; this.cohort = cohort; this.modification = modification; this.returnSerialized = returnSerialized; - + this.txnClientVersion = txnClientVersion; } public String getTransactionID() { @@ -45,4 +47,8 @@ public class ForwardedReadyTransaction { public boolean isReturnSerialized() { return returnSerialized; } + + public int getTxnClientVersion() { + return txnClientVersion; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java index eee489177a..282e23ed3b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyTransactionReply.java @@ -17,7 +17,6 @@ public class ReadyTransactionReply implements SerializableMessage { private final String cohortPath; public ReadyTransactionReply(String cohortPath) { - this.cohortPath = cohortPath; } @@ -27,8 +26,9 @@ public class ReadyTransactionReply implements SerializableMessage { @Override public ShardTransactionMessages.ReadyTransactionReply toSerializable() { - return ShardTransactionMessages.ReadyTransactionReply.newBuilder(). - setActorPath(cohortPath).build(); + return ShardTransactionMessages.ReadyTransactionReply.newBuilder() + .setActorPath(cohortPath) + .build(); } public static ReadyTransactionReply fromSerializable(Object serializable) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java index e409168c85..904dcdf439 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java @@ -395,4 +395,28 @@ public class ActorContext { return hostPort1.equals(hostPort2); } + + /** + * @deprecated This method is present only to support backward compatibility with Helium and should not be + * used any further + * + * + * @param primaryPath + * @param localPathOfRemoteActor + * @return + */ + @Deprecated + public String resolvePath(final String primaryPath, + final String localPathOfRemoteActor) { + StringBuilder builder = new StringBuilder(); + String[] primaryPathElements = primaryPath.split("/"); + builder.append(primaryPathElements[0]).append("//") + .append(primaryPathElements[1]).append(primaryPathElements[2]); + String[] remotePathElements = localPathOfRemoteActor.split("/"); + for (int i = 3; i < remotePathElements.length; i++) { + builder.append("/").append(remotePathElements[i]); + } + + return builder.toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java index 101a73782b..d5a12c73c5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java @@ -4,6 +4,7 @@ import akka.actor.ActorRef; import akka.actor.DeadLetter; import akka.actor.Props; import akka.testkit.JavaTestKit; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.messages.DataChanged; @@ -70,19 +71,25 @@ public class DataChangeListenerTest extends AbstractActorTest { final Props props = DataChangeListener.props(mockListener); final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender"); - // Let the DataChangeListener know that notifications should be enabled - subject.tell(new EnableNotification(true), ActorRef.noSender()); + getSystem().eventStream().subscribe(getRef(), DeadLetter.class); subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent), ActorRef.noSender()); - getSystem().eventStream().subscribe(getRef(), DeadLetter.class); - new Within(duration("1 seconds")) { - @Override - protected void run() { - expectNoMsg(); + // Make sure no DataChangedReply is sent to DeadLetters. + while(true) { + DeadLetter deadLetter; + try { + deadLetter = expectMsgClass(duration("1 seconds"), DeadLetter.class); + } catch (AssertionError e) { + // Timed out - got no DeadLetter - this is good + break; } - }; + + // We may get DeadLetters for other messages we don't care about. + Assert.assertFalse("Unexpected DataChangedReply", + deadLetter.message() instanceof DataChangedReply); + } }}; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index cd8a658447..c6bb0d6b37 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -101,6 +101,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION; public class ShardTest extends AbstractActorTest { @@ -611,7 +612,8 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); @@ -625,10 +627,12 @@ public class ShardTest extends AbstractActorTest { // Send the ForwardedReadyTransaction for the next 2 Tx's. - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and @@ -792,10 +796,12 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the first Tx. @@ -859,7 +865,8 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message. @@ -902,7 +909,8 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message. @@ -954,7 +962,8 @@ public class ShardTest extends AbstractActorTest { TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification, preCommit); - shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION, + cohort, modification, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef()); @@ -1018,10 +1027,12 @@ public class ShardTest extends AbstractActorTest { // Ready the Tx's - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // canCommit 1st Tx. We don't send the commit so it should timeout. @@ -1080,13 +1091,16 @@ public class ShardTest extends AbstractActorTest { // Ready the Tx's - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION, + cohort3, modification3, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // canCommit 1st Tx. @@ -1149,10 +1163,12 @@ public class ShardTest extends AbstractActorTest { // Simulate the ForwardedReadyTransaction messages that would be sent // by the ShardTransaction. - shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION, + cohort1, modification1, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); - shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef()); + shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, + cohort2, modification2, true), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the first Tx. @@ -1345,7 +1361,7 @@ public class ShardTest extends AbstractActorTest { }; } - private NormalizedNode readStore(TestActorRef shard, YangInstanceIdentifier id) + static NormalizedNode readStore(TestActorRef shard, YangInstanceIdentifier id) throws ExecutionException, InterruptedException { DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 6375e3c7fb..45b00f5f31 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -10,16 +10,13 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.pattern.AskTimeoutException; -import akka.testkit.TestActorRef; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; +import java.util.Collections; +import java.util.concurrent.TimeUnit; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -32,8 +29,12 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import java.util.Collections; -import java.util.concurrent.TimeUnit; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.pattern.AskTimeoutException; +import akka.testkit.TestActorRef; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; /** * Covers negative test cases @@ -75,7 +76,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -104,7 +106,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -133,7 +136,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -162,7 +166,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -194,7 +199,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, @@ -231,7 +237,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, "testNegativeMergeTransactionReady"); @@ -263,7 +270,8 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final TestActorRef subject = TestActorRef .create(getSystem(), props, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java new file mode 100644 index 0000000000..af07aeebcf --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionHeliumBackwardsCompatibilityTest.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import java.util.Collections; +import org.junit.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.duration.FiniteDuration; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.dispatch.Dispatchers; +import akka.testkit.TestActorRef; + +/** + * Tests backwards compatibility support from Helium-1 to Helium. + * + * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the + * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction, + * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages + * would be sans transactionId so this test verifies the Shard handles that properly. + * + * @author Thomas Pantelis + */ +public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest { + + @Test + public void testTransactionCommit() throws Exception { + new ShardTestKit(getSystem()) {{ + SchemaContext schemaContext = TestModel.createTestContext(); + Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1"). + shardName("inventory").type("config").build(), + Collections.emptyMap(), + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(), + schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId()); + + final TestActorRef shard = TestActorRef.create(getSystem(), shardProps, + "testTransactionCommit"); + + waitUntilLeader(shard); + + // Send CreateTransaction message with no messages version + + String transactionID = "txn-1"; + shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder() + .setTransactionId(transactionID) + .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) + .setTransactionChainId("").build(), getRef()); + + final FiniteDuration duration = duration("5 seconds"); + + CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class); + + ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath()); + + // Write data to the Tx + + txActor.tell(new WriteData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef()); + + expectMsgClass(duration, WriteDataReply.class); + + // Ready the Tx + + txActor.tell(new ReadyTransaction().toSerializable(), getRef()); + + ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass( + duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); + + ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath()); + + // Send the CanCommitTransaction message with no transactionId. + + cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(), + getRef()); + + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + + // Send the PreCommitTransaction message with no transactionId. + + cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(), + getRef()); + + expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS); + + // Send the CommitTransaction message with no transactionId. + + cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(), + getRef()); + + expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); + + NormalizedNode node = ShardTest.readStore(shard, TestModel.TEST_PATH); + Assert.assertNotNull("Data not found in store", node); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } + + @Test + public void testTransactionAbort() throws Exception { + new ShardTestKit(getSystem()) {{ + SchemaContext schemaContext = TestModel.createTestContext(); + Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1"). + shardName("inventory").type("config").build(), + Collections.emptyMap(), + DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(), + schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId()); + + final TestActorRef shard = TestActorRef.create(getSystem(), shardProps, + "testTransactionAbort"); + + waitUntilLeader(shard); + + // Send CreateTransaction message with no messages version + + String transactionID = "txn-1"; + shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder() + .setTransactionId(transactionID) + .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) + .setTransactionChainId("").build(), getRef()); + + final FiniteDuration duration = duration("5 seconds"); + + CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class); + + ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath()); + + // Write data to the Tx + + txActor.tell(new WriteData(TestModel.TEST_PATH, + ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef()); + + expectMsgClass(duration, WriteDataReply.class); + + // Ready the Tx + + txActor.tell(new ReadyTransaction().toSerializable(), getRef()); + + ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass( + duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); + + ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath()); + + // Send the CanCommitTransaction message with no transactionId. + + cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(), + getRef()); + + expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS); + + // Send the AbortTransaction message with no transactionId. + + cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(), + getRef()); + + expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + }}; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java index 793df8e0ca..c869be82d2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java @@ -1,12 +1,11 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorRef; -import akka.actor.Props; -import akka.actor.Terminated; -import akka.testkit.JavaTestKit; -import akka.testkit.TestActorRef; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import java.util.Collections; +import java.util.concurrent.TimeUnit; import org.junit.BeforeClass; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply; @@ -15,6 +14,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; @@ -39,12 +39,13 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; -import java.util.Collections; -import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.Terminated; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; public class ShardTransactionTest extends AbstractActorTest { private static ListeningExecutorService storeExecutor = @@ -78,12 +79,14 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW")); } @@ -114,13 +117,15 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( props, "testReadDataWhenDataNotFoundRO")); props = ShardTransaction.props( store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf( props, "testReadDataWhenDataNotFoundRW")); @@ -150,12 +155,14 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW")); } @@ -183,12 +190,14 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO")); props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW")); } @@ -228,7 +237,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testWriteData"); transaction.tell(new WriteData(TestModel.TEST_PATH, @@ -254,7 +264,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testMergeData"); transaction.tell(new MergeData(TestModel.TEST_PATH, @@ -279,7 +290,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testDeleteData"); transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef()); @@ -301,7 +313,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction"); watch(transaction); @@ -318,7 +331,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2"); watch(transaction); @@ -339,7 +353,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction"); watch(transaction); @@ -355,7 +370,8 @@ public class ShardTransactionTest extends AbstractActorTest { public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception { final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final TestActorRef transaction = TestActorRef.apply(props,getSystem()); transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender()); @@ -370,7 +386,8 @@ public class ShardTransactionTest extends AbstractActorTest { new JavaTestKit(getSystem()) {{ final ActorRef shard = createShard(); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, - testSchemaContext, datastoreContext, shardStats, "txn"); + testSchemaContext, datastoreContext, shardStats, "txn", + CreateTransaction.CURRENT_VERSION); final ActorRef transaction = getSystem().actorOf(props, "testShardTransactionInactivity"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java index 35f346f0d6..b77b0b65cf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java @@ -338,13 +338,15 @@ public class TransactionProxyTest { return getSystem().actorSelection(actorRef.path()); } - private CreateTransactionReply createTransactionReply(ActorRef actorRef){ + private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){ return CreateTransactionReply.newBuilder() .setTransactionActorPath(actorRef.path().toString()) - .setTransactionId("txn-1").build(); + .setTransactionId("txn-1") + .setMessageVersion(transactionVersion) + .build(); } - private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) { ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class)); doReturn(actorSystem.actorSelection(actorRef.path())). when(mockActorContext).actorSelection(actorRef.path().toString()); @@ -352,7 +354,7 @@ public class TransactionProxyTest { doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))). when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD)); - doReturn(Futures.successful(createTransactionReply(actorRef))).when(mockActorContext). + doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext). executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())), eqCreateTransaction(memberName, type)); @@ -361,6 +363,11 @@ public class TransactionProxyTest { return actorRef; } + private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) { + return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION); + } + + private void propagateReadFailedExceptionCause(CheckedFuture future) throws Throwable { @@ -835,6 +842,47 @@ public class TransactionProxyTest { verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); } + @SuppressWarnings("unchecked") + @Test + public void testReadyForwardCompatibility() throws Exception { + ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0); + + NormalizedNode nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + + doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedReadData()); + + doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite)); + + doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync( + eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS)); + + doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + + TransactionProxy transactionProxy = new TransactionProxy(mockActorContext, + READ_WRITE); + + transactionProxy.read(TestModel.TEST_PATH); + + transactionProxy.write(TestModel.TEST_PATH, nodeToWrite); + + DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready(); + + assertTrue(ready instanceof ThreePhaseCommitCohortProxy); + + ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready; + + verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(), + WriteDataReply.SERIALIZABLE_CLASS); + + verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path())); + + verify(mockActorContext).resolvePath(eq(actorRef.path().toString()), + eq(actorRef.path().toString())); + } + @SuppressWarnings("unchecked") @Test public void testReadyWithRecordingOperationFailure() throws Exception { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 60f9a2d9dc..39d337e91b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -2,6 +2,7 @@ package org.opendaylight.controller.cluster.datastore.utils; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; import akka.japi.Creator; @@ -17,7 +18,9 @@ import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; + import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -182,4 +185,51 @@ public class ActorContextTest extends AbstractActorTest{ clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/"); assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/")); } + + @Test + public void testResolvePathForRemoteActor() { + ActorContext actorContext = + new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock( + ClusterWrapper.class), + mock(Configuration.class)); + + String actual = actorContext.resolvePath( + "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard", + "akka://system/user/shardmanager/shard/transaction"); + + String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); + } + + @Test + public void testResolvePathForLocalActor() { + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class)); + + String actual = actorContext.resolvePath( + "akka://system/user/shardmanager/shard", + "akka://system/user/shardmanager/shard/transaction"); + + String expected = "akka://system/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); + } + + @Test + public void testResolvePathForRemoteActorWithProperRemoteAddress() { + ActorContext actorContext = + new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), + mock(Configuration.class)); + + String actual = actorContext.resolvePath( + "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard", + "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"); + + String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction"; + + assertEquals(expected, actual); + } + }