1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorRef;
4 import akka.actor.Props;
5 import akka.actor.Terminated;
6 import akka.testkit.JavaTestKit;
7 import akka.testkit.TestActorRef;
8 import com.google.common.util.concurrent.ListeningExecutorService;
9 import com.google.common.util.concurrent.MoreExecutors;
10 import org.junit.BeforeClass;
11 import org.junit.Test;
12 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
13 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
14 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
15 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
17 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
18 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
19 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
20 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
21 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
22 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
23 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
24 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
28 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
29 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
30 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
31 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
32 import org.opendaylight.controller.cluster.datastore.modification.Modification;
33 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
34 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
35 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import scala.concurrent.duration.Duration;
40 import java.util.Collections;
41 import java.util.concurrent.TimeUnit;
42 import static org.junit.Assert.assertEquals;
43 import static org.junit.Assert.assertTrue;
45 public class ShardTransactionTest extends AbstractActorTest {
46 private static ListeningExecutorService storeExecutor =
47 MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
49 private static final InMemoryDOMDataStore store =
50 new InMemoryDOMDataStore("OPER", storeExecutor, MoreExecutors.sameThreadExecutor());
52 private static final SchemaContext testSchemaContext = TestModel.createTestContext();
54 private static final ShardIdentifier SHARD_IDENTIFIER =
55 ShardIdentifier.builder().memberName("member-1")
56 .shardName("inventory").type("config").build();
58 private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
60 private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
63 public static void staticSetup() {
64 store.onGlobalContextUpdated(testSchemaContext);
67 private ActorRef createShard(){
68 return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
69 Collections.EMPTY_MAP, datastoreContext, TestModel.createTestContext()));
73 public void testOnReceiveReadData() throws Exception {
74 new JavaTestKit(getSystem()) {{
75 final ActorRef shard = createShard();
76 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
77 testSchemaContext, datastoreContext, shardStats, "txn");
78 final ActorRef subject = getSystem().actorOf(props, "testReadData");
80 new Within(duration("1 seconds")) {
82 protected void run() {
85 new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
88 final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
89 // do not put code outside this method, will run afterwards
91 protected String match(Object in) {
92 if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
93 if (ReadDataReply.fromSerializable(testSchemaContext,YangInstanceIdentifier.builder().build(), in)
94 .getNormalizedNode()!= null) {
102 }.get(); // this extracts the received message
104 assertEquals("match", out);
115 public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
116 new JavaTestKit(getSystem()) {{
117 final ActorRef shard = createShard();
118 final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
119 testSchemaContext, datastoreContext, shardStats, "txn");
120 final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
122 new Within(duration("1 seconds")) {
124 protected void run() {
127 new ReadData(TestModel.TEST_PATH).toSerializable(),
130 final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
131 // do not put code outside this method, will run afterwards
133 protected String match(Object in) {
134 if (in.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
135 if (ReadDataReply.fromSerializable(testSchemaContext,TestModel.TEST_PATH, in)
145 }.get(); // this extracts the received message
147 assertEquals("match", out);
158 public void testOnReceiveDataExistsPositive() throws Exception {
159 new JavaTestKit(getSystem()) {{
160 final ActorRef shard = createShard();
161 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
162 testSchemaContext, datastoreContext, shardStats, "txn");
163 final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
165 new Within(duration("1 seconds")) {
167 protected void run() {
170 new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
173 final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
174 // do not put code outside this method, will run afterwards
176 protected String match(Object in) {
177 if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
178 if (DataExistsReply.fromSerializable(in)
187 }.get(); // this extracts the received message
189 assertEquals("match", out);
200 public void testOnReceiveDataExistsNegative() throws Exception {
201 new JavaTestKit(getSystem()) {{
202 final ActorRef shard = createShard();
203 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
204 testSchemaContext, datastoreContext, shardStats, "txn");
205 final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
207 new Within(duration("1 seconds")) {
209 protected void run() {
212 new DataExists(TestModel.TEST_PATH).toSerializable(),
215 final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
216 // do not put code outside this method, will run afterwards
218 protected String match(Object in) {
219 if (in.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
220 if (!DataExistsReply.fromSerializable(in)
229 }.get(); // this extracts the received message
231 assertEquals("match", out);
241 private void assertModification(final ActorRef subject,
242 final Class<? extends Modification> modificationType) {
243 new JavaTestKit(getSystem()) {{
244 new Within(duration("3 seconds")) {
246 protected void run() {
248 .tell(new ShardWriteTransaction.GetCompositedModification(),
251 final CompositeModification compositeModification =
252 new ExpectMsg<CompositeModification>(duration("3 seconds"), "match hint") {
253 // do not put code outside this method, will run afterwards
255 protected CompositeModification match(Object in) {
256 if (in instanceof ShardWriteTransaction.GetCompositeModificationReply) {
257 return ((ShardWriteTransaction.GetCompositeModificationReply) in)
263 }.get(); // this extracts the received message
266 compositeModification.getModifications().size() == 1);
267 assertEquals(modificationType,
268 compositeModification.getModifications().get(0)
277 public void testOnReceiveWriteData() throws Exception {
278 new JavaTestKit(getSystem()) {{
279 final ActorRef shard = createShard();
280 final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
281 testSchemaContext, datastoreContext, shardStats, "txn");
282 final ActorRef subject =
283 getSystem().actorOf(props, "testWriteData");
285 new Within(duration("1 seconds")) {
287 protected void run() {
289 subject.tell(new WriteData(TestModel.TEST_PATH,
290 ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
293 final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
294 // do not put code outside this method, will run afterwards
296 protected String match(Object in) {
297 if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
303 }.get(); // this extracts the received message
305 assertEquals("match", out);
307 assertModification(subject, WriteModification.class);
317 public void testOnReceiveMergeData() throws Exception {
318 new JavaTestKit(getSystem()) {{
319 final ActorRef shard = createShard();
320 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
321 testSchemaContext, datastoreContext, shardStats, "txn");
322 final ActorRef subject =
323 getSystem().actorOf(props, "testMergeData");
325 new Within(duration("1 seconds")) {
327 protected void run() {
329 subject.tell(new MergeData(TestModel.TEST_PATH,
330 ImmutableNodes.containerNode(TestModel.TEST_QNAME), testSchemaContext).toSerializable(),
333 final String out = new ExpectMsg<String>(duration("500 milliseconds"), "match hint") {
334 // do not put code outside this method, will run afterwards
336 protected String match(Object in) {
337 if (in.getClass().equals(MergeDataReply.SERIALIZABLE_CLASS)) {
343 }.get(); // this extracts the received message
345 assertEquals("match", out);
347 assertModification(subject, MergeModification.class);
358 public void testOnReceiveDeleteData() throws Exception {
359 new JavaTestKit(getSystem()) {{
360 final ActorRef shard = createShard();
361 final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
362 testSchemaContext, datastoreContext, shardStats, "txn");
363 final ActorRef subject =
364 getSystem().actorOf(props, "testDeleteData");
366 new Within(duration("1 seconds")) {
368 protected void run() {
370 subject.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
372 final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
373 // do not put code outside this method, will run afterwards
375 protected String match(Object in) {
376 if (in.getClass().equals(DeleteDataReply.SERIALIZABLE_CLASS)) {
382 }.get(); // this extracts the received message
384 assertEquals("match", out);
386 assertModification(subject, DeleteModification.class);
397 public void testOnReceiveReadyTransaction() throws Exception {
398 new JavaTestKit(getSystem()) {{
399 final ActorRef shard = createShard();
400 final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
401 testSchemaContext, datastoreContext, shardStats, "txn");
402 final ActorRef subject =
403 getSystem().actorOf(props, "testReadyTransaction");
405 new Within(duration("1 seconds")) {
407 protected void run() {
409 subject.tell(new ReadyTransaction().toSerializable(), getRef());
411 final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
412 // do not put code outside this method, will run afterwards
414 protected String match(Object in) {
415 if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
421 }.get(); // this extracts the received message
423 assertEquals("match", out);
435 public void testOnReceiveCloseTransaction() throws Exception {
436 new JavaTestKit(getSystem()) {{
437 final ActorRef shard = createShard();
438 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
439 testSchemaContext, datastoreContext, shardStats, "txn");
440 final ActorRef subject =
441 getSystem().actorOf(props, "testCloseTransaction");
445 new Within(duration("6 seconds")) {
447 protected void run() {
449 subject.tell(new CloseTransaction().toSerializable(), getRef());
451 final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
452 // do not put code outside this method, will run afterwards
454 protected String match(Object in) {
455 System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL"));
456 if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
462 }.get(); // this extracts the received message
464 assertEquals("match", out);
466 final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
467 // do not put code outside this method, will run afterwards
469 protected String match(Object in) {
470 System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL"));
471 if (in instanceof Terminated) {
477 }.get(); // this extracts the received message
479 assertEquals("match", termination);
485 @Test(expected=UnknownMessageException.class)
486 public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
487 final ActorRef shard = createShard();
488 final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
489 testSchemaContext, datastoreContext, shardStats, "txn");
490 final TestActorRef subject = TestActorRef.apply(props,getSystem());
492 subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
496 public void testShardTransactionInactivity() {
498 datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
499 Duration.create(500, TimeUnit.MILLISECONDS)).build();
501 new JavaTestKit(getSystem()) {{
502 final ActorRef shard = createShard();
503 final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
504 testSchemaContext, datastoreContext, shardStats, "txn");
505 final ActorRef subject =
506 getSystem().actorOf(props, "testShardTransactionInactivity");
510 // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
512 final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
513 // do not put code outside this method, will run afterwards
515 protected String match(Object in) {
516 if (in instanceof Terminated) {
522 }.get(); // this extracts the received message
524 assertEquals("match", termination);