ae9694a8572b7c71303a437558b204fd6e6f959b
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
38 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
40 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
44 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
45 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
46 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
47 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
48 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
49 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
50 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
51 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
52 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
53 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
54 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
57 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
60 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
61 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
62 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
63
64 public class ShardTransactionTest extends AbstractActorTest {
65
66     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
67     private static final TransactionType RO = TransactionType.READ_ONLY;
68     private static final TransactionType RW = TransactionType.READ_WRITE;
69     private static final TransactionType WO = TransactionType.WRITE_ONLY;
70
71     private static final ShardIdentifier SHARD_IDENTIFIER =
72         ShardIdentifier.builder().memberName("member-1")
73             .shardName("inventory").type("config").build();
74
75     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
76
77     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
78
79     private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
80
81     private int txCounter = 0;
82
83     private ActorRef createShard() {
84         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
85                 schemaContext(TestModel.createTestContext()).props());
86         ShardTestKit.waitUntilLeader(shard);
87         return shard;
88     }
89
90     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
91         return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
92     }
93
94     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
95         return newTransactionActor(type, transaction, null, name, version);
96     }
97
98     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
99         return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
100     }
101
102     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
103             short version) {
104         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
105                 datastoreContext, shardStats, "txn", version);
106         return getSystem().actorOf(props, name);
107     }
108
109     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
110         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
111     }
112
113     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
114         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
115     }
116
117     @Test
118     public void testOnReceiveReadData() throws Exception {
119         new JavaTestKit(getSystem()) {{
120             final ActorRef shard = createShard();
121
122             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
123
124             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
125         }
126
127         private void testOnReceiveReadData(final ActorRef transaction) {
128             //serialized read
129             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
130                 getRef());
131
132             Object replySerialized =
133                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
134
135             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
136
137             // unserialized read
138             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
139
140             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
141
142             assertNotNull(reply.getNormalizedNode());
143         }};
144     }
145
146     @Test
147     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
148         new JavaTestKit(getSystem()) {{
149             final ActorRef shard = createShard();
150
151             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
152                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
153
154             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
155                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
156         }
157
158         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
159             // serialized read
160             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
161
162             Object replySerialized =
163                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
164
165             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
166
167             // unserialized read
168             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
169
170             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
171
172             assertTrue(reply.getNormalizedNode() == null);
173         }};
174     }
175
176     @Test
177     public void testOnReceiveReadDataHeliumR1() throws Exception {
178         new JavaTestKit(getSystem()) {{
179             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
180                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
181
182             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
183                     getRef());
184
185             ShardTransactionMessages.ReadDataReply replySerialized =
186                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
187
188             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
189         }};
190     }
191
192     @Test
193     public void testOnReceiveDataExistsPositive() throws Exception {
194         new JavaTestKit(getSystem()) {{
195             final ActorRef shard = createShard();
196
197             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
198                     "testDataExistsPositiveRO"));
199
200             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
201                     "testDataExistsPositiveRW"));
202         }
203
204         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
205             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
206                 getRef());
207
208             ShardTransactionMessages.DataExistsReply replySerialized =
209                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
210
211             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
212
213             // unserialized read
214             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
215
216             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
217
218             assertTrue(reply.exists());
219         }};
220     }
221
222     @Test
223     public void testOnReceiveDataExistsNegative() throws Exception {
224         new JavaTestKit(getSystem()) {{
225             final ActorRef shard = createShard();
226
227             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
228                     "testDataExistsNegativeRO"));
229
230             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
231                     "testDataExistsNegativeRW"));
232         }
233
234         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
235             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
236
237             ShardTransactionMessages.DataExistsReply replySerialized =
238                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
239
240             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
241
242             // unserialized read
243             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
244
245             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
246
247             assertFalse(reply.exists());
248         }};
249     }
250
251     @Test
252     public void testOnReceiveWriteData() {
253         new JavaTestKit(getSystem()) {{
254             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
255                     "testOnReceiveWriteData");
256
257             transaction.tell(new WriteData(TestModel.TEST_PATH,
258                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
259                         toSerializable(), getRef());
260
261             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
262
263             // unserialized write
264             transaction.tell(new WriteData(TestModel.TEST_PATH,
265                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
266                 getRef());
267
268             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
269         }};
270     }
271
272     @Test
273     public void testOnReceiveHeliumR1WriteData() {
274         new JavaTestKit(getSystem()) {{
275             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
276                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
277
278             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
279                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
280             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
281                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
282                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
283
284             transaction.tell(serialized, getRef());
285
286             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
287         }};
288     }
289
290     @Test
291     public void testOnReceiveMergeData() {
292         new JavaTestKit(getSystem()) {{
293             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
294                     "testMergeData");
295
296             transaction.tell(new MergeData(TestModel.TEST_PATH,
297                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
298                         toSerializable(), getRef());
299
300             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
301
302             //unserialized merge
303             transaction.tell(new MergeData(TestModel.TEST_PATH,
304                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
305                 getRef());
306
307             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
308         }};
309     }
310
311     @Test
312     public void testOnReceiveHeliumR1MergeData() throws Exception {
313         new JavaTestKit(getSystem()) {{
314             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
315                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
316
317             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
318                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
319             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
320                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
321                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
322
323             transaction.tell(serialized, getRef());
324
325             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
326         }};
327     }
328
329     @Test
330     public void testOnReceiveDeleteData() throws Exception {
331         new JavaTestKit(getSystem()) {{
332             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
333                     "testDeleteData");
334
335             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
336                     toSerializable(), getRef());
337
338             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
339
340             //unserialized
341             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
342
343             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
344         }};
345     }
346
347     @Test
348     public void testOnReceiveBatchedModifications() throws Exception {
349         new JavaTestKit(getSystem()) {{
350
351             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
352             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
353             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
354             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
355
356             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
357             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
358                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
359                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
360
361             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
362             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
363                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
364
365             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
366
367             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
368             batched.addModification(new WriteModification(writePath, writeData));
369             batched.addModification(new MergeModification(mergePath, mergeData));
370             batched.addModification(new DeleteModification(deletePath));
371
372             transaction.tell(batched, getRef());
373
374             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
375             assertEquals("getNumBatched", 3, reply.getNumBatched());
376
377             InOrder inOrder = Mockito.inOrder(mockModification);
378             inOrder.verify(mockModification).write(writePath, writeData);
379             inOrder.verify(mockModification).merge(mergePath, mergeData);
380             inOrder.verify(mockModification).delete(deletePath);
381         }};
382     }
383
384     @Test
385     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
386         new JavaTestKit(getSystem()) {{
387
388             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
389                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
390
391             JavaTestKit watcher = new JavaTestKit(getSystem());
392             watcher.watch(transaction);
393
394             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
395             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
396                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
397                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
398
399             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
400             batched.addModification(new WriteModification(writePath, writeData));
401
402             transaction.tell(batched, getRef());
403             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
404             assertEquals("getNumBatched", 1, reply.getNumBatched());
405
406             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
407             batched.setReady(true);
408             batched.setTotalMessagesSent(2);
409
410             transaction.tell(batched, getRef());
411             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
412             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
413         }};
414     }
415
416     @Test
417     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
418         new JavaTestKit(getSystem()) {{
419
420             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
421                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
422
423             JavaTestKit watcher = new JavaTestKit(getSystem());
424             watcher.watch(transaction);
425
426             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
427             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
428                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
429                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
430
431             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
432             batched.addModification(new WriteModification(writePath, writeData));
433             batched.setReady(true);
434             batched.setDoCommitOnReady(true);
435             batched.setTotalMessagesSent(1);
436
437             transaction.tell(batched, getRef());
438             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
439             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
440         }};
441     }
442
443     @Test(expected=TestException.class)
444     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
445         new JavaTestKit(getSystem()) {{
446
447             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
448             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
449             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
450             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
451                     "testOnReceiveBatchedModificationsFailure");
452
453             JavaTestKit watcher = new JavaTestKit(getSystem());
454             watcher.watch(transaction);
455
456             YangInstanceIdentifier path = TestModel.TEST_PATH;
457             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
458
459             doThrow(new TestException()).when(mockModification).write(path, node);
460
461             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
462             batched.addModification(new WriteModification(path, node));
463
464             transaction.tell(batched, getRef());
465             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
466
467             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
468             batched.setReady(true);
469             batched.setTotalMessagesSent(2);
470
471             transaction.tell(batched, getRef());
472             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
473             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
474
475             if(failure != null) {
476                 throw failure.cause();
477             }
478         }};
479     }
480
481     @Test(expected=IllegalStateException.class)
482     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
483         new JavaTestKit(getSystem()) {{
484
485             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
486                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
487
488             JavaTestKit watcher = new JavaTestKit(getSystem());
489             watcher.watch(transaction);
490
491             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
492             batched.setReady(true);
493             batched.setTotalMessagesSent(2);
494
495             transaction.tell(batched, getRef());
496
497             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
498             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
499
500             if(failure != null) {
501                 throw failure.cause();
502             }
503         }};
504     }
505
506     @Test
507     public void testOnReceiveCreateSnapshot() throws Exception {
508         new JavaTestKit(getSystem()) {{
509             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
510                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
511
512             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
513                     YangInstanceIdentifier.builder().build());
514
515             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
516                     "testOnReceiveCreateSnapshot");
517
518             watch(transaction);
519
520             transaction.tell(CreateSnapshot.INSTANCE, getRef());
521
522             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
523
524             assertNotNull("getSnapshot is null", reply.getSnapshot());
525
526             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
527                     reply.getSnapshot());
528
529             assertEquals("Root node", expectedRoot, actualRoot);
530
531             expectTerminated(duration("3 seconds"), transaction);
532         }};
533     }
534
535     @Test
536     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
537         new JavaTestKit(getSystem()) {{
538             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
539                     "testReadWriteTxOnReceiveCloseTransaction");
540
541             watch(transaction);
542
543             transaction.tell(new CloseTransaction().toSerializable(), getRef());
544
545             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
546             expectTerminated(duration("3 seconds"), transaction);
547         }};
548     }
549
550     @Test
551     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
552         new JavaTestKit(getSystem()) {{
553             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
554                     "testWriteTxOnReceiveCloseTransaction");
555
556             watch(transaction);
557
558             transaction.tell(new CloseTransaction().toSerializable(), getRef());
559
560             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
561             expectTerminated(duration("3 seconds"), transaction);
562         }};
563     }
564
565     @Test
566     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
567         new JavaTestKit(getSystem()) {{
568             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
569                     "testReadOnlyTxOnReceiveCloseTransaction");
570
571             watch(transaction);
572
573             transaction.tell(new CloseTransaction().toSerializable(), getRef());
574
575             expectMsgClass(duration("3 seconds"), Terminated.class);
576         }};
577     }
578
579     @Test(expected=UnknownMessageException.class)
580     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
581         final ActorRef shard = createShard();
582         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
583                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
584         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
585
586         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
587                 toSerializable(), ActorRef.noSender());
588     }
589
590     @Test
591     public void testShardTransactionInactivity() {
592
593         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
594                 500, TimeUnit.MILLISECONDS).build();
595
596         new JavaTestKit(getSystem()) {{
597             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
598                     "testShardTransactionInactivity");
599
600             watch(transaction);
601
602             expectMsgClass(duration("3 seconds"), Terminated.class);
603         }};
604     }
605
606     public static class TestException extends RuntimeException {
607         private static final long serialVersionUID = 1L;
608     }
609 }