Merge "BUG 1712 - Distributed DataStore does not work properly with Transaction...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / BasicIntegrationTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import akka.event.Logging;
16 import akka.testkit.JavaTestKit;
17 import org.junit.Test;
18 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
19 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
20 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
21 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
22 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
27 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
28 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
29 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
30 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
32 import scala.concurrent.Await;
33 import scala.concurrent.Future;
34 import scala.concurrent.duration.FiniteDuration;
35
36 import java.util.Collections;
37
38 import static junit.framework.Assert.assertEquals;
39 import static junit.framework.Assert.assertTrue;
40 import static org.junit.Assert.assertNotNull;
41
42 public class BasicIntegrationTest extends AbstractActorTest {
43
44     @Test
45     public void integrationTest() throws Exception{
46         // System.setProperty("shard.persistent", "true");
47         // This test will
48         // - create a Shard
49         // - initiate a transaction
50         // - write something
51         // - read the transaction for commit
52         // - commit the transaction
53
54
55         new JavaTestKit(getSystem()) {{
56             final ShardIdentifier identifier =
57                 ShardIdentifier.builder().memberName("member-1")
58                     .shardName("inventory").type("config").build();
59
60             final SchemaContext schemaContext = TestModel.createTestContext();
61             DatastoreContext datastoreContext = new DatastoreContext();
62
63             final Props props = Shard.props(identifier, Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext());
64             final ActorRef shard = getSystem().actorOf(props);
65
66             new Within(duration("10 seconds")) {
67                 @Override
68                 protected void run() {
69                     shard.tell(new UpdateSchemaContext(schemaContext), getRef());
70
71
72                     // Wait for a specific log message to show up
73                     final boolean result =
74                         new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
75                         ) {
76                             @Override
77                             protected Boolean run() {
78                                 return true;
79                             }
80                         }.from(shard.path().toString())
81                             .message("Switching from state Candidate to Leader")
82                             .occurrences(1).exec();
83
84                     assertEquals(true, result);
85
86                     // Create a transaction on the shard
87                     shard.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
88
89                     final ActorSelection transaction =
90                         new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionReply") {
91                             @Override
92                             protected ActorSelection match(Object in) {
93                                 if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(in.getClass())) {
94                                     CreateTransactionReply reply = CreateTransactionReply.fromSerializable(in);
95                                     return getSystem()
96                                         .actorSelection(reply
97                                             .getTransactionPath());
98                                 } else {
99                                     throw noMatch();
100                                 }
101                             }
102                         }.get(); // this extracts the received message
103
104                     assertNotNull(transaction);
105
106                     System.out.println("Successfully created transaction");
107
108                     // 3. Write some data
109                     transaction.tell(new WriteData(TestModel.TEST_PATH,
110                         ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext).toSerializable(),
111                         getRef());
112
113                     Boolean writeDone = new ExpectMsg<Boolean>(duration("3 seconds"), "WriteDataReply") {
114                         @Override
115                         protected Boolean match(Object in) {
116                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
117                                 return true;
118                             } else {
119                                 throw noMatch();
120                             }
121                         }
122                     }.get(); // this extracts the received message
123
124                     assertTrue(writeDone);
125
126                     System.out.println("Successfully wrote data");
127
128                     // 4. Ready the transaction for commit
129
130                     transaction.tell(new ReadyTransaction().toSerializable(), getRef());
131
132                     final ActorSelection cohort =
133                         new ExpectMsg<ActorSelection>(duration("3 seconds"), "ReadyTransactionReply") {
134                             @Override
135                             protected ActorSelection match(Object in) {
136                                 if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
137                                     ActorPath cohortPath =
138                                         ReadyTransactionReply.fromSerializable(getSystem(),in)
139                                             .getCohortPath();
140                                     return getSystem()
141                                         .actorSelection(cohortPath);
142                                 } else {
143                                     throw noMatch();
144                                 }
145                             }
146                         }.get(); // this extracts the received message
147
148                     assertNotNull(cohort);
149
150                     System.out.println("Successfully readied the transaction");
151
152                     // 5. PreCommit the transaction
153
154                     cohort.tell(new PreCommitTransaction().toSerializable(), getRef());
155
156                     Boolean preCommitDone =
157                         new ExpectMsg<Boolean>(duration("3 seconds"), "PreCommitTransactionReply") {
158                             @Override
159                             protected Boolean match(Object in) {
160                                 if (in.getClass().equals(PreCommitTransactionReply.SERIALIZABLE_CLASS)) {
161                                     return true;
162                                 } else {
163                                     throw noMatch();
164                                 }
165                             }
166                         }.get(); // this extracts the received message
167
168                     assertTrue(preCommitDone);
169
170                     System.out.println("Successfully pre-committed the transaction");
171
172                     // 6. Commit the transaction
173                     cohort.tell(new CommitTransaction().toSerializable(), getRef());
174
175                     // FIXME : Add assertions that the commit worked and that the cohort and transaction actors were terminated
176
177                     System.out.println("TODO : Check Successfully committed the transaction");
178                 }
179
180
181             };
182         }
183
184             private ActorRef watchActor(ActorSelection actor) {
185                 Future<ActorRef> future = actor
186                     .resolveOne(FiniteDuration.apply(100, "milliseconds"));
187
188                 try {
189                     ActorRef actorRef = Await.result(future,
190                         FiniteDuration.apply(100, "milliseconds"));
191
192                     watch(actorRef);
193
194                     return actorRef;
195                 } catch (Exception e) {
196                     throw new RuntimeException(e);
197                 }
198
199             }
200         };
201
202
203     }
204 }