Remove CompositeModification field in ShardWriteTransaction
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
38 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
40 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
46 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
47 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
48 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
49 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
50 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
51 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
52 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
60 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
61 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
62 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64
65 public class ShardTransactionTest extends AbstractActorTest {
66
67     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
68     private static final TransactionType RO = TransactionType.READ_ONLY;
69     private static final TransactionType RW = TransactionType.READ_WRITE;
70     private static final TransactionType WO = TransactionType.WRITE_ONLY;
71
72     private static final ShardIdentifier SHARD_IDENTIFIER =
73         ShardIdentifier.builder().memberName("member-1")
74             .shardName("inventory").type("config").build();
75
76     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
77
78     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
79
80     private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
81
82     private int txCounter = 0;
83
84     private ActorRef createShard() {
85         return getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
86                 schemaContext(TestModel.createTestContext()).props());
87     }
88
89     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
90         return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
91     }
92
93     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
94         return newTransactionActor(type, transaction, null, name, version);
95     }
96
97     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
98         return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
99     }
100
101     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
102             short version) {
103         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
104                 datastoreContext, shardStats, "txn", version);
105         return getSystem().actorOf(props, name);
106     }
107
108     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
109         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
110     }
111
112     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
113         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
114     }
115
116     @Test
117     public void testOnReceiveReadData() throws Exception {
118         new JavaTestKit(getSystem()) {{
119             final ActorRef shard = createShard();
120
121             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
122
123             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
124         }
125
126         private void testOnReceiveReadData(final ActorRef transaction) {
127             //serialized read
128             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
129                 getRef());
130
131             Object replySerialized =
132                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
133
134             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
135
136             // unserialized read
137             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
138
139             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
140
141             assertNotNull(reply.getNormalizedNode());
142         }};
143     }
144
145     @Test
146     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
147         new JavaTestKit(getSystem()) {{
148             final ActorRef shard = createShard();
149
150             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
151                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
152
153             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
154                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
155         }
156
157         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
158             // serialized read
159             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
160
161             Object replySerialized =
162                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
163
164             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
165
166             // unserialized read
167             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
168
169             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
170
171             assertTrue(reply.getNormalizedNode() == null);
172         }};
173     }
174
175     @Test
176     public void testOnReceiveReadDataHeliumR1() throws Exception {
177         new JavaTestKit(getSystem()) {{
178             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
179                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
180
181             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
182                     getRef());
183
184             ShardTransactionMessages.ReadDataReply replySerialized =
185                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
186
187             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
188         }};
189     }
190
191     @Test
192     public void testOnReceiveDataExistsPositive() throws Exception {
193         new JavaTestKit(getSystem()) {{
194             final ActorRef shard = createShard();
195
196             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
197                     "testDataExistsPositiveRO"));
198
199             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
200                     "testDataExistsPositiveRW"));
201         }
202
203         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
204             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
205                 getRef());
206
207             ShardTransactionMessages.DataExistsReply replySerialized =
208                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
209
210             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
211
212             // unserialized read
213             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
214
215             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
216
217             assertTrue(reply.exists());
218         }};
219     }
220
221     @Test
222     public void testOnReceiveDataExistsNegative() throws Exception {
223         new JavaTestKit(getSystem()) {{
224             final ActorRef shard = createShard();
225
226             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
227                     "testDataExistsNegativeRO"));
228
229             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
230                     "testDataExistsNegativeRW"));
231         }
232
233         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
234             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
235
236             ShardTransactionMessages.DataExistsReply replySerialized =
237                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
238
239             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
240
241             // unserialized read
242             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
243
244             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
245
246             assertFalse(reply.exists());
247         }};
248     }
249
250     @Test
251     public void testOnReceiveWriteData() {
252         new JavaTestKit(getSystem()) {{
253             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
254                     "testOnReceiveWriteData");
255
256             transaction.tell(new WriteData(TestModel.TEST_PATH,
257                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
258                         toSerializable(), getRef());
259
260             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
261
262             // unserialized write
263             transaction.tell(new WriteData(TestModel.TEST_PATH,
264                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
265                 getRef());
266
267             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
268         }};
269     }
270
271     @Test
272     public void testOnReceiveHeliumR1WriteData() {
273         new JavaTestKit(getSystem()) {{
274             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
275                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
276
277             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
278                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
279             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
280                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
281                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
282
283             transaction.tell(serialized, getRef());
284
285             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
286         }};
287     }
288
289     @Test
290     public void testOnReceiveMergeData() {
291         new JavaTestKit(getSystem()) {{
292             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
293                     "testMergeData");
294
295             transaction.tell(new MergeData(TestModel.TEST_PATH,
296                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
297                         toSerializable(), getRef());
298
299             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
300
301             //unserialized merge
302             transaction.tell(new MergeData(TestModel.TEST_PATH,
303                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
304                 getRef());
305
306             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
307         }};
308     }
309
310     @Test
311     public void testOnReceiveHeliumR1MergeData() throws Exception {
312         new JavaTestKit(getSystem()) {{
313             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
314                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
315
316             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
317                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
318             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
319                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
320                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
321
322             transaction.tell(serialized, getRef());
323
324             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
325         }};
326     }
327
328     @Test
329     public void testOnReceiveDeleteData() throws Exception {
330         new JavaTestKit(getSystem()) {{
331             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
332                     "testDeleteData");
333
334             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
335                     toSerializable(), getRef());
336
337             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
338
339             //unserialized
340             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
341
342             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
343         }};
344     }
345
346     @Test
347     public void testOnReceiveBatchedModifications() throws Exception {
348         new JavaTestKit(getSystem()) {{
349
350             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
351             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
352             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
353             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
354
355             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
356             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
357                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
358                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
359
360             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
361             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
362                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
363
364             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
365
366             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
367             batched.addModification(new WriteModification(writePath, writeData));
368             batched.addModification(new MergeModification(mergePath, mergeData));
369             batched.addModification(new DeleteModification(deletePath));
370
371             transaction.tell(batched, getRef());
372
373             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
374             assertEquals("getNumBatched", 3, reply.getNumBatched());
375
376             InOrder inOrder = Mockito.inOrder(mockModification);
377             inOrder.verify(mockModification).write(writePath, writeData);
378             inOrder.verify(mockModification).merge(mergePath, mergeData);
379             inOrder.verify(mockModification).delete(deletePath);
380         }};
381     }
382
383     @Test
384     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
385         new JavaTestKit(getSystem()) {{
386
387             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
388                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
389
390             JavaTestKit watcher = new JavaTestKit(getSystem());
391             watcher.watch(transaction);
392
393             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
394             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
395                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
396                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
397
398             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
399             batched.addModification(new WriteModification(writePath, writeData));
400
401             transaction.tell(batched, getRef());
402             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
403             assertEquals("getNumBatched", 1, reply.getNumBatched());
404
405             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
406             batched.setReady(true);
407             batched.setTotalMessagesSent(2);
408
409             transaction.tell(batched, getRef());
410             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
411             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
412         }};
413     }
414
415     @Test
416     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
417         new JavaTestKit(getSystem()) {{
418
419             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
420                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
421
422             JavaTestKit watcher = new JavaTestKit(getSystem());
423             watcher.watch(transaction);
424
425             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
426             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
427                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
428                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
429
430             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
431             batched.addModification(new WriteModification(writePath, writeData));
432             batched.setReady(true);
433             batched.setDoCommitOnReady(true);
434             batched.setTotalMessagesSent(1);
435
436             transaction.tell(batched, getRef());
437             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
438             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
439         }};
440     }
441
442     @Test(expected=TestException.class)
443     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
444         new JavaTestKit(getSystem()) {{
445
446             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
447             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
448             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
449             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
450                     "testOnReceiveBatchedModificationsFailure");
451
452             JavaTestKit watcher = new JavaTestKit(getSystem());
453             watcher.watch(transaction);
454
455             YangInstanceIdentifier path = TestModel.TEST_PATH;
456             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
457
458             doThrow(new TestException()).when(mockModification).write(path, node);
459
460             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
461             batched.addModification(new WriteModification(path, node));
462
463             transaction.tell(batched, getRef());
464             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
465
466             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
467             batched.setReady(true);
468             batched.setTotalMessagesSent(2);
469
470             transaction.tell(batched, getRef());
471             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
472             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
473
474             if(failure != null) {
475                 throw failure.cause();
476             }
477         }};
478     }
479
480     @Test(expected=IllegalStateException.class)
481     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
482         new JavaTestKit(getSystem()) {{
483
484             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
485                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
486
487             JavaTestKit watcher = new JavaTestKit(getSystem());
488             watcher.watch(transaction);
489
490             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
491             batched.setReady(true);
492             batched.setTotalMessagesSent(2);
493
494             transaction.tell(batched, getRef());
495
496             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
497             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
498
499             if(failure != null) {
500                 throw failure.cause();
501             }
502         }};
503     }
504
505     @Test
506     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
507         new JavaTestKit(getSystem()) {{
508             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
509                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
510
511             JavaTestKit watcher = new JavaTestKit(getSystem());
512             watcher.watch(transaction);
513
514             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
515
516             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
517             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
518         }};
519
520         // test
521         new JavaTestKit(getSystem()) {{
522             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
523                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
524
525             JavaTestKit watcher = new JavaTestKit(getSystem());
526             watcher.watch(transaction);
527
528             transaction.tell(new ReadyTransaction(), getRef());
529
530             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
531             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
532         }};
533     }
534
535     @Test
536     public void testOnReceiveCreateSnapshot() throws Exception {
537         new JavaTestKit(getSystem()) {{
538             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
539                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
540
541             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
542                     YangInstanceIdentifier.builder().build());
543
544             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
545                     "testOnReceiveCreateSnapshot");
546
547             watch(transaction);
548
549             transaction.tell(CreateSnapshot.INSTANCE, getRef());
550
551             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
552
553             assertNotNull("getSnapshot is null", reply.getSnapshot());
554
555             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
556                     reply.getSnapshot());
557
558             assertEquals("Root node", expectedRoot, actualRoot);
559
560             expectTerminated(duration("3 seconds"), transaction);
561         }};
562     }
563
564     @Test
565     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
566         new JavaTestKit(getSystem()) {{
567             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
568                     "testReadWriteTxOnReceiveCloseTransaction");
569
570             watch(transaction);
571
572             transaction.tell(new CloseTransaction().toSerializable(), getRef());
573
574             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
575             expectTerminated(duration("3 seconds"), transaction);
576         }};
577     }
578
579     @Test
580     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
581         new JavaTestKit(getSystem()) {{
582             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
583                     "testWriteTxOnReceiveCloseTransaction");
584
585             watch(transaction);
586
587             transaction.tell(new CloseTransaction().toSerializable(), getRef());
588
589             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
590             expectTerminated(duration("3 seconds"), transaction);
591         }};
592     }
593
594     @Test
595     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
596         new JavaTestKit(getSystem()) {{
597             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
598                     "testReadOnlyTxOnReceiveCloseTransaction");
599
600             watch(transaction);
601
602             transaction.tell(new CloseTransaction().toSerializable(), getRef());
603
604             expectMsgClass(duration("3 seconds"), Terminated.class);
605         }};
606     }
607
608     @Test(expected=UnknownMessageException.class)
609     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
610         final ActorRef shard = createShard();
611         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
612                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
613         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
614
615         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
616                 toSerializable(), ActorRef.noSender());
617     }
618
619     @Test
620     public void testShardTransactionInactivity() {
621
622         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
623                 500, TimeUnit.MILLISECONDS).build();
624
625         new JavaTestKit(getSystem()) {{
626             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
627                     "testShardTransactionInactivity");
628
629             watch(transaction);
630
631             expectMsgClass(duration("3 seconds"), Terminated.class);
632         }};
633     }
634
635     public static class TestException extends RuntimeException {
636         private static final long serialVersionUID = 1L;
637     }
638 }