Make REUSABLE_*_TL final
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.Collections;
23 import java.util.concurrent.TimeUnit;
24 import org.junit.Test;
25 import org.mockito.InOrder;
26 import org.mockito.Mockito;
27 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
28 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
30 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
31 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
39 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
40 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
42 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
45 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
46 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
47 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
48 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
49 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
50 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
51 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
52 import org.opendaylight.controller.cluster.datastore.modification.Modification;
53 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
54 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
55 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
56 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
57 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
58 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
59 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
60 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
61 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
63 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
64 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
65 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
66 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
67
68 public class ShardTransactionTest extends AbstractActorTest {
69
70     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
71     private static final TransactionType RO = TransactionType.READ_ONLY;
72     private static final TransactionType RW = TransactionType.READ_WRITE;
73     private static final TransactionType WO = TransactionType.WRITE_ONLY;
74
75     private static final ShardIdentifier SHARD_IDENTIFIER =
76         ShardIdentifier.builder().memberName("member-1")
77             .shardName("inventory").type("config").build();
78
79     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
80
81     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
82
83     private final ShardDataTree store = new ShardDataTree(testSchemaContext);
84
85     private int txCounter = 0;
86
87     private ActorRef createShard() {
88         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
89             Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
90     }
91
92     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
93         return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
94     }
95
96     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
97         return newTransactionActor(type, transaction, null, name, version);
98     }
99
100     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
101         return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
102     }
103
104     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
105             short version) {
106         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
107                 datastoreContext, shardStats, "txn", version);
108         return getSystem().actorOf(props, name);
109     }
110
111     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
112         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
113     }
114
115     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
116         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
117     }
118
119     @Test
120     public void testOnReceiveReadData() throws Exception {
121         new JavaTestKit(getSystem()) {{
122             final ActorRef shard = createShard();
123
124             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
125
126             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
127         }
128
129         private void testOnReceiveReadData(final ActorRef transaction) {
130             //serialized read
131             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
132                 getRef());
133
134             Object replySerialized =
135                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
136
137             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
138
139             // unserialized read
140             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
141
142             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
143
144             assertNotNull(reply.getNormalizedNode());
145         }};
146     }
147
148     @Test
149     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
150         new JavaTestKit(getSystem()) {{
151             final ActorRef shard = createShard();
152
153             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
154                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
155
156             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
157                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
158         }
159
160         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
161             // serialized read
162             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
163
164             Object replySerialized =
165                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
166
167             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
168
169             // unserialized read
170             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
171
172             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
173
174             assertTrue(reply.getNormalizedNode() == null);
175         }};
176     }
177
178     @Test
179     public void testOnReceiveReadDataHeliumR1() throws Exception {
180         new JavaTestKit(getSystem()) {{
181             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
182                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
183
184             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
185                     getRef());
186
187             ShardTransactionMessages.ReadDataReply replySerialized =
188                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
189
190             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
191         }};
192     }
193
194     @Test
195     public void testOnReceiveDataExistsPositive() throws Exception {
196         new JavaTestKit(getSystem()) {{
197             final ActorRef shard = createShard();
198
199             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
200                     "testDataExistsPositiveRO"));
201
202             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
203                     "testDataExistsPositiveRW"));
204         }
205
206         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
207             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
208                 getRef());
209
210             ShardTransactionMessages.DataExistsReply replySerialized =
211                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
212
213             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
214
215             // unserialized read
216             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
217
218             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
219
220             assertTrue(reply.exists());
221         }};
222     }
223
224     @Test
225     public void testOnReceiveDataExistsNegative() throws Exception {
226         new JavaTestKit(getSystem()) {{
227             final ActorRef shard = createShard();
228
229             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
230                     "testDataExistsNegativeRO"));
231
232             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
233                     "testDataExistsNegativeRW"));
234         }
235
236         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
237             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
238
239             ShardTransactionMessages.DataExistsReply replySerialized =
240                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
241
242             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
243
244             // unserialized read
245             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
246
247             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
248
249             assertFalse(reply.exists());
250         }};
251     }
252
253     private void assertModification(final ActorRef subject,
254         final Class<? extends Modification> modificationType) {
255         new JavaTestKit(getSystem()) {{
256             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
257
258             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
259                     GetCompositeModificationReply.class).getModification();
260
261             assertTrue(compositeModification.getModifications().size() == 1);
262             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
263         }};
264     }
265
266     @Test
267     public void testOnReceiveWriteData() {
268         new JavaTestKit(getSystem()) {{
269             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
270                     "testOnReceiveWriteData");
271
272             transaction.tell(new WriteData(TestModel.TEST_PATH,
273                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
274                         toSerializable(), getRef());
275
276             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
277
278             assertModification(transaction, WriteModification.class);
279
280             // unserialized write
281             transaction.tell(new WriteData(TestModel.TEST_PATH,
282                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
283                 getRef());
284
285             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
286         }};
287     }
288
289     @Test
290     public void testOnReceiveHeliumR1WriteData() {
291         new JavaTestKit(getSystem()) {{
292             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
293                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
294
295             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
296                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
297             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
298                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
299                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
300
301             transaction.tell(serialized, getRef());
302
303             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
304
305             assertModification(transaction, WriteModification.class);
306         }};
307     }
308
309     @Test
310     public void testOnReceiveMergeData() {
311         new JavaTestKit(getSystem()) {{
312             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
313                     "testMergeData");
314
315             transaction.tell(new MergeData(TestModel.TEST_PATH,
316                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
317                         toSerializable(), getRef());
318
319             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
320
321             assertModification(transaction, MergeModification.class);
322
323             //unserialized merge
324             transaction.tell(new MergeData(TestModel.TEST_PATH,
325                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
326                 getRef());
327
328             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
329         }};
330     }
331
332     @Test
333     public void testOnReceiveHeliumR1MergeData() throws Exception {
334         new JavaTestKit(getSystem()) {{
335             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
336                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
337
338             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
339                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
340             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
341                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
342                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
343
344             transaction.tell(serialized, getRef());
345
346             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
347
348             assertModification(transaction, MergeModification.class);
349         }};
350     }
351
352     @Test
353     public void testOnReceiveDeleteData() throws Exception {
354         new JavaTestKit(getSystem()) {{
355             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
356                     "testDeleteData");
357
358             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
359                     toSerializable(), getRef());
360
361             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
362
363             assertModification(transaction, DeleteModification.class);
364
365             //unserialized
366             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
367
368             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
369         }};
370     }
371
372     @Test
373     public void testOnReceiveBatchedModifications() throws Exception {
374         new JavaTestKit(getSystem()) {{
375
376             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
377             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
378             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
379             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
380
381             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
382             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
383                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
384                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
385
386             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
387             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
388                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
389
390             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
391
392             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
393             batched.addModification(new WriteModification(writePath, writeData));
394             batched.addModification(new MergeModification(mergePath, mergeData));
395             batched.addModification(new DeleteModification(deletePath));
396
397             transaction.tell(batched, getRef());
398
399             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
400             assertEquals("getNumBatched", 3, reply.getNumBatched());
401
402             JavaTestKit verification = new JavaTestKit(getSystem());
403             transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
404
405             CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
406                         GetCompositeModificationReply.class).getModification();
407
408             assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
409
410             WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
411             assertEquals("getPath", writePath, write.getPath());
412             assertEquals("getData", writeData, write.getData());
413
414             MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
415             assertEquals("getPath", mergePath, merge.getPath());
416             assertEquals("getData", mergeData, merge.getData());
417
418             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
419             assertEquals("getPath", deletePath, delete.getPath());
420
421             InOrder inOrder = Mockito.inOrder(mockModification);
422             inOrder.verify(mockModification).write(writePath, writeData);
423             inOrder.verify(mockModification).merge(mergePath, mergeData);
424             inOrder.verify(mockModification).delete(deletePath);
425         }};
426     }
427
428     @Test
429     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
430         new JavaTestKit(getSystem()) {{
431
432             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
433                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
434
435             JavaTestKit watcher = new JavaTestKit(getSystem());
436             watcher.watch(transaction);
437
438             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
439             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
440                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
441                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
442
443             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
444             batched.addModification(new WriteModification(writePath, writeData));
445
446             transaction.tell(batched, getRef());
447             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
448             assertEquals("getNumBatched", 1, reply.getNumBatched());
449
450             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
451             batched.setReady(true);
452             batched.setTotalMessagesSent(2);
453
454             transaction.tell(batched, getRef());
455             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
456             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
457         }};
458     }
459
460     @Test
461     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
462         new JavaTestKit(getSystem()) {{
463
464             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
465                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
466
467             JavaTestKit watcher = new JavaTestKit(getSystem());
468             watcher.watch(transaction);
469
470             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
471             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
472                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
473                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
474
475             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
476             batched.addModification(new WriteModification(writePath, writeData));
477             batched.setReady(true);
478             batched.setDoCommitOnReady(true);
479             batched.setTotalMessagesSent(1);
480
481             transaction.tell(batched, getRef());
482             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
483             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
484         }};
485     }
486
487     @Test(expected=TestException.class)
488     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
489         new JavaTestKit(getSystem()) {{
490
491             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
492             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
493             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
494             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
495                     "testOnReceiveBatchedModificationsFailure");
496
497             JavaTestKit watcher = new JavaTestKit(getSystem());
498             watcher.watch(transaction);
499
500             YangInstanceIdentifier path = TestModel.TEST_PATH;
501             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
502
503             doThrow(new TestException()).when(mockModification).write(path, node);
504
505             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
506             batched.addModification(new WriteModification(path, node));
507
508             transaction.tell(batched, getRef());
509             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
510
511             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
512             batched.setReady(true);
513             batched.setTotalMessagesSent(2);
514
515             transaction.tell(batched, getRef());
516             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
517             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
518
519             if(failure != null) {
520                 throw failure.cause();
521             }
522         }};
523     }
524
525     @Test(expected=IllegalStateException.class)
526     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
527         new JavaTestKit(getSystem()) {{
528
529             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
530                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
531
532             JavaTestKit watcher = new JavaTestKit(getSystem());
533             watcher.watch(transaction);
534
535             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
536             batched.setReady(true);
537             batched.setTotalMessagesSent(2);
538
539             transaction.tell(batched, getRef());
540
541             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
542             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
543
544             if(failure != null) {
545                 throw failure.cause();
546             }
547         }};
548     }
549
550     @Test
551     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
552         new JavaTestKit(getSystem()) {{
553             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
554                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
555
556             JavaTestKit watcher = new JavaTestKit(getSystem());
557             watcher.watch(transaction);
558
559             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
560
561             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
562             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
563         }};
564
565         // test
566         new JavaTestKit(getSystem()) {{
567             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
568                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
569
570             JavaTestKit watcher = new JavaTestKit(getSystem());
571             watcher.watch(transaction);
572
573             transaction.tell(new ReadyTransaction(), getRef());
574
575             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
576             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
577         }};
578     }
579
580     @Test
581     public void testOnReceiveCreateSnapshot() throws Exception {
582         new JavaTestKit(getSystem()) {{
583             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
584                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
585
586             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
587                     YangInstanceIdentifier.builder().build());
588
589             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
590                     "testOnReceiveCreateSnapshot");
591
592             watch(transaction);
593
594             transaction.tell(CreateSnapshot.INSTANCE, getRef());
595
596             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
597
598             assertNotNull("getSnapshot is null", reply.getSnapshot());
599
600             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
601                     reply.getSnapshot());
602
603             assertEquals("Root node", expectedRoot, actualRoot);
604
605             expectTerminated(duration("3 seconds"), transaction);
606         }};
607     }
608
609     @Test
610     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
611         new JavaTestKit(getSystem()) {{
612             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
613                     "testReadWriteTxOnReceiveCloseTransaction");
614
615             watch(transaction);
616
617             transaction.tell(new CloseTransaction().toSerializable(), getRef());
618
619             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
620             expectTerminated(duration("3 seconds"), transaction);
621         }};
622     }
623
624     @Test
625     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
626         new JavaTestKit(getSystem()) {{
627             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
628                     "testWriteTxOnReceiveCloseTransaction");
629
630             watch(transaction);
631
632             transaction.tell(new CloseTransaction().toSerializable(), getRef());
633
634             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
635             expectTerminated(duration("3 seconds"), transaction);
636         }};
637     }
638
639     @Test
640     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
641         new JavaTestKit(getSystem()) {{
642             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
643                     "testReadOnlyTxOnReceiveCloseTransaction");
644
645             watch(transaction);
646
647             transaction.tell(new CloseTransaction().toSerializable(), getRef());
648
649             expectMsgClass(duration("3 seconds"), Terminated.class);
650         }};
651     }
652
653     @Test(expected=UnknownMessageException.class)
654     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
655         final ActorRef shard = createShard();
656         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
657                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
658         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
659
660         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
661                 toSerializable(), ActorRef.noSender());
662     }
663
664     @Test
665     public void testShardTransactionInactivity() {
666
667         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
668                 500, TimeUnit.MILLISECONDS).build();
669
670         new JavaTestKit(getSystem()) {{
671             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
672                     "testShardTransactionInactivity");
673
674             watch(transaction);
675
676             expectMsgClass(duration("3 seconds"), Terminated.class);
677         }};
678     }
679
680     public static class TestException extends RuntimeException {
681         private static final long serialVersionUID = 1L;
682     }
683 }