ab695f90cde087ad24ab348e9ef74b2287619662
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
27 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
38 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
39 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
40 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
41 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
45 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
46 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
47 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
48 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
49 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
50 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
51 import org.opendaylight.controller.cluster.datastore.modification.Modification;
52 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
53 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
54 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
55 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
56 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
57 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
58 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
59 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
60 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
63 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
64 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
65 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
66
67 public class ShardTransactionTest extends AbstractActorTest {
68
69     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
70     private static final TransactionType RO = TransactionType.READ_ONLY;
71     private static final TransactionType RW = TransactionType.READ_WRITE;
72     private static final TransactionType WO = TransactionType.WRITE_ONLY;
73
74     private static final ShardIdentifier SHARD_IDENTIFIER =
75         ShardIdentifier.builder().memberName("member-1")
76             .shardName("inventory").type("config").build();
77
78     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
79
80     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
81
82     private final ShardDataTree store = new ShardDataTree(testSchemaContext);
83
84     private int txCounter = 0;
85
86     private ActorRef createShard() {
87         return getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
88                 schemaContext(TestModel.createTestContext()).props());
89     }
90
91     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
92         return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
93     }
94
95     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
96         return newTransactionActor(type, transaction, null, name, version);
97     }
98
99     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
100         return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
101     }
102
103     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
104             short version) {
105         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
106                 datastoreContext, shardStats, "txn", version);
107         return getSystem().actorOf(props, name);
108     }
109
110     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
111         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
112     }
113
114     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
115         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
116     }
117
118     @Test
119     public void testOnReceiveReadData() throws Exception {
120         new JavaTestKit(getSystem()) {{
121             final ActorRef shard = createShard();
122
123             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
124
125             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
126         }
127
128         private void testOnReceiveReadData(final ActorRef transaction) {
129             //serialized read
130             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
131                 getRef());
132
133             Object replySerialized =
134                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
135
136             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
137
138             // unserialized read
139             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
140
141             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
142
143             assertNotNull(reply.getNormalizedNode());
144         }};
145     }
146
147     @Test
148     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
149         new JavaTestKit(getSystem()) {{
150             final ActorRef shard = createShard();
151
152             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
153                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
154
155             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
156                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
157         }
158
159         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
160             // serialized read
161             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
162
163             Object replySerialized =
164                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
165
166             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
167
168             // unserialized read
169             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
170
171             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
172
173             assertTrue(reply.getNormalizedNode() == null);
174         }};
175     }
176
177     @Test
178     public void testOnReceiveReadDataHeliumR1() throws Exception {
179         new JavaTestKit(getSystem()) {{
180             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
181                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
182
183             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
184                     getRef());
185
186             ShardTransactionMessages.ReadDataReply replySerialized =
187                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
188
189             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
190         }};
191     }
192
193     @Test
194     public void testOnReceiveDataExistsPositive() throws Exception {
195         new JavaTestKit(getSystem()) {{
196             final ActorRef shard = createShard();
197
198             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
199                     "testDataExistsPositiveRO"));
200
201             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
202                     "testDataExistsPositiveRW"));
203         }
204
205         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
206             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
207                 getRef());
208
209             ShardTransactionMessages.DataExistsReply replySerialized =
210                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
211
212             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
213
214             // unserialized read
215             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
216
217             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
218
219             assertTrue(reply.exists());
220         }};
221     }
222
223     @Test
224     public void testOnReceiveDataExistsNegative() throws Exception {
225         new JavaTestKit(getSystem()) {{
226             final ActorRef shard = createShard();
227
228             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
229                     "testDataExistsNegativeRO"));
230
231             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
232                     "testDataExistsNegativeRW"));
233         }
234
235         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
236             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
237
238             ShardTransactionMessages.DataExistsReply replySerialized =
239                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
240
241             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
242
243             // unserialized read
244             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
245
246             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
247
248             assertFalse(reply.exists());
249         }};
250     }
251
252     private void assertModification(final ActorRef subject,
253         final Class<? extends Modification> modificationType) {
254         new JavaTestKit(getSystem()) {{
255             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
256
257             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
258                     GetCompositeModificationReply.class).getModification();
259
260             assertTrue(compositeModification.getModifications().size() == 1);
261             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
262         }};
263     }
264
265     @Test
266     public void testOnReceiveWriteData() {
267         new JavaTestKit(getSystem()) {{
268             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
269                     "testOnReceiveWriteData");
270
271             transaction.tell(new WriteData(TestModel.TEST_PATH,
272                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
273                         toSerializable(), getRef());
274
275             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
276
277             assertModification(transaction, WriteModification.class);
278
279             // unserialized write
280             transaction.tell(new WriteData(TestModel.TEST_PATH,
281                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
282                 getRef());
283
284             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
285         }};
286     }
287
288     @Test
289     public void testOnReceiveHeliumR1WriteData() {
290         new JavaTestKit(getSystem()) {{
291             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
292                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
293
294             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
295                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
296             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
297                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
298                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
299
300             transaction.tell(serialized, getRef());
301
302             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
303
304             assertModification(transaction, WriteModification.class);
305         }};
306     }
307
308     @Test
309     public void testOnReceiveMergeData() {
310         new JavaTestKit(getSystem()) {{
311             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
312                     "testMergeData");
313
314             transaction.tell(new MergeData(TestModel.TEST_PATH,
315                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
316                         toSerializable(), getRef());
317
318             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
319
320             assertModification(transaction, MergeModification.class);
321
322             //unserialized merge
323             transaction.tell(new MergeData(TestModel.TEST_PATH,
324                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
325                 getRef());
326
327             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
328         }};
329     }
330
331     @Test
332     public void testOnReceiveHeliumR1MergeData() throws Exception {
333         new JavaTestKit(getSystem()) {{
334             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
335                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
336
337             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
338                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
339             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
340                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
341                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
342
343             transaction.tell(serialized, getRef());
344
345             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
346
347             assertModification(transaction, MergeModification.class);
348         }};
349     }
350
351     @Test
352     public void testOnReceiveDeleteData() throws Exception {
353         new JavaTestKit(getSystem()) {{
354             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
355                     "testDeleteData");
356
357             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
358                     toSerializable(), getRef());
359
360             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
361
362             assertModification(transaction, DeleteModification.class);
363
364             //unserialized
365             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
366
367             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
368         }};
369     }
370
371     @Test
372     public void testOnReceiveBatchedModifications() throws Exception {
373         new JavaTestKit(getSystem()) {{
374
375             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
376             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
377             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
378             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
379
380             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
381             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
382                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
383                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
384
385             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
386             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
387                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
388
389             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
390
391             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
392             batched.addModification(new WriteModification(writePath, writeData));
393             batched.addModification(new MergeModification(mergePath, mergeData));
394             batched.addModification(new DeleteModification(deletePath));
395
396             transaction.tell(batched, getRef());
397
398             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
399             assertEquals("getNumBatched", 3, reply.getNumBatched());
400
401             JavaTestKit verification = new JavaTestKit(getSystem());
402             transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
403
404             CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
405                         GetCompositeModificationReply.class).getModification();
406
407             assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
408
409             WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
410             assertEquals("getPath", writePath, write.getPath());
411             assertEquals("getData", writeData, write.getData());
412
413             MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
414             assertEquals("getPath", mergePath, merge.getPath());
415             assertEquals("getData", mergeData, merge.getData());
416
417             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
418             assertEquals("getPath", deletePath, delete.getPath());
419
420             InOrder inOrder = Mockito.inOrder(mockModification);
421             inOrder.verify(mockModification).write(writePath, writeData);
422             inOrder.verify(mockModification).merge(mergePath, mergeData);
423             inOrder.verify(mockModification).delete(deletePath);
424         }};
425     }
426
427     @Test
428     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
429         new JavaTestKit(getSystem()) {{
430
431             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
432                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
433
434             JavaTestKit watcher = new JavaTestKit(getSystem());
435             watcher.watch(transaction);
436
437             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
438             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
439                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
440                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
441
442             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
443             batched.addModification(new WriteModification(writePath, writeData));
444
445             transaction.tell(batched, getRef());
446             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
447             assertEquals("getNumBatched", 1, reply.getNumBatched());
448
449             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
450             batched.setReady(true);
451             batched.setTotalMessagesSent(2);
452
453             transaction.tell(batched, getRef());
454             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
455             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
456         }};
457     }
458
459     @Test
460     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
461         new JavaTestKit(getSystem()) {{
462
463             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
464                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
465
466             JavaTestKit watcher = new JavaTestKit(getSystem());
467             watcher.watch(transaction);
468
469             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
470             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
471                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
472                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
473
474             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
475             batched.addModification(new WriteModification(writePath, writeData));
476             batched.setReady(true);
477             batched.setDoCommitOnReady(true);
478             batched.setTotalMessagesSent(1);
479
480             transaction.tell(batched, getRef());
481             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
482             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
483         }};
484     }
485
486     @Test(expected=TestException.class)
487     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
488         new JavaTestKit(getSystem()) {{
489
490             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
491             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
492             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
493             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
494                     "testOnReceiveBatchedModificationsFailure");
495
496             JavaTestKit watcher = new JavaTestKit(getSystem());
497             watcher.watch(transaction);
498
499             YangInstanceIdentifier path = TestModel.TEST_PATH;
500             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
501
502             doThrow(new TestException()).when(mockModification).write(path, node);
503
504             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
505             batched.addModification(new WriteModification(path, node));
506
507             transaction.tell(batched, getRef());
508             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
509
510             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
511             batched.setReady(true);
512             batched.setTotalMessagesSent(2);
513
514             transaction.tell(batched, getRef());
515             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
516             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
517
518             if(failure != null) {
519                 throw failure.cause();
520             }
521         }};
522     }
523
524     @Test(expected=IllegalStateException.class)
525     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
526         new JavaTestKit(getSystem()) {{
527
528             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
529                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
530
531             JavaTestKit watcher = new JavaTestKit(getSystem());
532             watcher.watch(transaction);
533
534             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
535             batched.setReady(true);
536             batched.setTotalMessagesSent(2);
537
538             transaction.tell(batched, getRef());
539
540             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
541             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
542
543             if(failure != null) {
544                 throw failure.cause();
545             }
546         }};
547     }
548
549     @Test
550     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
551         new JavaTestKit(getSystem()) {{
552             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
553                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
554
555             JavaTestKit watcher = new JavaTestKit(getSystem());
556             watcher.watch(transaction);
557
558             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
559
560             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
561             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
562         }};
563
564         // test
565         new JavaTestKit(getSystem()) {{
566             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
567                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
568
569             JavaTestKit watcher = new JavaTestKit(getSystem());
570             watcher.watch(transaction);
571
572             transaction.tell(new ReadyTransaction(), getRef());
573
574             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
575             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
576         }};
577     }
578
579     @Test
580     public void testOnReceiveCreateSnapshot() throws Exception {
581         new JavaTestKit(getSystem()) {{
582             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
583                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
584
585             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
586                     YangInstanceIdentifier.builder().build());
587
588             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
589                     "testOnReceiveCreateSnapshot");
590
591             watch(transaction);
592
593             transaction.tell(CreateSnapshot.INSTANCE, getRef());
594
595             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
596
597             assertNotNull("getSnapshot is null", reply.getSnapshot());
598
599             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
600                     reply.getSnapshot());
601
602             assertEquals("Root node", expectedRoot, actualRoot);
603
604             expectTerminated(duration("3 seconds"), transaction);
605         }};
606     }
607
608     @Test
609     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
610         new JavaTestKit(getSystem()) {{
611             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
612                     "testReadWriteTxOnReceiveCloseTransaction");
613
614             watch(transaction);
615
616             transaction.tell(new CloseTransaction().toSerializable(), getRef());
617
618             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
619             expectTerminated(duration("3 seconds"), transaction);
620         }};
621     }
622
623     @Test
624     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
625         new JavaTestKit(getSystem()) {{
626             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
627                     "testWriteTxOnReceiveCloseTransaction");
628
629             watch(transaction);
630
631             transaction.tell(new CloseTransaction().toSerializable(), getRef());
632
633             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
634             expectTerminated(duration("3 seconds"), transaction);
635         }};
636     }
637
638     @Test
639     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
640         new JavaTestKit(getSystem()) {{
641             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
642                     "testReadOnlyTxOnReceiveCloseTransaction");
643
644             watch(transaction);
645
646             transaction.tell(new CloseTransaction().toSerializable(), getRef());
647
648             expectMsgClass(duration("3 seconds"), Terminated.class);
649         }};
650     }
651
652     @Test(expected=UnknownMessageException.class)
653     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
654         final ActorRef shard = createShard();
655         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
656                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
657         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
658
659         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
660                 toSerializable(), ActorRef.noSender());
661     }
662
663     @Test
664     public void testShardTransactionInactivity() {
665
666         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
667                 500, TimeUnit.MILLISECONDS).build();
668
669         new JavaTestKit(getSystem()) {{
670             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
671                     "testShardTransactionInactivity");
672
673             watch(transaction);
674
675             expectMsgClass(duration("3 seconds"), Terminated.class);
676         }};
677     }
678
679     public static class TestException extends RuntimeException {
680         private static final long serialVersionUID = 1L;
681     }
682 }