+/*
+ * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.cluster.datastore;
import static org.junit.Assert.assertEquals;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
-import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
-import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
-import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.MergeData;
-import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
-import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
-import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
- private final ShardDataTree store = new ShardDataTree(testSchemaContext);
+ private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
private int txCounter = 0;
private ActorRef createShard() {
- return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
- Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
+ ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
+ schemaContext(TestModel.createTestContext()).props());
+ ShardTestKit.waitUntilLeader(shard);
+ return shard;
}
private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
- return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
- }
-
- private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
- return newTransactionActor(type, transaction, null, name, version);
+ return newTransactionActor(type, transaction, null, name);
}
private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
- return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
- }
-
- private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
- short version) {
Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
- datastoreContext, shardStats, "txn", version);
+ datastoreContext, shardStats, "txn");
return getSystem().actorOf(props, name);
}
}
private void testOnReceiveReadData(final ActorRef transaction) {
- //serialized read
- transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
- getRef());
-
- Object replySerialized =
- expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
-
- assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
-
- // unserialized read
- transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
+ transaction.tell(new ReadData(YangInstanceIdentifier.builder().build(),
+ DataStoreVersions.CURRENT_VERSION),getRef());
ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
}
private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
- // serialized read
- transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
-
- Object replySerialized =
- expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
-
- assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
-
- // unserialized read
- transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
+ transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
}};
}
- @Test
- public void testOnReceiveReadDataHeliumR1() throws Exception {
- new JavaTestKit(getSystem()) {{
- ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
- "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
-
- transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
- getRef());
-
- ShardTransactionMessages.ReadDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
-
- assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
- }};
- }
-
@Test
public void testOnReceiveDataExistsPositive() throws Exception {
new JavaTestKit(getSystem()) {{
}
private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
- transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
- getRef());
-
- ShardTransactionMessages.DataExistsReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
-
- assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
-
- // unserialized read
- transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
+ transaction.tell(new DataExists(YangInstanceIdentifier.builder().build(),
+ DataStoreVersions.CURRENT_VERSION),getRef());
DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
}
private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
- transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
-
- ShardTransactionMessages.DataExistsReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
-
- assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
-
- // unserialized read
- transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
+ transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION),getRef());
DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
}};
}
- private void assertModification(final ActorRef subject,
- final Class<? extends Modification> modificationType) {
- new JavaTestKit(getSystem()) {{
- subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
-
- CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
- GetCompositeModificationReply.class).getModification();
-
- assertTrue(compositeModification.getModifications().size() == 1);
- assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
- }};
- }
-
- @Test
- public void testOnReceiveWriteData() {
- new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
- "testOnReceiveWriteData");
-
- transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
- toSerializable(), getRef());
-
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
-
- assertModification(transaction, WriteModification.class);
-
- // unserialized write
- transaction.tell(new WriteData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
- getRef());
-
- expectMsgClass(duration("5 seconds"), WriteDataReply.class);
- }};
- }
-
- @Test
- public void testOnReceiveHeliumR1WriteData() {
- new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
- "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
-
- Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
- .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
- .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
-
- transaction.tell(serialized, getRef());
-
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
-
- assertModification(transaction, WriteModification.class);
- }};
- }
-
- @Test
- public void testOnReceiveMergeData() {
- new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
- "testMergeData");
-
- transaction.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
- toSerializable(), getRef());
-
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
-
- assertModification(transaction, MergeModification.class);
-
- //unserialized merge
- transaction.tell(new MergeData(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
- getRef());
-
- expectMsgClass(duration("5 seconds"), MergeDataReply.class);
- }};
- }
-
- @Test
- public void testOnReceiveHeliumR1MergeData() throws Exception {
- new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
- "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
-
- Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
- ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
- .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
- .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
-
- transaction.tell(serialized, getRef());
-
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
-
- assertModification(transaction, MergeModification.class);
- }};
- }
-
- @Test
- public void testOnReceiveDeleteData() throws Exception {
- new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
- "testDeleteData");
-
- transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
- toSerializable(), getRef());
-
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
-
- assertModification(transaction, DeleteModification.class);
-
- //unserialized
- transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
-
- expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
- }};
- }
-
@Test
public void testOnReceiveBatchedModifications() throws Exception {
new JavaTestKit(getSystem()) {{
BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
assertEquals("getNumBatched", 3, reply.getNumBatched());
- JavaTestKit verification = new JavaTestKit(getSystem());
- transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
-
- CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
- GetCompositeModificationReply.class).getModification();
-
- assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
-
- WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
- assertEquals("getPath", writePath, write.getPath());
- assertEquals("getData", writeData, write.getData());
-
- MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
- assertEquals("getPath", mergePath, merge.getPath());
- assertEquals("getData", mergeData, merge.getData());
-
- DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
- assertEquals("getPath", deletePath, delete.getPath());
-
InOrder inOrder = Mockito.inOrder(mockModification);
inOrder.verify(mockModification).write(writePath, writeData);
inOrder.verify(mockModification).merge(mergePath, mergeData);
batched.setTotalMessagesSent(1);
transaction.tell(batched, getRef());
- expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
+ expectMsgClass(duration("5 seconds"), CommitTransactionReply.class);
watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
}};
}
}};
}
- @Test
- public void testOnReceivePreLithiumReadyTransaction() throws Exception {
- new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
- "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
-
- JavaTestKit watcher = new JavaTestKit(getSystem());
- watcher.watch(transaction);
-
- transaction.tell(new ReadyTransaction().toSerializable(), getRef());
-
- expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
- watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
- }};
-
- // test
- new JavaTestKit(getSystem()) {{
- final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
- "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
-
- JavaTestKit watcher = new JavaTestKit(getSystem());
- watcher.watch(transaction);
-
- transaction.tell(new ReadyTransaction(), getRef());
-
- expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
- watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
- }};
- }
-
@Test
public void testOnReceiveCreateSnapshot() throws Exception {
new JavaTestKit(getSystem()) {{
transaction.tell(new CloseTransaction().toSerializable(), getRef());
- expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
+ expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
expectTerminated(duration("3 seconds"), transaction);
}};
}
transaction.tell(new CloseTransaction().toSerializable(), getRef());
- expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
+ expectMsgClass(duration("3 seconds"), CloseTransactionReply.class);
expectTerminated(duration("3 seconds"), transaction);
}};
}
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
- datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
+ datastoreContext, shardStats, "txn");
final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
- transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
- toSerializable(), ActorRef.noSender());
+ transaction.receive(new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null),
+ ActorRef.noSender());
}
@Test
}};
}
+ @Test
+ public void testOnReceivePreBoronReadData() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
+ "testOnReceivePreBoronReadData");
+
+ transaction.tell(new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
+ toSerializable(), getRef());
+
+ Object replySerialized = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
+ assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
+ }};
+ }
+
+ @Test
+ public void testOnReceivePreBoronDataExists() throws Exception {
+ new JavaTestKit(getSystem()) {{
+ ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(), createShard(),
+ "testOnReceivePreBoronDataExists");
+
+ transaction.tell(new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.LITHIUM_VERSION).
+ toSerializable(), getRef());
+
+ Object replySerialized = expectMsgClass(duration("5 seconds"),
+ ShardTransactionMessages.DataExistsReply.class);
+ assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
+ }};
+ }
+
public static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}