package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
}
}
- private void writeData(DOMStoreWriteTransaction transaction, WriteData message, boolean returnSerialized) {
+ private void writeData(DOMStoreWriteTransaction transaction, WriteData message,
+ boolean returnSerialized) {
+ LOG.debug("writeData at path : {}", message.getPath());
+
modification.addModification(
new WriteModification(message.getPath(), message.getData(), getSchemaContext()));
- if(LOG.isDebugEnabled()) {
- LOG.debug("writeData at path : " + message.getPath().toString());
- }
try {
transaction.write(message.getPath(), message.getData());
WriteDataReply writeDataReply = new WriteDataReply();
}
}
- private void mergeData(DOMStoreWriteTransaction transaction, MergeData message, boolean returnSerialized) {
+ private void mergeData(DOMStoreWriteTransaction transaction, MergeData message,
+ boolean returnSerialized) {
+ LOG.debug("mergeData at path : {}", message.getPath());
+
modification.addModification(
new MergeModification(message.getPath(), message.getData(), getSchemaContext()));
- if(LOG.isDebugEnabled()) {
- LOG.debug("mergeData at path : " + message.getPath().toString());
- }
+
try {
transaction.merge(message.getPath(), message.getData());
MergeDataReply mergeDataReply = new MergeDataReply();
}
}
- private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message, boolean returnSerialized) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("deleteData at path : " + message.getPath().toString());
- }
+ private void deleteData(DOMStoreWriteTransaction transaction, DeleteData message,
+ boolean returnSerialized) {
+ LOG.debug("deleteData at path : {}", message.getPath());
+
modification.addModification(new DeleteModification(message.getPath()));
try {
transaction.delete(message.getPath());
}
}
- private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message, boolean returnSerialized) {
+ private void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message,
+ boolean returnSerialized) {
+ String transactionID = getTransactionID();
+
+ LOG.debug("readyTransaction : {}", transactionID);
+
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
- getShardActor().forward(new ForwardedReadyTransaction(
- getTransactionID(), cohort, modification, returnSerialized),
- getContext());
+ getShardActor().forward(new ForwardedReadyTransaction(transactionID, cohort, modification,
+ returnSerialized), getContext());
+
+ // The shard will handle the commit from here so we're no longer needed - self-destruct.
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
}
// These classes are in here for test purposes only
testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
}
- private void testOnReceiveReadData(final ActorRef subject) {
+ private void testOnReceiveReadData(final ActorRef transaction) {
//serialized read
- subject.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
+ transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
getRef());
ShardTransactionMessages.ReadDataReply replySerialized =
.getNormalizedNode());
// unserialized read
- subject.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
+ transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
props, "testReadDataWhenDataNotFoundRW"));
}
- private void testOnReceiveReadDataWhenDataNotFound(final ActorRef subject) {
+ private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
// serialized read
- subject.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
+ transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
ShardTransactionMessages.ReadDataReply replySerialized =
expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
testSchemaContext, TestModel.TEST_PATH, replySerialized).getNormalizedNode() == null);
// unserialized read
- subject.tell(new ReadData(TestModel.TEST_PATH),getRef());
+ transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
}
- private void testOnReceiveDataExistsPositive(final ActorRef subject) {
- subject.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
+ private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
+ transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
getRef());
ShardTransactionMessages.DataExistsReply replySerialized =
assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
// unserialized read
- subject.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
+ transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
}
- private void testOnReceiveDataExistsNegative(final ActorRef subject) {
- subject.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
+ private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
+ transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
ShardTransactionMessages.DataExistsReply replySerialized =
expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
// unserialized read
- subject.tell(new DataExists(TestModel.TEST_PATH),getRef());
+ transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testWriteData");
+ final ActorRef transaction = getSystem().actorOf(props, "testWriteData");
- subject.tell(new WriteData(TestModel.TEST_PATH,
+ transaction.tell(new WriteData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
getRef());
- ShardTransactionMessages.WriteDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
- assertModification(subject, WriteModification.class);
+ assertModification(transaction, WriteModification.class);
//unserialized write
- subject.tell(new WriteData(TestModel.TEST_PATH,
+ transaction.tell(new WriteData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME),
TestModel.createTestContext()),
getRef());
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testMergeData");
+ final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
- subject.tell(new MergeData(TestModel.TEST_PATH,
+ transaction.tell(new MergeData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
getRef());
- ShardTransactionMessages.MergeDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
- assertModification(subject, MergeModification.class);
+ assertModification(transaction, MergeModification.class);
//unserialized merge
- subject.tell(new MergeData(TestModel.TEST_PATH,
+ transaction.tell(new MergeData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext),
getRef());
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testDeleteData");
+ final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
- subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
+ transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
- ShardTransactionMessages.DeleteDataReply replySerialized =
- expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
+ expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
- assertModification(subject, DeleteModification.class);
+ assertModification(transaction, DeleteModification.class);
//unserialized merge
- subject.tell(new DeleteData(TestModel.TEST_PATH), getRef());
+ transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
}};
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testReadyTransaction");
+ final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
- subject.tell(new ReadyTransaction().toSerializable(), getRef());
+ watch(transaction);
- expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
+ transaction.tell(new ReadyTransaction().toSerializable(), getRef());
+
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+ Terminated.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
+ Terminated.class);
}};
// test
final ActorRef shard = createShard();
final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
- getSystem().actorOf(props, "testReadyTransaction2");
+ final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
+
+ watch(transaction);
- subject.tell(new ReadyTransaction(), getRef());
+ transaction.tell(new ReadyTransaction(), getRef());
- expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+ Terminated.class);
+ expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
+ Terminated.class);
}};
}
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject = getSystem().actorOf(props, "testCloseTransaction");
+ final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
- watch(subject);
+ watch(transaction);
- subject.tell(new CloseTransaction().toSerializable(), getRef());
+ transaction.tell(new CloseTransaction().toSerializable(), getRef());
expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
- expectMsgClass(duration("3 seconds"), Terminated.class);
+ expectTerminated(duration("3 seconds"), transaction);
}};
}
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final TestActorRef subject = TestActorRef.apply(props,getSystem());
+ final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
- subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
+ transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
}
@Test
final ActorRef shard = createShard();
final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
testSchemaContext, datastoreContext, shardStats, "txn");
- final ActorRef subject =
+ final ActorRef transaction =
getSystem().actorOf(props, "testShardTransactionInactivity");
- watch(subject);
+ watch(transaction);
- // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
-
- final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
- // do not put code outside this method, will run afterwards
- @Override
- protected String match(Object in) {
- if (in instanceof Terminated) {
- return "match";
- } else {
- throw noMatch();
- }
- }
- }.get(); // this extracts the received message
-
- assertEquals("match", termination);
+ expectMsgClass(duration("3 seconds"), Terminated.class);
}};
}
}