package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.actor.Terminated;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestActorRef;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.Terminated;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
public class ShardTransactionTest extends AbstractActorTest {
private static ListeningExecutorService storeExecutor =
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
}
- private void testOnReceiveReadData(final ActorRef subject) {
+ private void testOnReceiveReadData(final ActorRef transaction) {
//serialized read
- subject.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+ transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
getRef());
ShardTransactionMessages.ReadDataReply replySerialized =
.getNormalizedNode());
// unserialized read
- subject.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
+ transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRO"));
props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
props, "testReadDataWhenDataNotFoundRW"));
}
- private void testOnReceiveReadDataWhenDataNotFound(final ActorRef subject) {
+ private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
// serialized read
- subject.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
+ transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
ShardTransactionMessages.ReadDataReply replySerialized =
expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
// unserialized read
- subject.tell(new ReadData(TestModel.TEST_PATH),getRef());
+ transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
}
- private void testOnReceiveDataExistsPositive(final ActorRef subject) {
- subject.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
+ private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
+ transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
getRef());
ShardTransactionMessages.DataExistsReply replySerialized =
assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
// unserialized read
- subject.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
+ transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
}
- private void testOnReceiveDataExistsNegative(final ActorRef subject) {
- subject.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
+ private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
+ transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
ShardTransactionMessages.DataExistsReply replySerialized =
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
// unserialized read
- subject.tell(new DataExists(TestModel.TEST_PATH),getRef());
+ transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testWriteData");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
- subject.tell(new WriteData(TestModel.TEST_PATH,
+ transaction.tell(new WriteData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
getRef());
- ShardTransactionMessages.WriteDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
- assertModification(subject, WriteModification.class);
+ assertModification(transaction, WriteModification.class);
//unserialized write
- subject.tell(new WriteData(TestModel.TEST_PATH,
+ transaction.tell(new WriteData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME),
TestModel.createTestContext()),
getRef());
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testMergeData");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
- subject.tell(new MergeData(TestModel.TEST_PATH,
+ transaction.tell(new MergeData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
getRef());
- ShardTransactionMessages.MergeDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
- assertModification(subject, MergeModification.class);
+ assertModification(transaction, MergeModification.class);
//unserialized merge
- subject.tell(new MergeData(TestModel.TEST_PATH,
+ transaction.tell(new MergeData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
getRef());
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testDeleteData");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
- subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
+ transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
- ShardTransactionMessages.DeleteDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
- assertModification(subject, DeleteModification.class);
+ assertModification(transaction, DeleteModification.class);
//unserialized merge
- subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+ transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
}};
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testReadyTransaction");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
- subject.tell(new ReadyTransaction().toSerializable(), getRef());
+ watch(transaction);
- expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
+ transaction.tell(new ReadyTransaction().toSerializable(), getRef());
+
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+ Terminated.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+ Terminated.class);
}};
// test
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testReadyTransaction2");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
+
+ watch(transaction);
- subject.tell(new ReadyTransaction(), getRef());
+ transaction.tell(new ReadyTransaction(), getRef());
- expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+ Terminated.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+ Terminated.class);
}};
}
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
- watch(subject);
+ watch(transaction);
- subject.tell(new CloseTransaction().toSerializable(), getRef());
+ transaction.tell(new CloseTransaction().toSerializable(), getRef());
expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
- expectMsgClass(duration("3 seconds"), Terminated.class);
+ expectTerminated(duration("3 seconds"), transaction);
}};
}
public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
- final TestActorRef subject = TestActorRef.apply(props,getSystem());
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
- subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
+ transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
}
@Test
new JavaTestKit(getSystem()) {{
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
- testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
+ testSchemaContext, datastoreContext, shardStats, "txn",
+ CreateTransaction.CURRENT_VERSION);
+ final ActorRef transaction =
getSystem().actorOf(props, "testShardTransactionInactivity");
- watch(subject);
-
- // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
+ watch(transaction);
- final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in instanceof Terminated) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", termination);
+ expectMsgClass(duration("3 seconds"), Terminated.class);
}};
}
}