Merge "Event Source: switch from wildcards to regexs"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.doThrow;
8 import akka.actor.ActorRef;
9 import akka.actor.Props;
10 import akka.actor.Status.Failure;
11 import akka.actor.Terminated;
12 import akka.testkit.JavaTestKit;
13 import akka.testkit.TestActorRef;
14 import java.util.Collections;
15 import java.util.concurrent.TimeUnit;
16 import org.junit.Test;
17 import org.mockito.InOrder;
18 import org.mockito.Mockito;
19 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
20 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
21 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
42 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
44 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
45 import org.opendaylight.controller.cluster.datastore.modification.Modification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
49 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
51 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
52 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
57 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
58 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
59 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
60
61 public class ShardTransactionTest extends AbstractActorTest {
62
63     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
64     private static final TransactionType RO = TransactionType.READ_ONLY;
65     private static final TransactionType RW = TransactionType.READ_WRITE;
66     private static final TransactionType WO = TransactionType.WRITE_ONLY;
67
68     private static final ShardIdentifier SHARD_IDENTIFIER =
69         ShardIdentifier.builder().memberName("member-1")
70             .shardName("inventory").type("config").build();
71
72     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
73
74     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
75
76     private final ShardDataTree store = new ShardDataTree(testSchemaContext);
77
78     private int txCounter = 0;
79
80     private ActorRef createShard() {
81         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
82             Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
83     }
84
85     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
86         return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
87     }
88
89     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
90         return newTransactionActor(type, transaction, null, name, version);
91     }
92
93     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
94         return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
95     }
96
97     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
98             short version) {
99         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
100                 datastoreContext, shardStats, "txn", version);
101         return getSystem().actorOf(props, name);
102     }
103
104     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
105         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
106     }
107
108     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
109         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
110     }
111
112     @Test
113     public void testOnReceiveReadData() throws Exception {
114         new JavaTestKit(getSystem()) {{
115             final ActorRef shard = createShard();
116
117             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
118
119             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
120         }
121
122         private void testOnReceiveReadData(final ActorRef transaction) {
123             //serialized read
124             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
125                 getRef());
126
127             Object replySerialized =
128                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
129
130             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
131
132             // unserialized read
133             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
134
135             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
136
137             assertNotNull(reply.getNormalizedNode());
138         }};
139     }
140
141     @Test
142     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
143         new JavaTestKit(getSystem()) {{
144             final ActorRef shard = createShard();
145
146             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
147                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
148
149             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
150                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
151         }
152
153         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
154             // serialized read
155             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
156
157             Object replySerialized =
158                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
159
160             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
161
162             // unserialized read
163             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
164
165             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
166
167             assertTrue(reply.getNormalizedNode() == null);
168         }};
169     }
170
171     @Test
172     public void testOnReceiveReadDataHeliumR1() throws Exception {
173         new JavaTestKit(getSystem()) {{
174             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
175                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
176
177             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
178                     getRef());
179
180             ShardTransactionMessages.ReadDataReply replySerialized =
181                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
182
183             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
184         }};
185     }
186
187     @Test
188     public void testOnReceiveDataExistsPositive() throws Exception {
189         new JavaTestKit(getSystem()) {{
190             final ActorRef shard = createShard();
191
192             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
193                     "testDataExistsPositiveRO"));
194
195             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
196                     "testDataExistsPositiveRW"));
197         }
198
199         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
200             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
201                 getRef());
202
203             ShardTransactionMessages.DataExistsReply replySerialized =
204                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
205
206             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
207
208             // unserialized read
209             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
210
211             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
212
213             assertTrue(reply.exists());
214         }};
215     }
216
217     @Test
218     public void testOnReceiveDataExistsNegative() throws Exception {
219         new JavaTestKit(getSystem()) {{
220             final ActorRef shard = createShard();
221
222             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
223                     "testDataExistsNegativeRO"));
224
225             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
226                     "testDataExistsNegativeRW"));
227         }
228
229         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
230             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
231
232             ShardTransactionMessages.DataExistsReply replySerialized =
233                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
234
235             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
236
237             // unserialized read
238             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
239
240             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
241
242             assertFalse(reply.exists());
243         }};
244     }
245
246     private void assertModification(final ActorRef subject,
247         final Class<? extends Modification> modificationType) {
248         new JavaTestKit(getSystem()) {{
249             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
250
251             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
252                     GetCompositeModificationReply.class).getModification();
253
254             assertTrue(compositeModification.getModifications().size() == 1);
255             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
256         }};
257     }
258
259     @Test
260     public void testOnReceiveWriteData() {
261         new JavaTestKit(getSystem()) {{
262             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
263                     "testOnReceiveWriteData");
264
265             transaction.tell(new WriteData(TestModel.TEST_PATH,
266                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
267                         toSerializable(), getRef());
268
269             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
270
271             assertModification(transaction, WriteModification.class);
272
273             // unserialized write
274             transaction.tell(new WriteData(TestModel.TEST_PATH,
275                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
276                 getRef());
277
278             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
279         }};
280     }
281
282     @Test
283     public void testOnReceiveHeliumR1WriteData() {
284         new JavaTestKit(getSystem()) {{
285             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
286                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
287
288             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
289                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
290             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
291                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
292                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
293
294             transaction.tell(serialized, getRef());
295
296             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
297
298             assertModification(transaction, WriteModification.class);
299         }};
300     }
301
302     @Test
303     public void testOnReceiveMergeData() {
304         new JavaTestKit(getSystem()) {{
305             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
306                     "testMergeData");
307
308             transaction.tell(new MergeData(TestModel.TEST_PATH,
309                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
310                         toSerializable(), getRef());
311
312             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
313
314             assertModification(transaction, MergeModification.class);
315
316             //unserialized merge
317             transaction.tell(new MergeData(TestModel.TEST_PATH,
318                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
319                 getRef());
320
321             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
322         }};
323     }
324
325     @Test
326     public void testOnReceiveHeliumR1MergeData() throws Exception {
327         new JavaTestKit(getSystem()) {{
328             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
329                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
330
331             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
332                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
333             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
334                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
335                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
336
337             transaction.tell(serialized, getRef());
338
339             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
340
341             assertModification(transaction, MergeModification.class);
342         }};
343     }
344
345     @Test
346     public void testOnReceiveDeleteData() throws Exception {
347         new JavaTestKit(getSystem()) {{
348             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
349                     "testDeleteData");
350
351             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
352                     toSerializable(), getRef());
353
354             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
355
356             assertModification(transaction, DeleteModification.class);
357
358             //unserialized
359             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
360
361             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
362         }};
363     }
364
365     @Test
366     public void testOnReceiveBatchedModifications() throws Exception {
367         new JavaTestKit(getSystem()) {{
368
369             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
370             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
371             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
372             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
373
374             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
375             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
376                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
377                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
378
379             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
380             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
381                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
382
383             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
384
385             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
386             batched.addModification(new WriteModification(writePath, writeData));
387             batched.addModification(new MergeModification(mergePath, mergeData));
388             batched.addModification(new DeleteModification(deletePath));
389
390             transaction.tell(batched, getRef());
391
392             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
393             assertEquals("getNumBatched", 3, reply.getNumBatched());
394
395             JavaTestKit verification = new JavaTestKit(getSystem());
396             transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
397
398             CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
399                         GetCompositeModificationReply.class).getModification();
400
401             assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
402
403             WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
404             assertEquals("getPath", writePath, write.getPath());
405             assertEquals("getData", writeData, write.getData());
406
407             MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
408             assertEquals("getPath", mergePath, merge.getPath());
409             assertEquals("getData", mergeData, merge.getData());
410
411             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
412             assertEquals("getPath", deletePath, delete.getPath());
413
414             InOrder inOrder = Mockito.inOrder(mockModification);
415             inOrder.verify(mockModification).write(writePath, writeData);
416             inOrder.verify(mockModification).merge(mergePath, mergeData);
417             inOrder.verify(mockModification).delete(deletePath);
418         }};
419     }
420
421     @Test
422     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
423         new JavaTestKit(getSystem()) {{
424
425             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
426                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
427
428             JavaTestKit watcher = new JavaTestKit(getSystem());
429             watcher.watch(transaction);
430
431             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
432             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
433                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
434                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
435
436             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
437             batched.addModification(new WriteModification(writePath, writeData));
438
439             transaction.tell(batched, getRef());
440             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
441             assertEquals("getNumBatched", 1, reply.getNumBatched());
442
443             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
444             batched.setReady(true);
445             batched.setTotalMessagesSent(2);
446
447             transaction.tell(batched, getRef());
448             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
449             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
450         }};
451     }
452
453     @Test
454     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
455         new JavaTestKit(getSystem()) {{
456
457             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
458                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
459
460             JavaTestKit watcher = new JavaTestKit(getSystem());
461             watcher.watch(transaction);
462
463             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
464             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
465                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
466                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
467
468             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
469             batched.addModification(new WriteModification(writePath, writeData));
470             batched.setReady(true);
471             batched.setDoCommitOnReady(true);
472             batched.setTotalMessagesSent(1);
473
474             transaction.tell(batched, getRef());
475             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
476             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
477         }};
478     }
479
480     @Test(expected=TestException.class)
481     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
482         new JavaTestKit(getSystem()) {{
483
484             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
485             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
486             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
487             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
488                     "testOnReceiveBatchedModificationsFailure");
489
490             JavaTestKit watcher = new JavaTestKit(getSystem());
491             watcher.watch(transaction);
492
493             YangInstanceIdentifier path = TestModel.TEST_PATH;
494             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
495
496             doThrow(new TestException()).when(mockModification).write(path, node);
497
498             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
499             batched.addModification(new WriteModification(path, node));
500
501             transaction.tell(batched, getRef());
502             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
503
504             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
505             batched.setReady(true);
506             batched.setTotalMessagesSent(2);
507
508             transaction.tell(batched, getRef());
509             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
510             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
511
512             if(failure != null) {
513                 throw failure.cause();
514             }
515         }};
516     }
517
518     @Test(expected=IllegalStateException.class)
519     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
520         new JavaTestKit(getSystem()) {{
521
522             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
523                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
524
525             JavaTestKit watcher = new JavaTestKit(getSystem());
526             watcher.watch(transaction);
527
528             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
529             batched.setReady(true);
530             batched.setTotalMessagesSent(2);
531
532             transaction.tell(batched, getRef());
533
534             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
535             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
536
537             if(failure != null) {
538                 throw failure.cause();
539             }
540         }};
541     }
542
543     @Test
544     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
545         new JavaTestKit(getSystem()) {{
546             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
547                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
548
549             JavaTestKit watcher = new JavaTestKit(getSystem());
550             watcher.watch(transaction);
551
552             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
553
554             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
555             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
556         }};
557
558         // test
559         new JavaTestKit(getSystem()) {{
560             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
561                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
562
563             JavaTestKit watcher = new JavaTestKit(getSystem());
564             watcher.watch(transaction);
565
566             transaction.tell(new ReadyTransaction(), getRef());
567
568             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
569             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
570         }};
571     }
572
573     @Test
574     public void testOnReceiveCreateSnapshot() throws Exception {
575         new JavaTestKit(getSystem()) {{
576             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
577                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
578
579             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
580                     YangInstanceIdentifier.builder().build());
581
582             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
583                     "testOnReceiveCreateSnapshot");
584
585             watch(transaction);
586
587             transaction.tell(CreateSnapshot.INSTANCE, getRef());
588
589             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
590
591             assertNotNull("getSnapshot is null", reply.getSnapshot());
592
593             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
594                     reply.getSnapshot());
595
596             assertEquals("Root node", expectedRoot, actualRoot);
597
598             expectTerminated(duration("3 seconds"), transaction);
599         }};
600     }
601
602     @Test
603     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
604         new JavaTestKit(getSystem()) {{
605             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
606                     "testReadWriteTxOnReceiveCloseTransaction");
607
608             watch(transaction);
609
610             transaction.tell(new CloseTransaction().toSerializable(), getRef());
611
612             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
613             expectTerminated(duration("3 seconds"), transaction);
614         }};
615     }
616
617     @Test
618     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
619         new JavaTestKit(getSystem()) {{
620             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
621                     "testWriteTxOnReceiveCloseTransaction");
622
623             watch(transaction);
624
625             transaction.tell(new CloseTransaction().toSerializable(), getRef());
626
627             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
628             expectTerminated(duration("3 seconds"), transaction);
629         }};
630     }
631
632     @Test
633     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
634         new JavaTestKit(getSystem()) {{
635             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
636                     "testReadOnlyTxOnReceiveCloseTransaction");
637
638             watch(transaction);
639
640             transaction.tell(new CloseTransaction().toSerializable(), getRef());
641
642             expectMsgClass(duration("3 seconds"), Terminated.class);
643         }};
644     }
645
646     @Test(expected=UnknownMessageException.class)
647     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
648         final ActorRef shard = createShard();
649         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
650                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
651         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
652
653         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
654                 toSerializable(), ActorRef.noSender());
655     }
656
657     @Test
658     public void testShardTransactionInactivity() {
659
660         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
661                 500, TimeUnit.MILLISECONDS).build();
662
663         new JavaTestKit(getSystem()) {{
664             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
665                     "testShardTransactionInactivity");
666
667             watch(transaction);
668
669             expectMsgClass(duration("3 seconds"), Terminated.class);
670         }};
671     }
672
673     public static class TestException extends RuntimeException {
674         private static final long serialVersionUID = 1L;
675     }
676 }