import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import junit.framework.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
public class BasicIntegrationTest extends AbstractActorTest {
@Test
- public void integrationTest() {
+ public void integrationTest() throws Exception{
// This test will
// - create a Shard
// - initiate a transaction
shard.tell(new CreateTransactionChain(), getRef());
final ActorSelection transactionChain =
- new ExpectMsg<ActorSelection>("match hint") {
+ new ExpectMsg<ActorSelection>("CreateTransactionChainReply") {
protected ActorSelection match(Object in) {
if (in instanceof CreateTransactionChainReply) {
ActorPath transactionChainPath =
Assert.assertNotNull(transactionChain);
- transactionChain.tell(new CreateTransaction(), getRef());
+ transactionChain.tell(new CreateTransaction("txn-1"), getRef());
final ActorSelection transaction =
- new ExpectMsg<ActorSelection>("match hint") {
+ new ExpectMsg<ActorSelection>("CreateTransactionReply") {
protected ActorSelection match(Object in) {
if (in instanceof CreateTransactionReply) {
ActorPath transactionPath =
Assert.assertNotNull(transaction);
+ // Add a watch on the transaction actor so that we are notified when it dies
+ final ActorRef transactionActorRef = watchActor(transaction);
+
transaction.tell(new WriteData(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
getRef());
- Boolean writeDone = new ExpectMsg<Boolean>("match hint") {
+ Boolean writeDone = new ExpectMsg<Boolean>("WriteDataReply") {
protected Boolean match(Object in) {
if (in instanceof WriteDataReply) {
return true;
transaction.tell(new ReadyTransaction(), getRef());
final ActorSelection cohort =
- new ExpectMsg<ActorSelection>("match hint") {
+ new ExpectMsg<ActorSelection>("ReadyTransactionReply") {
protected ActorSelection match(Object in) {
if (in instanceof ReadyTransactionReply) {
ActorPath cohortPath =
Assert.assertNotNull(cohort);
+ // Add a watch on the transaction actor so that we are notified when it dies
+ final ActorRef cohorActorRef = watchActor(cohort);
+
cohort.tell(new PreCommitTransaction(), getRef());
Boolean preCommitDone =
- new ExpectMsg<Boolean>("match hint") {
+ new ExpectMsg<Boolean>("PreCommitTransactionReply") {
protected Boolean match(Object in) {
if (in instanceof PreCommitTransactionReply) {
return true;
Assert.assertTrue(preCommitDone);
+ // FIXME : When we commit on the cohort it "kills" the Transaction.
+ // This in turn kills the child of Transaction as well.
+ // The order in which we receive the terminated event for both
+ // these actors is not fixed which may cause this test to fail
cohort.tell(new CommitTransaction(), getRef());
+ final Boolean terminatedCohort =
+ new ExpectMsg<Boolean>("Terminated Cohort") {
+ protected Boolean match(Object in) {
+ if (in instanceof Terminated) {
+ return cohorActorRef.equals(((Terminated) in).actor());
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(terminatedCohort);
+
+
+ final Boolean terminatedTransaction =
+ new ExpectMsg<Boolean>("Terminated Transaction") {
+ protected Boolean match(Object in) {
+ if (in instanceof Terminated) {
+ return transactionActorRef.equals(((Terminated) in).actor());
+ } else {
+ throw noMatch();
+ }
+ }
+ }.get(); // this extracts the received message
+
+ Assert.assertTrue(terminatedTransaction);
+
final Boolean commitDone =
- new ExpectMsg<Boolean>("match hint") {
+ new ExpectMsg<Boolean>("CommitTransactionReply") {
protected Boolean match(Object in) {
if (in instanceof CommitTransactionReply) {
return true;
};
- }};
+ }
+
+ private ActorRef watchActor(ActorSelection actor) {
+ Future<ActorRef> future = actor
+ .resolveOne(FiniteDuration.apply(100, "milliseconds"));
+
+ try {
+ ActorRef actorRef = Await.result(future,
+ FiniteDuration.apply(100, "milliseconds"));
+
+ watch(actorRef);
+
+ return actorRef;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+ };
}