9715f668e353fe71d527865d2ffd68a4759952b5
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.mockito.InOrder;
18 import org.mockito.Mockito;
19 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
20 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
28 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
30 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
39 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
40 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.Modification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
46 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
47 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
49 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
50 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
51 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59
60 public class ShardTransactionTest extends AbstractActorTest {
61
62     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
63
64     private static final ShardIdentifier SHARD_IDENTIFIER =
65         ShardIdentifier.builder().memberName("member-1")
66             .shardName("inventory").type("config").build();
67
68     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
69
70     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
71
72     private final InMemoryDOMDataStore store =
73             new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
74
75     @Before
76     public void setup() {
77         store.onGlobalContextUpdated(testSchemaContext);
78     }
79
80     private ActorRef createShard(){
81         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
82             Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
83     }
84
85     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
86         return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
87     }
88
89     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
90         return newTransactionActor(transaction, null, name, version);
91     }
92
93     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
94         return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
95     }
96
97     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
98             short version) {
99         Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
100                 testSchemaContext, datastoreContext, shardStats, "txn", version);
101         return getSystem().actorOf(props, name);
102     }
103
104     @Test
105     public void testOnReceiveReadData() throws Exception {
106         new JavaTestKit(getSystem()) {{
107             final ActorRef shard = createShard();
108
109             testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
110
111             testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
112         }
113
114         private void testOnReceiveReadData(final ActorRef transaction) {
115             //serialized read
116             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
117                 getRef());
118
119             Object replySerialized =
120                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
121
122             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
123
124             // unserialized read
125             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
126
127             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
128
129             assertNotNull(reply.getNormalizedNode());
130         }};
131     }
132
133     @Test
134     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
135         new JavaTestKit(getSystem()) {{
136             final ActorRef shard = createShard();
137
138             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
139                     store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
140
141             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
142                     store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
143         }
144
145         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
146             // serialized read
147             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
148
149             Object replySerialized =
150                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
151
152             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
153
154             // unserialized read
155             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
156
157             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
158
159             assertTrue(reply.getNormalizedNode() == null);
160         }};
161     }
162
163     @Test
164     public void testOnReceiveReadDataHeliumR1() throws Exception {
165         new JavaTestKit(getSystem()) {{
166             ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
167                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
168
169             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
170                     getRef());
171
172             ShardTransactionMessages.ReadDataReply replySerialized =
173                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
174
175             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
176         }};
177     }
178
179     @Test
180     public void testOnReceiveDataExistsPositive() throws Exception {
181         new JavaTestKit(getSystem()) {{
182             final ActorRef shard = createShard();
183
184             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
185                     "testDataExistsPositiveRO"));
186
187             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
188                     "testDataExistsPositiveRW"));
189         }
190
191         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
192             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
193                 getRef());
194
195             ShardTransactionMessages.DataExistsReply replySerialized =
196                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
197
198             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
199
200             // unserialized read
201             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
202
203             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
204
205             assertTrue(reply.exists());
206         }};
207     }
208
209     @Test
210     public void testOnReceiveDataExistsNegative() throws Exception {
211         new JavaTestKit(getSystem()) {{
212             final ActorRef shard = createShard();
213
214             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
215                     "testDataExistsNegativeRO"));
216
217             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
218                     "testDataExistsNegativeRW"));
219         }
220
221         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
222             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
223
224             ShardTransactionMessages.DataExistsReply replySerialized =
225                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
226
227             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
228
229             // unserialized read
230             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
231
232             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
233
234             assertFalse(reply.exists());
235         }};
236     }
237
238     private void assertModification(final ActorRef subject,
239         final Class<? extends Modification> modificationType) {
240         new JavaTestKit(getSystem()) {{
241             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
242
243             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
244                     GetCompositeModificationReply.class).getModification();
245
246             assertTrue(compositeModification.getModifications().size() == 1);
247             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
248         }};
249     }
250
251     @Test
252     public void testOnReceiveWriteData() throws Exception {
253         new JavaTestKit(getSystem()) {{
254             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
255                     "testOnReceiveWriteData");
256
257             transaction.tell(new WriteData(TestModel.TEST_PATH,
258                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
259                         toSerializable(), getRef());
260
261             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
262
263             assertModification(transaction, WriteModification.class);
264
265             // unserialized write
266             transaction.tell(new WriteData(TestModel.TEST_PATH,
267                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
268                 getRef());
269
270             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
271         }};
272     }
273
274     @Test
275     public void testOnReceiveHeliumR1WriteData() throws Exception {
276         new JavaTestKit(getSystem()) {{
277             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
278                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
279
280             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
281                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
282             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
283                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
284                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
285
286             transaction.tell(serialized, getRef());
287
288             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
289
290             assertModification(transaction, WriteModification.class);
291         }};
292     }
293
294     @Test
295     public void testOnReceiveMergeData() throws Exception {
296         new JavaTestKit(getSystem()) {{
297             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
298                     "testMergeData");
299
300             transaction.tell(new MergeData(TestModel.TEST_PATH,
301                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
302                         toSerializable(), getRef());
303
304             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
305
306             assertModification(transaction, MergeModification.class);
307
308             //unserialized merge
309             transaction.tell(new MergeData(TestModel.TEST_PATH,
310                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
311                 getRef());
312
313             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
314         }};
315     }
316
317     @Test
318     public void testOnReceiveHeliumR1MergeData() throws Exception {
319         new JavaTestKit(getSystem()) {{
320             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
321                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
322
323             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
324                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
325             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
326                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
327                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
328
329             transaction.tell(serialized, getRef());
330
331             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
332
333             assertModification(transaction, MergeModification.class);
334         }};
335     }
336
337     @Test
338     public void testOnReceiveDeleteData() throws Exception {
339         new JavaTestKit(getSystem()) {{
340             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
341                     "testDeleteData");
342
343             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
344                     toSerializable(), getRef());
345
346             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
347
348             assertModification(transaction, DeleteModification.class);
349
350             //unserialized
351             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
352
353             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
354         }};
355     }
356
357     @Test
358     public void testOnReceiveBatchedModifications() throws Exception {
359         new JavaTestKit(getSystem()) {{
360
361             DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
362             final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
363
364             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
365             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
366                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
367                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
368
369             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
370             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
371                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
372
373             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
374
375             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
376             batched.addModification(new WriteModification(writePath, writeData));
377             batched.addModification(new MergeModification(mergePath, mergeData));
378             batched.addModification(new DeleteModification(deletePath));
379
380             transaction.tell(batched, getRef());
381
382             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
383             assertEquals("getNumBatched", 3, reply.getNumBatched());
384
385             JavaTestKit verification = new JavaTestKit(getSystem());
386             transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
387
388             CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
389                         GetCompositeModificationReply.class).getModification();
390
391             assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
392
393             WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
394             assertEquals("getPath", writePath, write.getPath());
395             assertEquals("getData", writeData, write.getData());
396
397             MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
398             assertEquals("getPath", mergePath, merge.getPath());
399             assertEquals("getData", mergeData, merge.getData());
400
401             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
402             assertEquals("getPath", deletePath, delete.getPath());
403
404             InOrder inOrder = Mockito.inOrder(mockWriteTx);
405             inOrder.verify(mockWriteTx).write(writePath, writeData);
406             inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
407             inOrder.verify(mockWriteTx).delete(deletePath);
408         }};
409     }
410
411     @Test
412     public void testOnReceiveBatchedModificationsReady() throws Exception {
413         new JavaTestKit(getSystem()) {{
414
415             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
416                     "testOnReceiveBatchedModificationsReady");
417
418             JavaTestKit watcher = new JavaTestKit(getSystem());
419             watcher.watch(transaction);
420
421             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
422             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
423                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
424                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
425
426             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
427             batched.setReady(true);
428             batched.addModification(new WriteModification(writePath, writeData));
429
430             transaction.tell(batched, getRef());
431
432             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
433             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
434         }};
435     }
436
437     @Test
438     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
439         new JavaTestKit(getSystem()) {{
440             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
441                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
442
443             JavaTestKit watcher = new JavaTestKit(getSystem());
444             watcher.watch(transaction);
445
446             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
447
448             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
449             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
450         }};
451
452         // test
453         new JavaTestKit(getSystem()) {{
454             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
455                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
456
457             JavaTestKit watcher = new JavaTestKit(getSystem());
458             watcher.watch(transaction);
459
460             transaction.tell(new ReadyTransaction(), getRef());
461
462             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
463             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
464         }};
465     }
466
467     @Test
468     public void testOnReceiveCreateSnapshot() throws Exception {
469         new JavaTestKit(getSystem()) {{
470             ShardTest.writeToStore(store, TestModel.TEST_PATH,
471                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
472
473             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
474                     YangInstanceIdentifier.builder().build());
475
476             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
477                     "testOnReceiveCreateSnapshot");
478
479             watch(transaction);
480
481             transaction.tell(CreateSnapshot.INSTANCE, getRef());
482
483             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
484
485             assertNotNull("getSnapshot is null", reply.getSnapshot());
486
487             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
488                     reply.getSnapshot());
489
490             assertEquals("Root node", expectedRoot, actualRoot);
491
492             expectTerminated(duration("3 seconds"), transaction);
493         }};
494     }
495
496     @Test
497     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
498         new JavaTestKit(getSystem()) {{
499             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
500                     "testReadWriteTxOnReceiveCloseTransaction");
501
502             watch(transaction);
503
504             transaction.tell(new CloseTransaction().toSerializable(), getRef());
505
506             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
507             expectTerminated(duration("3 seconds"), transaction);
508         }};
509     }
510
511     @Test
512     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
513         new JavaTestKit(getSystem()) {{
514             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
515                     "testWriteTxOnReceiveCloseTransaction");
516
517             watch(transaction);
518
519             transaction.tell(new CloseTransaction().toSerializable(), getRef());
520
521             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
522             expectTerminated(duration("3 seconds"), transaction);
523         }};
524     }
525
526     @Test
527     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
528         new JavaTestKit(getSystem()) {{
529             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
530                     "testReadOnlyTxOnReceiveCloseTransaction");
531
532             watch(transaction);
533
534             transaction.tell(new CloseTransaction().toSerializable(), getRef());
535
536             expectMsgClass(duration("3 seconds"), Terminated.class);
537         }};
538     }
539
540     @Test(expected=UnknownMessageException.class)
541     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
542         final ActorRef shard = createShard();
543         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
544                 testSchemaContext, datastoreContext, shardStats, "txn",
545                 DataStoreVersions.CURRENT_VERSION);
546         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
547
548         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
549                 toSerializable(), ActorRef.noSender());
550     }
551
552     @Test
553     public void testShardTransactionInactivity() {
554
555         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
556                 500, TimeUnit.MILLISECONDS).build();
557
558         new JavaTestKit(getSystem()) {{
559             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
560                     "testShardTransactionInactivity");
561
562             watch(transaction);
563
564             expectMsgClass(duration("3 seconds"), Terminated.class);
565         }};
566     }
567 }