<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-inmemory-datastore</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-broker-impl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </dependency>
</dependencies>
<build>
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.benchmark;
+
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Warmup;
+
+/**
+ * @author Lukas Sedlak <lsedlak@cisco.com>
+ */
+public abstract class AbstractInMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryWriteTransactionBenchmark {
+
+ protected DOMDataBrokerImpl domBroker;
+
+ protected void initTestNode() throws Exception {
+ final YangInstanceIdentifier testPath = YangInstanceIdentifier.builder(BenchmarkModel.TEST_PATH)
+ .build();
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, testPath, provideOuterListNode());
+
+ writeTx.submit().get();
+ }
+
+ @Benchmark
+ @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ public void write100KSingleNodeWithOneInnerItemInOneCommitBenchmark() throws Exception {
+
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ for (int outerListKey = 0; outerListKey < OUTER_LIST_100K; ++outerListKey) {
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_100K_PATHS[outerListKey], OUTER_LIST_ONE_ITEM_INNER_LIST[outerListKey]);
+ }
+
+ writeTx.submit().get();
+ }
+
+ @Benchmark
+ @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ public void write100KSingleNodeWithOneInnerItemInCommitPerWriteBenchmark() throws Exception {
+ for (int outerListKey = 0; outerListKey < OUTER_LIST_100K; ++outerListKey) {
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_100K_PATHS[outerListKey], OUTER_LIST_ONE_ITEM_INNER_LIST[outerListKey]);
+
+ writeTx.submit().get();
+ }
+ }
+
+ @Benchmark
+ @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ public void write50KSingleNodeWithTwoInnerItemsInOneCommitBenchmark() throws Exception {
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ for (int outerListKey = 0; outerListKey < OUTER_LIST_50K; ++outerListKey) {
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_50K_PATHS[outerListKey], OUTER_LIST_TWO_ITEM_INNER_LIST[outerListKey]);
+ }
+
+ writeTx.submit().get();
+ }
+
+ @Benchmark
+ @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ public void write50KSingleNodeWithTwoInnerItemsInCommitPerWriteBenchmark() throws Exception {
+ for (int outerListKey = 0; outerListKey < OUTER_LIST_50K; ++outerListKey) {
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_50K_PATHS[outerListKey], OUTER_LIST_TWO_ITEM_INNER_LIST[outerListKey]);
+ writeTx.submit().get();
+ }
+ }
+
+ @Benchmark
+ @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ public void write10KSingleNodeWithTenInnerItemsInOneCommitBenchmark() throws Exception {
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ for (int outerListKey = 0; outerListKey < OUTER_LIST_10K; ++outerListKey) {
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_10K_PATHS[outerListKey], OUTER_LIST_TEN_ITEM_INNER_LIST[outerListKey]);
+ }
+ writeTx.submit().get();
+ }
+
+ @Benchmark
+ @Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ @Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
+ public void write10KSingleNodeWithTenInnerItemsInCommitPerWriteBenchmark() throws Exception {
+ for (int outerListKey = 0; outerListKey < OUTER_LIST_10K; ++outerListKey) {
+ DOMDataReadWriteTransaction writeTx = domBroker.newReadWriteTransaction();
+ writeTx.put(LogicalDatastoreType.OPERATIONAL, OUTER_LIST_10K_PATHS[outerListKey], OUTER_LIST_TEN_ITEM_INNER_LIST[outerListKey]);
+ writeTx.submit().get();
+ }
+ }
+}
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Warmup;
/**
* @author Lukas Sedlak <lsedlak@cisco.com>
*/
-public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark {
+public abstract class AbstractInMemoryDatastoreWriteTransactionBenchmark extends AbstractInMemoryWriteTransactionBenchmark {
- private static final int WARMUP_ITERATIONS = 20;
- private static final int MEASUREMENT_ITERATIONS = 20;
-
- private static final int OUTER_LIST_100K = 100000;
- private static final int OUTER_LIST_50K = 50000;
- private static final int OUTER_LIST_10K = 10000;
-
- private static final YangInstanceIdentifier[] OUTER_LIST_100K_PATHS = initOuterListPaths(OUTER_LIST_100K);
- private static final YangInstanceIdentifier[] OUTER_LIST_50K_PATHS = initOuterListPaths(OUTER_LIST_50K);
- private static final YangInstanceIdentifier[] OUTER_LIST_10K_PATHS = initOuterListPaths(OUTER_LIST_10K);
-
- private static YangInstanceIdentifier[] initOuterListPaths(final int outerListPathsCount) {
- final YangInstanceIdentifier[] paths = new YangInstanceIdentifier[outerListPathsCount];
-
- for (int outerListKey = 0; outerListKey < outerListPathsCount; ++outerListKey) {
- paths[outerListKey] = YangInstanceIdentifier.builder(BenchmarkModel.OUTER_LIST_PATH)
- .nodeWithKey(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
- .build();
- }
- return paths;
- }
-
- private static final MapNode ONE_ITEM_INNER_LIST = initInnerListItems(1);
- private static final MapNode TWO_ITEM_INNER_LIST = initInnerListItems(2);
- private static final MapNode TEN_ITEM_INNER_LIST = initInnerListItems(10);
-
- private static MapNode initInnerListItems(final int count) {
- final CollectionNodeBuilder<MapEntryNode, MapNode> mapEntryBuilder = ImmutableNodes
- .mapNodeBuilder(BenchmarkModel.INNER_LIST_QNAME);
-
- for (int i = 1; i <= count; ++i) {
- mapEntryBuilder
- .withChild(ImmutableNodes.mapEntry(BenchmarkModel.INNER_LIST_QNAME, BenchmarkModel.NAME_QNAME, i));
- }
- return mapEntryBuilder.build();
- }
-
- private static final NormalizedNode<?, ?>[] OUTER_LIST_ONE_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_100K, ONE_ITEM_INNER_LIST);
- private static final NormalizedNode<?, ?>[] OUTER_LIST_TWO_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_50K, TWO_ITEM_INNER_LIST);
- private static final NormalizedNode<?, ?>[] OUTER_LIST_TEN_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_10K, TEN_ITEM_INNER_LIST);
-
- private static NormalizedNode<?,?>[] initOuterListItems(int outerListItemsCount, MapNode innerList) {
- final NormalizedNode<?,?>[] outerListItems = new NormalizedNode[outerListItemsCount];
-
- for (int i = 0; i < outerListItemsCount; ++i) {
- int outerListKey = i;
- outerListItems[i] = ImmutableNodes.mapEntryBuilder(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
- .withChild(innerList).build();
- }
- return outerListItems;
- }
-
- protected SchemaContext schemaContext;
protected InMemoryDOMDataStore domStore;
- abstract public void setUp() throws Exception;
-
- abstract public void tearDown();
-
protected void initTestNode() throws Exception {
final YangInstanceIdentifier testPath = YangInstanceIdentifier.builder(BenchmarkModel.TEST_PATH)
.build();
cohort.commit().get();
}
- private DataContainerChild<?, ?> provideOuterListNode() {
- return ImmutableContainerNodeBuilder
- .create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BenchmarkModel.TEST_QNAME))
- .withChild(
- ImmutableNodes.mapNodeBuilder(BenchmarkModel.OUTER_LIST_QNAME)
- .build()).build();
- }
-
@Benchmark
@Warmup(iterations = WARMUP_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = MEASUREMENT_ITERATIONS, timeUnit = TimeUnit.MILLISECONDS)
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.benchmark;
+
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public abstract class AbstractInMemoryWriteTransactionBenchmark {
+ protected static final int OUTER_LIST_100K = 100000;
+ protected static final int OUTER_LIST_50K = 50000;
+ protected static final int OUTER_LIST_10K = 10000;
+
+ protected static final YangInstanceIdentifier[] OUTER_LIST_100K_PATHS = initOuterListPaths(OUTER_LIST_100K);
+ protected static final YangInstanceIdentifier[] OUTER_LIST_50K_PATHS = initOuterListPaths(OUTER_LIST_50K);
+ protected static final YangInstanceIdentifier[] OUTER_LIST_10K_PATHS = initOuterListPaths(OUTER_LIST_10K);
+
+ private static YangInstanceIdentifier[] initOuterListPaths(final int outerListPathsCount) {
+ final YangInstanceIdentifier[] paths = new YangInstanceIdentifier[outerListPathsCount];
+
+ for (int outerListKey = 0; outerListKey < outerListPathsCount; ++outerListKey) {
+ paths[outerListKey] = YangInstanceIdentifier.builder(BenchmarkModel.OUTER_LIST_PATH)
+ .nodeWithKey(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
+ .build();
+ }
+ return paths;
+ }
+
+ protected static final int WARMUP_ITERATIONS = 20;
+ protected static final int MEASUREMENT_ITERATIONS = 20;
+
+ protected static final MapNode ONE_ITEM_INNER_LIST = initInnerListItems(1);
+ protected static final MapNode TWO_ITEM_INNER_LIST = initInnerListItems(2);
+ protected static final MapNode TEN_ITEM_INNER_LIST = initInnerListItems(10);
+
+ private static MapNode initInnerListItems(final int count) {
+ final CollectionNodeBuilder<MapEntryNode, MapNode> mapEntryBuilder = ImmutableNodes
+ .mapNodeBuilder(BenchmarkModel.INNER_LIST_QNAME);
+
+ for (int i = 1; i <= count; ++i) {
+ mapEntryBuilder
+ .withChild(ImmutableNodes.mapEntry(BenchmarkModel.INNER_LIST_QNAME, BenchmarkModel.NAME_QNAME, i));
+ }
+ return mapEntryBuilder.build();
+ }
+
+ protected static final NormalizedNode<?, ?>[] OUTER_LIST_ONE_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_100K, ONE_ITEM_INNER_LIST);
+ protected static final NormalizedNode<?, ?>[] OUTER_LIST_TWO_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_50K, TWO_ITEM_INNER_LIST);
+ protected static final NormalizedNode<?, ?>[] OUTER_LIST_TEN_ITEM_INNER_LIST = initOuterListItems(OUTER_LIST_10K, TEN_ITEM_INNER_LIST);
+
+ private static NormalizedNode<?,?>[] initOuterListItems(final int outerListItemsCount, final MapNode innerList) {
+ final NormalizedNode<?,?>[] outerListItems = new NormalizedNode[outerListItemsCount];
+
+ for (int i = 0; i < outerListItemsCount; ++i) {
+ int outerListKey = i;
+ outerListItems[i] = ImmutableNodes.mapEntryBuilder(BenchmarkModel.OUTER_LIST_QNAME, BenchmarkModel.ID_QNAME, outerListKey)
+ .withChild(innerList).build();
+ }
+ return outerListItems;
+ }
+
+ protected SchemaContext schemaContext;
+ abstract public void setUp() throws Exception;
+ abstract public void tearDown();
+
+ protected static DataContainerChild<?, ?> provideOuterListNode() {
+ return ImmutableContainerNodeBuilder
+ .create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(BenchmarkModel.TEST_QNAME))
+ .withChild(
+ ImmutableNodes.mapNodeBuilder(BenchmarkModel.OUTER_LIST_QNAME)
+ .build()).build();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.store.benchmark;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMDataBrokerImpl;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+@State(Scope.Thread)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(1)
+public class InMemoryBrokerWriteTransactionBenchmark extends AbstractInMemoryBrokerWriteTransactionBenchmark {
+ private ListeningExecutorService executor;
+
+ @Setup(Level.Trial)
+ @Override
+ public void setUp() throws Exception {
+ ListeningExecutorService dsExec = MoreExecutors.sameThreadExecutor();
+ executor = MoreExecutors.listeningDecorator(
+ MoreExecutors.getExitingExecutorService((ThreadPoolExecutor)Executors.newFixedThreadPool(1), 1L, TimeUnit.SECONDS));
+
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", dsExec,
+ MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", dsExec,
+ MoreExecutors.sameThreadExecutor());
+ Map<LogicalDatastoreType, DOMStore> datastores = ImmutableMap.of(
+ LogicalDatastoreType.OPERATIONAL, (DOMStore)operStore,
+ LogicalDatastoreType.CONFIGURATION, configStore);
+
+ domBroker = new DOMDataBrokerImpl(datastores, executor);
+ schemaContext = BenchmarkModel.createTestContext();
+ configStore.onGlobalContextUpdated(schemaContext);
+ operStore.onGlobalContextUpdated(schemaContext);
+ initTestNode();
+ }
+
+ @Override
+ public void tearDown() {
+ domBroker.close();
+ executor.shutdown();
+ }
+}
TestActorRef<MockRaftActor> mockActorRef = TestActorRef.create(getSystem(), MockRaftActor.props(persistenceId,
Collections.EMPTY_MAP, Optional.<ConfigParams>of(config)), persistenceId);
+ MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+
+ // Wait for akka's recovery to complete so it doesn't interfere.
+ mockRaftActor.waitForRecoveryComplete();
+
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
new MockRaftActorContext.MockPayload("B"),
Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
Lists.<ReplicatedLogEntry>newArrayList(), 3, 1 ,3, 1);
- MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
mockRaftActor.onReceiveRecover(new SnapshotOffer(new SnapshotMetadata(persistenceId, 100, 100), snapshot));
CountDownLatch applyRecoverySnapshotLatch = mockRaftActor.getApplyRecoverySnapshotLatch();
mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
- mockRaftActor.waitForRecoveryComplete();
-
mockActorRef.tell(PoisonPill.getInstance(), getRef());
}};
MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
+ // Wait for akka's recovery to complete so it doesn't interfere.
+ mockRaftActor.waitForRecoveryComplete();
+
ByteString snapshotBytes = fromObject(Arrays.asList(
new MockRaftActorContext.MockPayload("A"),
new MockRaftActorContext.MockPayload("B"),
mockRaftActor.onReceiveRecover(mock(RecoveryCompleted.class));
- mockRaftActor.waitForRecoveryComplete();
-
mockActorRef.tell(PoisonPill.getInstance(), getRef());
}};
}
public interface CanCommitTransactionOrBuilder
extends com.google.protobuf.MessageOrBuilder {
- // required string transactionId = 1;
+ // optional string transactionId = 1;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
boolean hasTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
java.lang.String getTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
com.google.protobuf.ByteString
getTransactionIdBytes();
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
public static final int TRANSACTIONID_FIELD_NUMBER = 1;
private java.lang.Object transactionId_;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
- if (!hasTransactionId()) {
- memoizedIsInitialized = 0;
- return false;
- }
memoizedIsInitialized = 1;
return true;
}
}
public final boolean isInitialized() {
- if (!hasTransactionId()) {
-
- return false;
- }
return true;
}
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
private java.lang.Object transactionId_ = "";
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionId(
java.lang.String value) {
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder clearTransactionId() {
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionIdBytes(
com.google.protobuf.ByteString value) {
public interface AbortTransactionOrBuilder
extends com.google.protobuf.MessageOrBuilder {
- // required string transactionId = 1;
+ // optional string transactionId = 1;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
boolean hasTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
java.lang.String getTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
com.google.protobuf.ByteString
getTransactionIdBytes();
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
public static final int TRANSACTIONID_FIELD_NUMBER = 1;
private java.lang.Object transactionId_;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
- if (!hasTransactionId()) {
- memoizedIsInitialized = 0;
- return false;
- }
memoizedIsInitialized = 1;
return true;
}
}
public final boolean isInitialized() {
- if (!hasTransactionId()) {
-
- return false;
- }
return true;
}
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
private java.lang.Object transactionId_ = "";
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionId(
java.lang.String value) {
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder clearTransactionId() {
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionIdBytes(
com.google.protobuf.ByteString value) {
public interface CommitTransactionOrBuilder
extends com.google.protobuf.MessageOrBuilder {
- // required string transactionId = 1;
+ // optional string transactionId = 1;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
boolean hasTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
java.lang.String getTransactionId();
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
com.google.protobuf.ByteString
getTransactionIdBytes();
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
public static final int TRANSACTIONID_FIELD_NUMBER = 1;
private java.lang.Object transactionId_;
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
- if (!hasTransactionId()) {
- memoizedIsInitialized = 0;
- return false;
- }
memoizedIsInitialized = 1;
return true;
}
}
public final boolean isInitialized() {
- if (!hasTransactionId()) {
-
- return false;
- }
return true;
}
}
private int bitField0_;
- // required string transactionId = 1;
+ // optional string transactionId = 1;
private java.lang.Object transactionId_ = "";
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public boolean hasTransactionId() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public java.lang.String getTransactionId() {
java.lang.Object ref = transactionId_;
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public com.google.protobuf.ByteString
getTransactionIdBytes() {
}
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionId(
java.lang.String value) {
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder clearTransactionId() {
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
/**
- * <code>required string transactionId = 1;</code>
+ * <code>optional string transactionId = 1;</code>
*/
public Builder setTransactionIdBytes(
com.google.protobuf.ByteString value) {
java.lang.String[] descriptorData = {
"\n\014Cohort.proto\022!org.opendaylight.control" +
"ler.mdsal\"-\n\024CanCommitTransaction\022\025\n\rtra" +
- "nsactionId\030\001 \002(\t\".\n\031CanCommitTransaction" +
+ "nsactionId\030\001 \001(\t\".\n\031CanCommitTransaction" +
"Reply\022\021\n\tcanCommit\030\001 \002(\010\")\n\020AbortTransac" +
- "tion\022\025\n\rtransactionId\030\001 \002(\t\"\027\n\025AbortTran" +
+ "tion\022\025\n\rtransactionId\030\001 \001(\t\"\027\n\025AbortTran" +
"sactionReply\"*\n\021CommitTransaction\022\025\n\rtra" +
- "nsactionId\030\001 \002(\t\"\030\n\026CommitTransactionRep" +
+ "nsactionId\030\001 \001(\t\"\030\n\026CommitTransactionRep" +
"ly\"\026\n\024PreCommitTransaction\"\033\n\031PreCommitT" +
"ransactionReplyBZ\n8org.opendaylight.cont" +
"roller.protobuff.messages.cohort3pcB\036Thr",
*/
com.google.protobuf.ByteString
getTransactionChainIdBytes();
+
+ // optional int32 messageVersion = 4;
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ boolean hasMessageVersion();
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ int getMessageVersion();
}
/**
* Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransaction}
transactionChainId_ = input.readBytes();
break;
}
+ case 32: {
+ bitField0_ |= 0x00000008;
+ messageVersion_ = input.readInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
}
}
+ // optional int32 messageVersion = 4;
+ public static final int MESSAGEVERSION_FIELD_NUMBER = 4;
+ private int messageVersion_;
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public boolean hasMessageVersion() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public int getMessageVersion() {
+ return messageVersion_;
+ }
+
private void initFields() {
transactionId_ = "";
transactionType_ = 0;
transactionChainId_ = "";
+ messageVersion_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, getTransactionChainIdBytes());
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeInt32(4, messageVersion_);
+ }
getUnknownFields().writeTo(output);
}
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(3, getTransactionChainIdBytes());
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(4, messageVersion_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000002);
transactionChainId_ = "";
bitField0_ = (bitField0_ & ~0x00000004);
+ messageVersion_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
to_bitField0_ |= 0x00000004;
}
result.transactionChainId_ = transactionChainId_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.messageVersion_ = messageVersion_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
transactionChainId_ = other.transactionChainId_;
onChanged();
}
+ if (other.hasMessageVersion()) {
+ setMessageVersion(other.getMessageVersion());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional int32 messageVersion = 4;
+ private int messageVersion_ ;
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public boolean hasMessageVersion() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public int getMessageVersion() {
+ return messageVersion_;
+ }
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public Builder setMessageVersion(int value) {
+ bitField0_ |= 0x00000008;
+ messageVersion_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 messageVersion = 4;</code>
+ */
+ public Builder clearMessageVersion() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ messageVersion_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransaction)
}
*/
com.google.protobuf.ByteString
getTransactionIdBytes();
+
+ // optional int32 messageVersion = 3;
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ boolean hasMessageVersion();
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ int getMessageVersion();
}
/**
* Protobuf type {@code org.opendaylight.controller.mdsal.CreateTransactionReply}
transactionId_ = input.readBytes();
break;
}
+ case 24: {
+ bitField0_ |= 0x00000004;
+ messageVersion_ = input.readInt32();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
}
}
+ // optional int32 messageVersion = 3;
+ public static final int MESSAGEVERSION_FIELD_NUMBER = 3;
+ private int messageVersion_;
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public boolean hasMessageVersion() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public int getMessageVersion() {
+ return messageVersion_;
+ }
+
private void initFields() {
transactionActorPath_ = "";
transactionId_ = "";
+ messageVersion_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, getTransactionIdBytes());
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeInt32(3, messageVersion_);
+ }
getUnknownFields().writeTo(output);
}
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, getTransactionIdBytes());
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(3, messageVersion_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
bitField0_ = (bitField0_ & ~0x00000001);
transactionId_ = "";
bitField0_ = (bitField0_ & ~0x00000002);
+ messageVersion_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
to_bitField0_ |= 0x00000002;
}
result.transactionId_ = transactionId_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.messageVersion_ = messageVersion_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
transactionId_ = other.transactionId_;
onChanged();
}
+ if (other.hasMessageVersion()) {
+ setMessageVersion(other.getMessageVersion());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
return this;
}
+ // optional int32 messageVersion = 3;
+ private int messageVersion_ ;
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public boolean hasMessageVersion() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public int getMessageVersion() {
+ return messageVersion_;
+ }
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public Builder setMessageVersion(int value) {
+ bitField0_ |= 0x00000004;
+ messageVersion_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional int32 messageVersion = 3;</code>
+ */
+ public Builder clearMessageVersion() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ messageVersion_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:org.opendaylight.controller.mdsal.CreateTransactionReply)
}
java.lang.String[] descriptorData = {
"\n\026ShardTransaction.proto\022!org.opendaylig" +
"ht.controller.mdsal\032\014Common.proto\"\022\n\020Clo" +
- "seTransaction\"\027\n\025CloseTransactionReply\"_" +
+ "seTransaction\"\027\n\025CloseTransactionReply\"w" +
"\n\021CreateTransaction\022\025\n\rtransactionId\030\001 \002" +
"(\t\022\027\n\017transactionType\030\002 \002(\005\022\032\n\022transacti" +
- "onChainId\030\003 \001(\t\"M\n\026CreateTransactionRepl" +
- "y\022\034\n\024transactionActorPath\030\001 \002(\t\022\025\n\rtrans" +
- "actionId\030\002 \002(\t\"\022\n\020ReadyTransaction\"*\n\025Re" +
- "adyTransactionReply\022\021\n\tactorPath\030\001 \002(\t\"l" +
- "\n\nDeleteData\022^\n\037instanceIdentifierPathAr",
+ "onChainId\030\003 \001(\t\022\026\n\016messageVersion\030\004 \001(\005\"" +
+ "e\n\026CreateTransactionReply\022\034\n\024transaction" +
+ "ActorPath\030\001 \002(\t\022\025\n\rtransactionId\030\002 \002(\t\022\026" +
+ "\n\016messageVersion\030\003 \001(\005\"\022\n\020ReadyTransacti" +
+ "on\"*\n\025ReadyTransactionReply\022\021\n\tactorPath",
+ "\030\001 \002(\t\"l\n\nDeleteData\022^\n\037instanceIdentifi" +
+ "erPathArguments\030\001 \002(\01325.org.opendaylight" +
+ ".controller.mdsal.InstanceIdentifier\"\021\n\017" +
+ "DeleteDataReply\"j\n\010ReadData\022^\n\037instanceI" +
+ "dentifierPathArguments\030\001 \002(\01325.org.opend" +
+ "aylight.controller.mdsal.InstanceIdentif" +
+ "ier\"P\n\rReadDataReply\022?\n\016normalizedNode\030\001" +
+ " \001(\0132\'.org.opendaylight.controller.mdsal" +
+ ".Node\"\254\001\n\tWriteData\022^\n\037instanceIdentifie" +
+ "rPathArguments\030\001 \002(\01325.org.opendaylight.",
+ "controller.mdsal.InstanceIdentifier\022?\n\016n" +
+ "ormalizedNode\030\002 \002(\0132\'.org.opendaylight.c" +
+ "ontroller.mdsal.Node\"\020\n\016WriteDataReply\"\254" +
+ "\001\n\tMergeData\022^\n\037instanceIdentifierPathAr" +
"guments\030\001 \002(\01325.org.opendaylight.control" +
- "ler.mdsal.InstanceIdentifier\"\021\n\017DeleteDa" +
- "taReply\"j\n\010ReadData\022^\n\037instanceIdentifie" +
- "rPathArguments\030\001 \002(\01325.org.opendaylight." +
- "controller.mdsal.InstanceIdentifier\"P\n\rR" +
- "eadDataReply\022?\n\016normalizedNode\030\001 \001(\0132\'.o" +
- "rg.opendaylight.controller.mdsal.Node\"\254\001" +
- "\n\tWriteData\022^\n\037instanceIdentifierPathArg" +
- "uments\030\001 \002(\01325.org.opendaylight.controll" +
- "er.mdsal.InstanceIdentifier\022?\n\016normalize",
- "dNode\030\002 \002(\0132\'.org.opendaylight.controlle" +
- "r.mdsal.Node\"\020\n\016WriteDataReply\"\254\001\n\tMerge" +
- "Data\022^\n\037instanceIdentifierPathArguments\030" +
- "\001 \002(\01325.org.opendaylight.controller.mdsa" +
- "l.InstanceIdentifier\022?\n\016normalizedNode\030\002" +
- " \002(\0132\'.org.opendaylight.controller.mdsal" +
- ".Node\"\020\n\016MergeDataReply\"l\n\nDataExists\022^\n" +
- "\037instanceIdentifierPathArguments\030\001 \002(\01325" +
- ".org.opendaylight.controller.mdsal.Insta" +
- "nceIdentifier\"!\n\017DataExistsReply\022\016\n\006exis",
- "ts\030\001 \002(\010BV\n:org.opendaylight.controller." +
- "protobuff.messages.transactionB\030ShardTra" +
- "nsactionMessages"
+ "ler.mdsal.InstanceIdentifier\022?\n\016normaliz" +
+ "edNode\030\002 \002(\0132\'.org.opendaylight.controll" +
+ "er.mdsal.Node\"\020\n\016MergeDataReply\"l\n\nDataE" +
+ "xists\022^\n\037instanceIdentifierPathArguments" +
+ "\030\001 \002(\01325.org.opendaylight.controller.mds",
+ "al.InstanceIdentifier\"!\n\017DataExistsReply" +
+ "\022\016\n\006exists\030\001 \002(\010BV\n:org.opendaylight.con" +
+ "troller.protobuff.messages.transactionB\030" +
+ "ShardTransactionMessages"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
internal_static_org_opendaylight_controller_mdsal_CreateTransaction_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_mdsal_CreateTransaction_descriptor,
- new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", });
+ new java.lang.String[] { "TransactionId", "TransactionType", "TransactionChainId", "MessageVersion", });
internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor =
getDescriptor().getMessageTypes().get(3);
internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_org_opendaylight_controller_mdsal_CreateTransactionReply_descriptor,
- new java.lang.String[] { "TransactionActorPath", "TransactionId", });
+ new java.lang.String[] { "TransactionActorPath", "TransactionId", "MessageVersion", });
internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_descriptor =
getDescriptor().getMessageTypes().get(4);
internal_static_org_opendaylight_controller_mdsal_ReadyTransaction_fieldAccessorTable = new
message CanCommitTransaction{
- required string transactionId = 1;
+ optional string transactionId = 1;
}
message CanCommitTransactionReply{
}
message AbortTransaction{
- required string transactionId = 1;
+ optional string transactionId = 1;
}
message AbortTransactionReply {
}
message CommitTransaction{
- required string transactionId = 1;
+ optional string transactionId = 1;
}
message CommitTransactionReply{
required string transactionId = 1;
required int32 transactionType =2;
optional string transactionChainId = 3;
+ optional int32 messageVersion = 4;
}
message CreateTransactionReply{
-required string transactionActorPath = 1;
-required string transactionId = 2;
-
+ required string transactionActorPath = 1;
+ required string transactionId = 2;
+ optional int32 messageVersion = 3;
}
message ReadyTransaction{
}
message ReadyTransactionReply{
-required string actorPath = 1;
+ required string actorPath = 1;
}
message DeleteData {
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
+import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
}
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
- LOG.debug("Readying transaction {}", ready.getTransactionID());
+ LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(),
+ ready.getTxnClientVersion());
// This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
// commitCoordinator in preparation for the subsequent three phase commit initiated by
commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
ready.getModification());
- // Return our actor path as we'll handle the three phase commit.
- ReadyTransactionReply readyTransactionReply =
- new ReadyTransactionReply(Serialization.serializedActorPath(self()));
- getSender().tell(
- ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply,
- getSelf());
+ // Return our actor path as we'll handle the three phase commit, except if the Tx client
+ // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
+ // node. In that case, the subsequent 3-phase commit messages won't contain the
+ // transactionId so to maintain backwards compatibility, we create a separate cohort actor
+ // to provide the compatible behavior.
+ ActorRef replyActorPath = self();
+ if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+ LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
+ replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ready.getTransactionID()));
+ }
+
+ ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
+ Serialization.serializedActorPath(replyActorPath));
+ getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+ readyTransactionReply, getSelf());
}
private void handleAbortTransaction(AbortTransaction abort) {
}
}
- private ActorRef createTypedTransactionActor(
- int transactionType,
- ShardTransactionIdentifier transactionId,
- String transactionChainId ) {
+ private ActorRef createTypedTransactionActor(int transactionType,
+ ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
DOMStoreTransactionFactory factory = store;
return getContext().actorOf(
ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
schemaContext,datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId()), transactionId.toString());
+ transactionId.getRemoteTransactionId(), clientVersion),
+ transactionId.toString());
} else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId()), transactionId.toString());
+ transactionId.getRemoteTransactionId(), clientVersion),
+ transactionId.toString());
} else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId()), transactionId.toString());
+ transactionId.getRemoteTransactionId(), clientVersion),
+ transactionId.toString());
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
private void createTransaction(CreateTransaction createTransaction) {
createTransaction(createTransaction.getTransactionType(),
- createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+ createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+ createTransaction.getVersion());
}
- private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
+ private ActorRef createTransaction(int transactionType, String remoteTransactionId,
+ String transactionChainId, int clientVersion) {
ShardTransactionIdentifier transactionId =
ShardTransactionIdentifier.builder()
if(LOG.isDebugEnabled()) {
LOG.debug("Creating transaction : {} ", transactionId);
}
- ActorRef transactionActor =
- createTypedTransactionActor(transactionType, transactionId, transactionChainId);
+ ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
+ transactionChainId, clientVersion);
getSender()
.tell(new CreateTransactionReply(
LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
listenerRegistration.path());
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf());
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
}
private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
// so that this actor does not get block building the snapshot
createSnapshotTransaction = createTransaction(
TransactionProxy.TransactionType.READ_ONLY.ordinal(),
- "createSnapshot" + ++createSnapshotTransactionCounter, "");
+ "createSnapshot" + ++createSnapshotTransactionCounter, "",
+ CreateTransaction.CURRENT_VERSION);
createSnapshotTransaction.tell(
new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());
private final DOMStoreReadTransaction transaction;
public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
- super(shardActor, schemaContext, shardStats, transactionID);
+ SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+ int txnClientVersion) {
+ super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
this.transaction = transaction;
}
private final DOMStoreReadWriteTransaction transaction;
public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
- super(transaction, shardActor, schemaContext, shardStats, transactionID);
+ SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+ int txnClientVersion) {
+ super(transaction, shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
this.transaction = transaction;
}
*/
public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
+ protected static final boolean SERIALIZED_REPLY = true;
+
private final ActorRef shardActor;
private final SchemaContext schemaContext;
private final ShardStats shardStats;
private final String transactionID;
- protected static final boolean SERIALIZED_REPLY = true;
+ private final int txnClientVersion;
protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
- ShardStats shardStats, String transactionID) {
+ ShardStats shardStats, String transactionID, int txnClientVersion) {
super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
this.shardActor = shardActor;
this.schemaContext = schemaContext;
this.shardStats = shardStats;
this.transactionID = transactionID;
+ this.txnClientVersion = txnClientVersion;
}
public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
- String transactionID) {
+ String transactionID, int txnClientVersion) {
return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
- datastoreContext, shardStats, transactionID));
+ datastoreContext, shardStats, transactionID, txnClientVersion));
}
protected abstract DOMStoreTransaction getDOMStoreTransaction();
return schemaContext;
}
+ protected int getTxnClientVersion() {
+ return txnClientVersion;
+ }
+
@Override
public void handleReceive(Object message) throws Exception {
if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
final DatastoreContext datastoreContext;
final ShardStats shardStats;
final String transactionID;
+ final int txnClientVersion;
ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
SchemaContext schemaContext, DatastoreContext datastoreContext,
- ShardStats shardStats, String transactionID) {
+ ShardStats shardStats, String transactionID, int txnClientVersion) {
this.transaction = transaction;
this.shardActor = shardActor;
this.shardStats = shardStats;
this.schemaContext = schemaContext;
this.datastoreContext = datastoreContext;
this.transactionID = transactionID;
+ this.txnClientVersion = txnClientVersion;
}
@Override
ShardTransaction tx;
if(transaction instanceof DOMStoreReadWriteTransaction) {
tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
- shardActor, schemaContext, shardStats, transactionID);
+ shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
} else if(transaction instanceof DOMStoreReadTransaction) {
tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
- schemaContext, shardStats, transactionID);
+ schemaContext, shardStats, transactionID, txnClientVersion);
} else {
tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
- shardActor, schemaContext, shardStats, transactionID);
+ shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
}
tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
return getContext().actorOf(
ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId()), transactionName);
+ createTransaction.getTransactionId(),
+ createTransaction.getVersion()), transactionName);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId()), transactionName);
+ createTransaction.getTransactionId(),
+ createTransaction.getVersion()), transactionName);
} else if (createTransaction.getTransactionType() ==
TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
return getContext().actorOf(
ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
schemaContext, datastoreContext, shardStats,
- createTransaction.getTransactionId()), transactionName);
+ createTransaction.getTransactionId(),
+ createTransaction.getVersion()), transactionName);
} else {
throw new IllegalArgumentException (
"CreateTransaction message has unidentified transaction type=" +
private final DOMStoreWriteTransaction transaction;
public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
- SchemaContext schemaContext, ShardStats shardStats, String transactionID) {
- super(shardActor, schemaContext, shardStats, transactionID);
+ SchemaContext schemaContext, ShardStats shardStats, String transactionID,
+ int txnClientVersion) {
+ super(shardActor, schemaContext, shardStats, transactionID, txnClientVersion);
this.transaction = transaction;
}
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
- getShardActor().forward(new ForwardedReadyTransaction(transactionID, cohort, modification,
- returnSerialized), getContext());
+ getShardActor().forward(new ForwardedReadyTransaction(transactionID, getTxnClientVersion(),
+ cohort, modification, returnSerialized), getContext());
// The shard will handle the commit from here so we're no longer needed - self-destruct.
getSelf().tell(PoisonPill.getInstance(), getSelf());
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.concurrent.GuardedBy;
/**
* TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
- transactionContext = new TransactionContextImpl(transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal);
+ transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
+ actorContext, schemaContext, isTxActorLocal, reply.getVersion());
}
}
private final ActorContext actorContext;
private final SchemaContext schemaContext;
+ private final String transactionPath;
private final ActorSelection actor;
private final boolean isTxActorLocal;
+ private final int remoteTransactionVersion;
- private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
+ private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
ActorContext actorContext, SchemaContext schemaContext,
- boolean isTxActorLocal) {
+ boolean isTxActorLocal, int remoteTransactionVersion) {
super(identifier);
+ this.transactionPath = transactionPath;
this.actor = actor;
this.actorContext = actorContext;
this.schemaContext = schemaContext;
this.isTxActorLocal = isTxActorLocal;
+ this.remoteTransactionVersion = remoteTransactionVersion;
}
private ActorSelection getActor() {
} else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
- return actorContext.actorSelection(reply.getCohortPath());
+ String cohortPath = reply.getCohortPath();
+
+ // In Helium we used to return the local path of the actor which represented
+ // a remote ThreePhaseCommitCohort. The local path would then be converted to
+ // a remote path using this resolvePath method. To maintain compatibility with
+ // a Helium node we need to continue to do this conversion.
+ // At some point in the future when upgrades from Helium are not supported
+ // we could remove this code to resolvePath and just use the cohortPath as the
+ // resolved cohortPath
+ if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
+ cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
+ }
+
+ return actorContext.actorSelection(cohortPath);
} else {
// Throwing an exception here will fail the Future.
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.compat;
+
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+
+/**
+ * An actor to maintain backwards compatibility for the base Helium version where the 3-phase commit
+ * messages don't contain the transactionId. This actor just forwards a new message containing the
+ * transactionId to the parent Shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class BackwardsCompatibleThreePhaseCommitCohort extends AbstractUntypedActor {
+
+ private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
+
+ private final String transactionId;
+
+ private BackwardsCompatibleThreePhaseCommitCohort(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CanCommitTransaction");
+
+ getContext().parent().forward(new CanCommitTransaction(transactionId).toSerializable(),
+ getContext());
+ } else if(message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+ LOG.debug("BackwardsCompatibleThreePhaseCommitCohort PreCommitTransaction");
+
+ // The Shard doesn't need the PreCommitTransaction message so just return the reply here.
+ getSender().tell(new PreCommitTransactionReply().toSerializable(), self());
+ } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ LOG.debug("BackwardsCompatibleThreePhaseCommitCohort CommitTransaction");
+
+ getContext().parent().forward(new CommitTransaction(transactionId).toSerializable(),
+ getContext());
+
+ // We're done now - we can self-destruct
+ self().tell(PoisonPill.getInstance(), self());
+ } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ LOG.debug("BackwardsCompatibleThreePhaseCommitCohort AbortTransaction");
+
+ getContext().parent().forward(new AbortTransaction(transactionId).toSerializable(),
+ getContext());
+ self().tell(PoisonPill.getInstance(), self());
+ }
+ }
+
+ public static Props props(String transactionId) {
+ return Props.create(new BackwardsCompatibleThreePhaseCommitCohortCreator(transactionId));
+ }
+
+ private static class BackwardsCompatibleThreePhaseCommitCohortCreator
+ implements Creator<BackwardsCompatibleThreePhaseCommitCohort> {
+ private static final long serialVersionUID = 1L;
+
+ private final String transactionId;
+
+ BackwardsCompatibleThreePhaseCommitCohortCreator(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ @Override
+ public BackwardsCompatibleThreePhaseCommitCohort create() throws Exception {
+ return new BackwardsCompatibleThreePhaseCommitCohort(transactionId);
+ }
+ }
+}
public class CreateTransaction implements SerializableMessage {
- public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransaction.class;
+ public static final Class<ShardTransactionMessages.CreateTransaction> SERIALIZABLE_CLASS =
+ ShardTransactionMessages.CreateTransaction.class;
+
+ public static final int HELIUM_1_VERSION = 1;
+ public static final int CURRENT_VERSION = HELIUM_1_VERSION;
+
private final String transactionId;
private final int transactionType;
private final String transactionChainId;
+ private final int version;
public CreateTransaction(String transactionId, int transactionType) {
this(transactionId, transactionType, "");
}
public CreateTransaction(String transactionId, int transactionType, String transactionChainId) {
+ this(transactionId, transactionType, transactionChainId, CURRENT_VERSION);
+ }
+ private CreateTransaction(String transactionId, int transactionType, String transactionChainId,
+ int version) {
this.transactionId = transactionId;
this.transactionType = transactionType;
this.transactionChainId = transactionChainId;
-
+ this.version = version;
}
-
public String getTransactionId() {
return transactionId;
}
return transactionType;
}
+ public int getVersion() {
+ return version;
+ }
+
@Override
public Object toSerializable() {
return ShardTransactionMessages.CreateTransaction.newBuilder()
.setTransactionId(transactionId)
.setTransactionType(transactionType)
- .setTransactionChainId(transactionChainId).build();
+ .setTransactionChainId(transactionChainId)
+ .setMessageVersion(version).build();
}
public static CreateTransaction fromSerializable(Object message) {
ShardTransactionMessages.CreateTransaction createTransaction =
(ShardTransactionMessages.CreateTransaction) message;
return new CreateTransaction(createTransaction.getTransactionId(),
- createTransaction.getTransactionType(), createTransaction.getTransactionChainId());
+ createTransaction.getTransactionType(), createTransaction.getTransactionChainId(),
+ createTransaction.getMessageVersion());
}
public String getTransactionChainId() {
public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.CreateTransactionReply.class;
private final String transactionPath;
private final String transactionId;
+ private final int version;
public CreateTransactionReply(String transactionPath,
String transactionId) {
+ this(transactionPath, transactionId, CreateTransaction.CURRENT_VERSION);
+ }
+
+ public CreateTransactionReply(String transactionPath,
+ String transactionId, int version) {
this.transactionPath = transactionPath;
this.transactionId = transactionId;
+ this.version = version;
}
+
public String getTransactionPath() {
return transactionPath;
}
return transactionId;
}
+ public int getVersion() {
+ return version;
+ }
+
public Object toSerializable(){
return ShardTransactionMessages.CreateTransactionReply.newBuilder()
.setTransactionActorPath(transactionPath)
.setTransactionId(transactionId)
+ .setMessageVersion(version)
.build();
}
public static CreateTransactionReply fromSerializable(Object serializable){
ShardTransactionMessages.CreateTransactionReply o = (ShardTransactionMessages.CreateTransactionReply) serializable;
- return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId());
+ return new CreateTransactionReply(o.getTransactionActorPath(), o.getTransactionId(), o.getMessageVersion());
}
}
private final DOMStoreThreePhaseCommitCohort cohort;
private final Modification modification;
private final boolean returnSerialized;
+ private final int txnClientVersion;
- public ForwardedReadyTransaction(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
- Modification modification, boolean returnSerialized) {
+ public ForwardedReadyTransaction(String transactionID, int txnClientVersion,
+ DOMStoreThreePhaseCommitCohort cohort, Modification modification,
+ boolean returnSerialized) {
this.transactionID = transactionID;
this.cohort = cohort;
this.modification = modification;
this.returnSerialized = returnSerialized;
-
+ this.txnClientVersion = txnClientVersion;
}
public String getTransactionID() {
public boolean isReturnSerialized() {
return returnSerialized;
}
+
+ public int getTxnClientVersion() {
+ return txnClientVersion;
+ }
}
private final String cohortPath;
public ReadyTransactionReply(String cohortPath) {
-
this.cohortPath = cohortPath;
}
@Override
public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
- return ShardTransactionMessages.ReadyTransactionReply.newBuilder().
- setActorPath(cohortPath).build();
+ return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
+ .setActorPath(cohortPath)
+ .build();
}
public static ReadyTransactionReply fromSerializable(Object serializable) {
return hostPort1.equals(hostPort2);
}
+
+ /**
+ * @deprecated This method is present only to support backward compatibility with Helium and should not be
+ * used any further
+ *
+ *
+ * @param primaryPath
+ * @param localPathOfRemoteActor
+ * @return
+ */
+ @Deprecated
+ public String resolvePath(final String primaryPath,
+ final String localPathOfRemoteActor) {
+ StringBuilder builder = new StringBuilder();
+ String[] primaryPathElements = primaryPath.split("/");
+ builder.append(primaryPathElements[0]).append("//")
+ .append(primaryPathElements[1]).append(primaryPathElements[2]);
+ String[] remotePathElements = localPathOfRemoteActor.split("/");
+ for (int i = 3; i < remotePathElements.length; i++) {
+ builder.append("/").append(remotePathElements[i]);
+ }
+
+ return builder.toString();
+ }
}
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
final Props props = DataChangeListener.props(mockListener);
final ActorRef subject = getSystem().actorOf(props, "testDataChangedWithNoSender");
- // Let the DataChangeListener know that notifications should be enabled
- subject.tell(new EnableNotification(true), ActorRef.noSender());
+ getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
subject.tell(new DataChanged(CompositeModel.createTestContext(), mockChangeEvent),
ActorRef.noSender());
- getSystem().eventStream().subscribe(getRef(), DeadLetter.class);
- new Within(duration("1 seconds")) {
- @Override
- protected void run() {
- expectNoMsg();
+ // Make sure no DataChangedReply is sent to DeadLetters.
+ while(true) {
+ DeadLetter deadLetter;
+ try {
+ deadLetter = expectMsgClass(duration("1 seconds"), DeadLetter.class);
+ } catch (AssertionError e) {
+ // Timed out - got no DeadLetter - this is good
+ break;
}
- };
+
+ // We may get DeadLetters for other messages we don't care about.
+ Assert.assertFalse("Unexpected DataChangedReply",
+ deadLetter.message() instanceof DataChangedReply);
+ }
}};
}
}
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.cluster.datastore.messages.CreateTransaction.CURRENT_VERSION;
public class ShardTest extends AbstractActorTest {
// Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
// Send the ForwardedReadyTransaction for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+ cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the first Tx.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message.
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
modification, preCommit);
- shard.tell(new ForwardedReadyTransaction(transactionID, cohort, modification, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID, CURRENT_VERSION,
+ cohort, modification, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
shard.tell(new CanCommitTransaction(transactionID).toSerializable(), getRef());
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// canCommit 1st Tx. We don't send the commit so it should timeout.
// Ready the Tx's
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, cohort3, modification3, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+ cohort3, modification3, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// canCommit 1st Tx.
// Simulate the ForwardedReadyTransaction messages that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, cohort1, modification1, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+ cohort1, modification1, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID2, cohort2, modification2, true), getRef());
+ shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+ cohort2, modification2, true), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the first Tx.
};
}
- private NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
+ static NormalizedNode<?,?> readStore(TestActorRef<Shard> shard, YangInstanceIdentifier id)
throws ExecutionException, InterruptedException {
DOMStoreReadTransaction transaction = shard.underlyingActor().getDataStore().newReadOnlyTransaction();
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.pattern.AskTimeoutException;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.node.utils.serialization.NormalizedNodeSerializer;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.pattern.AskTimeoutException;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
/**
* Covers negative test cases
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props, "testNegativeMergeTransactionReady");
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> subject = TestActorRef
.create(getSystem(), props,
--- /dev/null
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.Collections;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+
+/**
+ * Tests backwards compatibility support from Helium-1 to Helium.
+ *
+ * In Helium-1, the 3-phase commit support was moved from the ThreePhaseCommitCohort actor to the
+ * Shard. As a consequence, a new transactionId field was added to the CanCommitTransaction,
+ * CommitTransaction and AbortTransaction messages. With a base Helium version node, these messages
+ * would be sans transactionId so this test verifies the Shard handles that properly.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardTransactionHeliumBackwardsCompatibilityTest extends AbstractActorTest {
+
+ @Test
+ public void testTransactionCommit() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ SchemaContext schemaContext = TestModel.createTestContext();
+ Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
+ shardName("inventory").type("config").build(),
+ Collections.<ShardIdentifier,String>emptyMap(),
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
+ schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
+ "testTransactionCommit");
+
+ waitUntilLeader(shard);
+
+ // Send CreateTransaction message with no messages version
+
+ String transactionID = "txn-1";
+ shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
+ .setTransactionId(transactionID)
+ .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+ .setTransactionChainId("").build(), getRef());
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
+
+ ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
+
+ // Write data to the Tx
+
+ txActor.tell(new WriteData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+
+ expectMsgClass(duration, WriteDataReply.class);
+
+ // Ready the Tx
+
+ txActor.tell(new ReadyTransaction().toSerializable(), getRef());
+
+ ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
+ duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+
+ ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
+
+ // Send the CanCommitTransaction message with no transactionId.
+
+ cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
+ getRef());
+
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Send the PreCommitTransaction message with no transactionId.
+
+ cohortActor.tell(ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder().build(),
+ getRef());
+
+ expectMsgClass(duration, PreCommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Send the CommitTransaction message with no transactionId.
+
+ cohortActor.tell(ThreePhaseCommitCohortMessages.CommitTransaction.newBuilder().build(),
+ getRef());
+
+ expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+ NormalizedNode<?, ?> node = ShardTest.readStore(shard, TestModel.TEST_PATH);
+ Assert.assertNotNull("Data not found in store", node);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+
+ @Test
+ public void testTransactionAbort() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ SchemaContext schemaContext = TestModel.createTestContext();
+ Props shardProps = Shard.props(ShardIdentifier.builder().memberName("member-1").
+ shardName("inventory").type("config").build(),
+ Collections.<ShardIdentifier,String>emptyMap(),
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).build(),
+ schemaContext).withDispatcher(Dispatchers.DefaultDispatcherId());
+
+ final TestActorRef<Shard> shard = TestActorRef.create(getSystem(), shardProps,
+ "testTransactionAbort");
+
+ waitUntilLeader(shard);
+
+ // Send CreateTransaction message with no messages version
+
+ String transactionID = "txn-1";
+ shard.tell(ShardTransactionMessages.CreateTransaction.newBuilder()
+ .setTransactionId(transactionID)
+ .setTransactionType(TransactionProxy.TransactionType.WRITE_ONLY.ordinal())
+ .setTransactionChainId("").build(), getRef());
+
+ final FiniteDuration duration = duration("5 seconds");
+
+ CreateTransactionReply reply = expectMsgClass(duration, CreateTransactionReply.class);
+
+ ActorSelection txActor = getSystem().actorSelection(reply.getTransactionActorPath());
+
+ // Write data to the Tx
+
+ txActor.tell(new WriteData(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext), getRef());
+
+ expectMsgClass(duration, WriteDataReply.class);
+
+ // Ready the Tx
+
+ txActor.tell(new ReadyTransaction().toSerializable(), getRef());
+
+ ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(expectMsgClass(
+ duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
+
+ ActorSelection cohortActor = getSystem().actorSelection(readyReply.getCohortPath());
+
+ // Send the CanCommitTransaction message with no transactionId.
+
+ cohortActor.tell(ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder().build(),
+ getRef());
+
+ expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+ // Send the AbortTransaction message with no transactionId.
+
+ cohortActor.tell(ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder().build(),
+ getRef());
+
+ expectMsgClass(duration, AbortTransactionReply.SERIALIZABLE_CLASS);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
+ }
+}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
public class ShardTransactionTest extends AbstractActorTest {
private static ListeningExecutorService storeExecutor =
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
}
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRO"));
props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRW"));
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
}
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
}
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
transaction.tell(new WriteData(TestModel.TEST_PATH,
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
transaction.tell(new MergeData(TestModel.TEST_PATH,
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
watch(transaction);
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
watch(transaction);
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
watch(transaction);
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
final ActorRef transaction =
getSystem().actorOf(props, "testShardTransactionInactivity");
return getSystem().actorSelection(actorRef.path());
}
- private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+ private CreateTransactionReply createTransactionReply(ActorRef actorRef, int transactionVersion){
return CreateTransactionReply.newBuilder()
.setTransactionActorPath(actorRef.path().toString())
- .setTransactionId("txn-1").build();
+ .setTransactionId("txn-1")
+ .setMessageVersion(transactionVersion)
+ .build();
}
- private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type, int transactionVersion) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
- doReturn(Futures.successful(createTransactionReply(actorRef))).when(mockActorContext).
+ doReturn(Futures.successful(createTransactionReply(actorRef, transactionVersion))).when(mockActorContext).
executeOperationAsync(eq(actorSystem.actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
return actorRef;
}
+ private ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+ return setupActorContextWithInitialCreateTransaction(actorSystem, type, CreateTransaction.CURRENT_VERSION);
+ }
+
+
private void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future)
throws Throwable {
verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadyForwardCompatibility() throws Exception {
+ ActorRef actorRef = setupActorContextWithInitialCreateTransaction(getSystem(), READ_WRITE, 0);
+
+ NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+
+ doReturn(readSerializedDataReply(null)).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedReadData());
+
+ doReturn(writeSerializedDataReply()).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), eqSerializedWriteData(nodeToWrite));
+
+ doReturn(readySerializedTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
+ eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
+
+ doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+
+ TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
+ READ_WRITE);
+
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
+ WriteDataReply.SERIALIZABLE_CLASS);
+
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
+
+ verify(mockActorContext).resolvePath(eq(actorRef.path().toString()),
+ eq(actorRef.path().toString()));
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testReadyWithRecordingOperationFailure() throws Exception {
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
+
import java.util.concurrent.TimeUnit;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
clusterWrapper.setSelfAddress("akka.tcp://system@127.0.0.1:2551/");
assertEquals(false, actorContext.isLocalPath("akka.tcp://system@127.0.0.1:2550/"));
}
+
+ @Test
+ public void testResolvePathForRemoteActor() {
+ ActorContext actorContext =
+ new ActorContext(mock(ActorSystem.class), mock(ActorRef.class), mock(
+ ClusterWrapper.class),
+ mock(Configuration.class));
+
+ String actual = actorContext.resolvePath(
+ "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
+ "akka://system/user/shardmanager/shard/transaction");
+
+ String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testResolvePathForLocalActor() {
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ String actual = actorContext.resolvePath(
+ "akka://system/user/shardmanager/shard",
+ "akka://system/user/shardmanager/shard/transaction");
+
+ String expected = "akka://system/user/shardmanager/shard/transaction";
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testResolvePathForRemoteActorWithProperRemoteAddress() {
+ ActorContext actorContext =
+ new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
+ mock(Configuration.class));
+
+ String actual = actorContext.resolvePath(
+ "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
+ "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
+
+ String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
+
+ assertEquals(expected, actual);
+ }
+
}