Merge "Remove isCloseMsg check for each rpc"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.doThrow;
8 import akka.actor.ActorRef;
9 import akka.actor.Props;
10 import akka.actor.Status.Failure;
11 import akka.actor.Terminated;
12 import akka.testkit.JavaTestKit;
13 import akka.testkit.TestActorRef;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.Collections;
16 import java.util.concurrent.TimeUnit;
17 import org.junit.Before;
18 import org.junit.Test;
19 import org.mockito.InOrder;
20 import org.mockito.Mockito;
21 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
22 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
23 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
42 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
44 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
45 import org.opendaylight.controller.cluster.datastore.modification.Modification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
49 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
50 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
51 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
52 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
53 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
54 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
59 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
60 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
61 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
62
63 public class ShardTransactionTest extends AbstractActorTest {
64
65     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
66
67     private static final ShardIdentifier SHARD_IDENTIFIER =
68         ShardIdentifier.builder().memberName("member-1")
69             .shardName("inventory").type("config").build();
70
71     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
72
73     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
74
75     private final InMemoryDOMDataStore store =
76             new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
77
78     @Before
79     public void setup() {
80         store.onGlobalContextUpdated(testSchemaContext);
81     }
82
83     private ActorRef createShard(){
84         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
85             Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
86     }
87
88     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
89         return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
90     }
91
92     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
93         return newTransactionActor(transaction, null, name, version);
94     }
95
96     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
97         return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
98     }
99
100     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
101             short version) {
102         Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
103                 datastoreContext, shardStats, "txn", version);
104         return getSystem().actorOf(props, name);
105     }
106
107     @Test
108     public void testOnReceiveReadData() throws Exception {
109         new JavaTestKit(getSystem()) {{
110             final ActorRef shard = createShard();
111
112             testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
113
114             testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
115         }
116
117         private void testOnReceiveReadData(final ActorRef transaction) {
118             //serialized read
119             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
120                 getRef());
121
122             Object replySerialized =
123                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
124
125             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
126
127             // unserialized read
128             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
129
130             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
131
132             assertNotNull(reply.getNormalizedNode());
133         }};
134     }
135
136     @Test
137     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
138         new JavaTestKit(getSystem()) {{
139             final ActorRef shard = createShard();
140
141             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
142                     store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
143
144             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
145                     store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
146         }
147
148         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
149             // serialized read
150             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
151
152             Object replySerialized =
153                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
154
155             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
156
157             // unserialized read
158             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
159
160             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
161
162             assertTrue(reply.getNormalizedNode() == null);
163         }};
164     }
165
166     @Test
167     public void testOnReceiveReadDataHeliumR1() throws Exception {
168         new JavaTestKit(getSystem()) {{
169             ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
170                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
171
172             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
173                     getRef());
174
175             ShardTransactionMessages.ReadDataReply replySerialized =
176                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
177
178             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
179         }};
180     }
181
182     @Test
183     public void testOnReceiveDataExistsPositive() throws Exception {
184         new JavaTestKit(getSystem()) {{
185             final ActorRef shard = createShard();
186
187             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
188                     "testDataExistsPositiveRO"));
189
190             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
191                     "testDataExistsPositiveRW"));
192         }
193
194         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
195             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
196                 getRef());
197
198             ShardTransactionMessages.DataExistsReply replySerialized =
199                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
200
201             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
202
203             // unserialized read
204             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
205
206             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
207
208             assertTrue(reply.exists());
209         }};
210     }
211
212     @Test
213     public void testOnReceiveDataExistsNegative() throws Exception {
214         new JavaTestKit(getSystem()) {{
215             final ActorRef shard = createShard();
216
217             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
218                     "testDataExistsNegativeRO"));
219
220             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
221                     "testDataExistsNegativeRW"));
222         }
223
224         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
225             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
226
227             ShardTransactionMessages.DataExistsReply replySerialized =
228                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
229
230             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
231
232             // unserialized read
233             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
234
235             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
236
237             assertFalse(reply.exists());
238         }};
239     }
240
241     private void assertModification(final ActorRef subject,
242         final Class<? extends Modification> modificationType) {
243         new JavaTestKit(getSystem()) {{
244             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
245
246             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
247                     GetCompositeModificationReply.class).getModification();
248
249             assertTrue(compositeModification.getModifications().size() == 1);
250             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
251         }};
252     }
253
254     @Test
255     public void testOnReceiveWriteData() throws Exception {
256         new JavaTestKit(getSystem()) {{
257             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
258                     "testOnReceiveWriteData");
259
260             transaction.tell(new WriteData(TestModel.TEST_PATH,
261                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
262                         toSerializable(), getRef());
263
264             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
265
266             assertModification(transaction, WriteModification.class);
267
268             // unserialized write
269             transaction.tell(new WriteData(TestModel.TEST_PATH,
270                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
271                 getRef());
272
273             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
274         }};
275     }
276
277     @Test
278     public void testOnReceiveHeliumR1WriteData() throws Exception {
279         new JavaTestKit(getSystem()) {{
280             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
281                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
282
283             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
284                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
285             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
286                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
287                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
288
289             transaction.tell(serialized, getRef());
290
291             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
292
293             assertModification(transaction, WriteModification.class);
294         }};
295     }
296
297     @Test
298     public void testOnReceiveMergeData() throws Exception {
299         new JavaTestKit(getSystem()) {{
300             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
301                     "testMergeData");
302
303             transaction.tell(new MergeData(TestModel.TEST_PATH,
304                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
305                         toSerializable(), getRef());
306
307             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
308
309             assertModification(transaction, MergeModification.class);
310
311             //unserialized merge
312             transaction.tell(new MergeData(TestModel.TEST_PATH,
313                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
314                 getRef());
315
316             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
317         }};
318     }
319
320     @Test
321     public void testOnReceiveHeliumR1MergeData() throws Exception {
322         new JavaTestKit(getSystem()) {{
323             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
324                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
325
326             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
327                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
328             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
329                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
330                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
331
332             transaction.tell(serialized, getRef());
333
334             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
335
336             assertModification(transaction, MergeModification.class);
337         }};
338     }
339
340     @Test
341     public void testOnReceiveDeleteData() throws Exception {
342         new JavaTestKit(getSystem()) {{
343             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
344                     "testDeleteData");
345
346             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
347                     toSerializable(), getRef());
348
349             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
350
351             assertModification(transaction, DeleteModification.class);
352
353             //unserialized
354             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
355
356             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
357         }};
358     }
359
360     @Test
361     public void testOnReceiveBatchedModifications() throws Exception {
362         new JavaTestKit(getSystem()) {{
363
364             DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
365             final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
366
367             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
368             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
369                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
370                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
371
372             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
373             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
374                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
375
376             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
377
378             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
379             batched.addModification(new WriteModification(writePath, writeData));
380             batched.addModification(new MergeModification(mergePath, mergeData));
381             batched.addModification(new DeleteModification(deletePath));
382
383             transaction.tell(batched, getRef());
384
385             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
386             assertEquals("getNumBatched", 3, reply.getNumBatched());
387
388             JavaTestKit verification = new JavaTestKit(getSystem());
389             transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
390
391             CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
392                         GetCompositeModificationReply.class).getModification();
393
394             assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
395
396             WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
397             assertEquals("getPath", writePath, write.getPath());
398             assertEquals("getData", writeData, write.getData());
399
400             MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
401             assertEquals("getPath", mergePath, merge.getPath());
402             assertEquals("getData", mergeData, merge.getData());
403
404             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
405             assertEquals("getPath", deletePath, delete.getPath());
406
407             InOrder inOrder = Mockito.inOrder(mockWriteTx);
408             inOrder.verify(mockWriteTx).write(writePath, writeData);
409             inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
410             inOrder.verify(mockWriteTx).delete(deletePath);
411         }};
412     }
413
414     @Test
415     public void testOnReceiveBatchedModificationsReady() throws Exception {
416         new JavaTestKit(getSystem()) {{
417
418             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
419                     "testOnReceiveBatchedModificationsReady");
420
421             JavaTestKit watcher = new JavaTestKit(getSystem());
422             watcher.watch(transaction);
423
424             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
425             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
426                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
427                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
428
429             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
430             batched.addModification(new WriteModification(writePath, writeData));
431
432             transaction.tell(batched, getRef());
433             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
434             assertEquals("getNumBatched", 1, reply.getNumBatched());
435
436             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
437             batched.setReady(true);
438             batched.setTotalMessagesSent(2);
439
440             transaction.tell(batched, getRef());
441             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
442             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
443         }};
444     }
445
446     @Test(expected=TestException.class)
447     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
448         new JavaTestKit(getSystem()) {{
449
450             DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
451             final ActorRef transaction = newTransactionActor(mockWriteTx,
452                     "testOnReceiveBatchedModificationsFailure");
453
454             JavaTestKit watcher = new JavaTestKit(getSystem());
455             watcher.watch(transaction);
456
457             YangInstanceIdentifier path = TestModel.TEST_PATH;
458             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
459
460             doThrow(new TestException()).when(mockWriteTx).write(path, node);
461
462             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
463             batched.addModification(new WriteModification(path, node));
464
465             transaction.tell(batched, getRef());
466             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
467
468             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
469             batched.setReady(true);
470             batched.setTotalMessagesSent(2);
471
472             transaction.tell(batched, getRef());
473             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
474             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
475
476             if(failure != null) {
477                 throw failure.cause();
478             }
479         }};
480     }
481
482     @Test(expected=IllegalStateException.class)
483     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
484         new JavaTestKit(getSystem()) {{
485
486             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
487                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
488
489             JavaTestKit watcher = new JavaTestKit(getSystem());
490             watcher.watch(transaction);
491
492             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
493             batched.setReady(true);
494             batched.setTotalMessagesSent(2);
495
496             transaction.tell(batched, getRef());
497
498             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
499             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
500
501             if(failure != null) {
502                 throw failure.cause();
503             }
504         }};
505     }
506
507     @Test
508     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
509         new JavaTestKit(getSystem()) {{
510             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
511                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
512
513             JavaTestKit watcher = new JavaTestKit(getSystem());
514             watcher.watch(transaction);
515
516             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
517
518             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
519             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
520         }};
521
522         // test
523         new JavaTestKit(getSystem()) {{
524             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
525                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
526
527             JavaTestKit watcher = new JavaTestKit(getSystem());
528             watcher.watch(transaction);
529
530             transaction.tell(new ReadyTransaction(), getRef());
531
532             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
533             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
534         }};
535     }
536
537     @Test
538     public void testOnReceiveCreateSnapshot() throws Exception {
539         new JavaTestKit(getSystem()) {{
540             ShardTest.writeToStore(store, TestModel.TEST_PATH,
541                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
542
543             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
544                     YangInstanceIdentifier.builder().build());
545
546             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
547                     "testOnReceiveCreateSnapshot");
548
549             watch(transaction);
550
551             transaction.tell(CreateSnapshot.INSTANCE, getRef());
552
553             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
554
555             assertNotNull("getSnapshot is null", reply.getSnapshot());
556
557             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
558                     reply.getSnapshot());
559
560             assertEquals("Root node", expectedRoot, actualRoot);
561
562             expectTerminated(duration("3 seconds"), transaction);
563         }};
564     }
565
566     @Test
567     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
568         new JavaTestKit(getSystem()) {{
569             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
570                     "testReadWriteTxOnReceiveCloseTransaction");
571
572             watch(transaction);
573
574             transaction.tell(new CloseTransaction().toSerializable(), getRef());
575
576             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
577             expectTerminated(duration("3 seconds"), transaction);
578         }};
579     }
580
581     @Test
582     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
583         new JavaTestKit(getSystem()) {{
584             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
585                     "testWriteTxOnReceiveCloseTransaction");
586
587             watch(transaction);
588
589             transaction.tell(new CloseTransaction().toSerializable(), getRef());
590
591             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
592             expectTerminated(duration("3 seconds"), transaction);
593         }};
594     }
595
596     @Test
597     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
598         new JavaTestKit(getSystem()) {{
599             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
600                     "testReadOnlyTxOnReceiveCloseTransaction");
601
602             watch(transaction);
603
604             transaction.tell(new CloseTransaction().toSerializable(), getRef());
605
606             expectMsgClass(duration("3 seconds"), Terminated.class);
607         }};
608     }
609
610     @Test(expected=UnknownMessageException.class)
611     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
612         final ActorRef shard = createShard();
613         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
614                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
615         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
616
617         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
618                 toSerializable(), ActorRef.noSender());
619     }
620
621     @Test
622     public void testShardTransactionInactivity() {
623
624         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
625                 500, TimeUnit.MILLISECONDS).build();
626
627         new JavaTestKit(getSystem()) {{
628             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
629                     "testShardTransactionInactivity");
630
631             watch(transaction);
632
633             expectMsgClass(duration("3 seconds"), Terminated.class);
634         }};
635     }
636
637     public static class TestException extends RuntimeException {
638         private static final long serialVersionUID = 1L;
639     }
640 }