2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.Props;
14 import akka.event.Logging;
15 import akka.testkit.JavaTestKit;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
18 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
20 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
26 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
27 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
28 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
29 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
30 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
31 import scala.concurrent.Await;
32 import scala.concurrent.Future;
33 import scala.concurrent.duration.FiniteDuration;
35 import java.util.Collections;
37 import static junit.framework.Assert.assertEquals;
38 import static junit.framework.Assert.assertTrue;
39 import static org.junit.Assert.assertNotNull;
41 public class BasicIntegrationTest extends AbstractActorTest {
// End-to-end test that drives a single Shard actor through one write transaction:
// create transaction -> write data -> ready -> pre-commit -> commit.
// Each request is sent with the test kit's own ref (getRef()) as the second argument
// to tell(), and the matching reply is awaited before moving on to the next step.
// The final commit is sent but not verified (see the FIXME near the end).
44 public void integrationTest() throws Exception{
45 // System.setProperty("shard.persistent", "true");
48 // - initiate a transaction
50 // - ready the transaction for commit
51 // - commit the transaction
54 new JavaTestKit(getSystem()) {{
// Identify the shard under test: member "member-1", shard "inventory", type "config".
55 final ShardIdentifier identifier =
56 ShardIdentifier.builder().memberName("member-1")
57 .shardName("inventory").type("config").build();
59 final SchemaContext schemaContext = TestModel.createTestContext();
60 DatastoreContext datastoreContext = new DatastoreContext();
// NOTE(review): Collections.EMPTY_MAP is a raw Map; Collections.emptyMap() would be the
// type-safe form. This call also creates a second test schema context instead of reusing
// the schemaContext local declared above - confirm whether that is intentional.
62 final Props props = Shard.props(identifier, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext());
63 final ActorRef shard = getSystem().actorOf(props);
// Everything inside run() is wrapped in a Within block with a 10 second duration.
65 new Within(duration("10 seconds")) {
67 protected void run() {
// Hand the schema context to the shard before issuing any transaction messages.
68 shard.tell(new UpdateSchemaContext(schemaContext), getRef());
71 // Wait for a specific log message to show up
// The filter is restricted to Logging.Info events originating from the shard's path and
// expects exactly one occurrence of the "Candidate to Leader" state-switch message.
72 final boolean result =
73 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
76 protected Boolean run() {
79 }.from(shard.path().toString())
80 .message("Switching from state Candidate to Leader")
81 .occurrences(1).exec();
// NOTE(review): assertTrue(result) would express the same check more directly.
83 assertEquals(true, result);
85 // Create a transaction on the shard
// NOTE(review): the transaction type is passed as the enum ordinal of WRITE_ONLY, so this
// depends on the declaration order of TransactionType - verify against CreateTransaction.
86 shard.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
// Wait up to 3 seconds for the reply; a message is accepted when its class equals
// CreateTransactionReply.SERIALIZABLE_CLASS, and it is then deserialized to read the
// transaction path.
88 final ActorSelection transaction =
89 new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionReply") {
91 protected ActorSelection match(Object in) {
92 if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(in.getClass())) {
93 CreateTransactionReply reply = CreateTransactionReply.fromSerializable(in);
96 .getTransactionPath());
101 }.get(); // this extracts the received message
103 assertNotNull(transaction);
105 System.out.println("Successfully created transaction");
107 // 3. Write some data
// Writes an empty container node for TestModel.TEST_QNAME at TestModel.TEST_PATH.
108 transaction.tell(new WriteData(TestModel.TEST_PATH,
109 ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext).toSerializable(),
// Wait up to 3 seconds for a message whose class is WriteDataReply.SERIALIZABLE_CLASS.
112 Boolean writeDone = new ExpectMsg<Boolean>(duration("3 seconds"), "WriteDataReply") {
114 protected Boolean match(Object in) {
115 if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
121 }.get(); // this extracts the received message
123 assertTrue(writeDone);
125 System.out.println("Successfully wrote data");
127 // 4. Ready the transaction for commit
129 transaction.tell(new ReadyTransaction().toSerializable(), getRef());
// The ReadyTransactionReply is deserialized and a cohort path taken from it is resolved
// into an ActorSelection; the remaining commit steps are sent to that cohort.
131 final ActorSelection cohort =
132 new ExpectMsg<ActorSelection>(duration("3 seconds"), "ReadyTransactionReply") {
134 protected ActorSelection match(Object in) {
135 if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
137 ReadyTransactionReply.fromSerializable(in)
139 return getSystem().actorSelection(cohortPath);
144 }.get(); // this extracts the received message
146 assertNotNull(cohort);
148 System.out.println("Successfully readied the transaction");
150 // 5. PreCommit the transaction
152 cohort.tell(new PreCommitTransaction().toSerializable(), getRef());
// Wait up to 3 seconds for a message whose class is PreCommitTransactionReply.SERIALIZABLE_CLASS.
154 Boolean preCommitDone =
155 new ExpectMsg<Boolean>(duration("3 seconds"), "PreCommitTransactionReply") {
157 protected Boolean match(Object in) {
158 if (in.getClass().equals(PreCommitTransactionReply.SERIALIZABLE_CLASS)) {
164 }.get(); // this extracts the received message
166 assertTrue(preCommitDone);
168 System.out.println("Successfully pre-committed the transaction");
170 // 6. Commit the transaction
// The commit message is sent, but no reply is awaited or asserted here.
171 cohort.tell(new CommitTransaction().toSerializable(), getRef());
173 // FIXME : Add assertions that the commit worked and that the cohort and transaction actors were terminated
175 System.out.println("TODO : Check Successfully committed the transaction");
182 private ActorRef watchActor(ActorSelection actor) {
183 Future<ActorRef> future = actor
184 .resolveOne(FiniteDuration.apply(100, "milliseconds"));
187 ActorRef actorRef = Await.result(future,
188 FiniteDuration.apply(100, "milliseconds"));
193 } catch (Exception e) {
194 throw new RuntimeException(e);