Merge "Avoid IllegalArgument on missing source"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.mockito.InOrder;
18 import org.mockito.Mockito;
19 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
20 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
28 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
30 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
39 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
40 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.Modification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
46 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
47 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
49 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
50 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
51 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59
60 public class ShardTransactionTest extends AbstractActorTest {
61
62     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
63
64     private static final ShardIdentifier SHARD_IDENTIFIER =
65         ShardIdentifier.builder().memberName("member-1")
66             .shardName("inventory").type("config").build();
67
68     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
69
70     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
71
72     private final InMemoryDOMDataStore store =
73             new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
74
75     @Before
76     public void setup() {
77         store.onGlobalContextUpdated(testSchemaContext);
78     }
79
80     private ActorRef createShard(){
81         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
82             Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
83     }
84
85     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
86         return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
87     }
88
89     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
90         return newTransactionActor(transaction, null, name, version);
91     }
92
93     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
94         return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
95     }
96
97     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
98             short version) {
99         Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
100                 testSchemaContext, datastoreContext, shardStats, "txn", version);
101         return getSystem().actorOf(props, name);
102     }
103
104     @Test
105     public void testOnReceiveReadData() throws Exception {
106         new JavaTestKit(getSystem()) {{
107             final ActorRef shard = createShard();
108
109             testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
110
111             testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
112         }
113
114         private void testOnReceiveReadData(final ActorRef transaction) {
115             //serialized read
116             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
117                 getRef());
118
119             Object replySerialized =
120                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
121
122             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
123
124             // unserialized read
125             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
126
127             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
128
129             assertNotNull(reply.getNormalizedNode());
130         }};
131     }
132
133     @Test
134     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
135         new JavaTestKit(getSystem()) {{
136             final ActorRef shard = createShard();
137
138             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
139                     store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
140
141             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
142                     store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
143         }
144
145         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
146             // serialized read
147             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
148
149             Object replySerialized =
150                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
151
152             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
153
154             // unserialized read
155             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
156
157             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
158
159             assertTrue(reply.getNormalizedNode() == null);
160         }};
161     }
162
163     @Test
164     public void testOnReceiveReadDataHeliumR1() throws Exception {
165         new JavaTestKit(getSystem()) {{
166             ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
167                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
168
169             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
170                     getRef());
171
172             ShardTransactionMessages.ReadDataReply replySerialized =
173                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
174
175             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
176         }};
177     }
178
179     @Test
180     public void testOnReceiveDataExistsPositive() throws Exception {
181         new JavaTestKit(getSystem()) {{
182             final ActorRef shard = createShard();
183
184             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
185                     "testDataExistsPositiveRO"));
186
187             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
188                     "testDataExistsPositiveRW"));
189         }
190
191         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
192             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
193                 getRef());
194
195             ShardTransactionMessages.DataExistsReply replySerialized =
196                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
197
198             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
199
200             // unserialized read
201             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
202
203             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
204
205             assertTrue(reply.exists());
206         }};
207     }
208
209     @Test
210     public void testOnReceiveDataExistsNegative() throws Exception {
211         new JavaTestKit(getSystem()) {{
212             final ActorRef shard = createShard();
213
214             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
215                     "testDataExistsNegativeRO"));
216
217             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
218                     "testDataExistsNegativeRW"));
219         }
220
221         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
222             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
223
224             ShardTransactionMessages.DataExistsReply replySerialized =
225                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
226
227             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
228
229             // unserialized read
230             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
231
232             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
233
234             assertFalse(reply.exists());
235         }};
236     }
237
238     private void assertModification(final ActorRef subject,
239         final Class<? extends Modification> modificationType) {
240         new JavaTestKit(getSystem()) {{
241             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
242
243             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
244                     GetCompositeModificationReply.class).getModification();
245
246             assertTrue(compositeModification.getModifications().size() == 1);
247             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
248         }};
249     }
250
251     @Test
252     public void testOnReceiveWriteData() throws Exception {
253         new JavaTestKit(getSystem()) {{
254             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
255                     "testOnReceiveWriteData");
256
257             transaction.tell(new WriteData(TestModel.TEST_PATH,
258                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
259                         toSerializable(), getRef());
260
261             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
262
263             assertModification(transaction, WriteModification.class);
264
265             // unserialized write
266             transaction.tell(new WriteData(TestModel.TEST_PATH,
267                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
268                 getRef());
269
270             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
271         }};
272     }
273
274     @Test
275     public void testOnReceiveHeliumR1WriteData() throws Exception {
276         new JavaTestKit(getSystem()) {{
277             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
278                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
279
280             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
281                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
282             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
283                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
284                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
285
286             transaction.tell(serialized, getRef());
287
288             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
289
290             assertModification(transaction, WriteModification.class);
291         }};
292     }
293
294     @Test
295     public void testOnReceiveMergeData() throws Exception {
296         new JavaTestKit(getSystem()) {{
297             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
298                     "testMergeData");
299
300             transaction.tell(new MergeData(TestModel.TEST_PATH,
301                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
302                         toSerializable(), getRef());
303
304             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
305
306             assertModification(transaction, MergeModification.class);
307
308             //unserialized merge
309             transaction.tell(new MergeData(TestModel.TEST_PATH,
310                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
311                 getRef());
312
313             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
314         }};
315     }
316
317     @Test
318     public void testOnReceiveHeliumR1MergeData() throws Exception {
319         new JavaTestKit(getSystem()) {{
320             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
321                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
322
323             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
324                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
325             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
326                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
327                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
328
329             transaction.tell(serialized, getRef());
330
331             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
332
333             assertModification(transaction, MergeModification.class);
334         }};
335     }
336
337     @Test
338     public void testOnReceiveDeleteData() throws Exception {
339         new JavaTestKit(getSystem()) {{
340             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
341                     "testDeleteData");
342
343             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
344                     toSerializable(), getRef());
345
346             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
347
348             assertModification(transaction, DeleteModification.class);
349
350             //unserialized
351             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
352
353             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
354         }};
355     }
356
357     @Test
358     public void testOnReceiveBatchedModifications() throws Exception {
359         new JavaTestKit(getSystem()) {{
360
361             DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
362             final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
363
364             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
365             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
366                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
367                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
368
369             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
370             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
371                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
372
373             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
374
375             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
376             batched.addModification(new WriteModification(writePath, writeData));
377             batched.addModification(new MergeModification(mergePath, mergeData));
378             batched.addModification(new DeleteModification(deletePath));
379
380             transaction.tell(batched, getRef());
381
382             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
383             assertEquals("getNumBatched", 3, reply.getNumBatched());
384
385             JavaTestKit verification = new JavaTestKit(getSystem());
386             transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
387
388             CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
389                         GetCompositeModificationReply.class).getModification();
390
391             assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
392
393             WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
394             assertEquals("getPath", writePath, write.getPath());
395             assertEquals("getData", writeData, write.getData());
396
397             MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
398             assertEquals("getPath", mergePath, merge.getPath());
399             assertEquals("getData", mergeData, merge.getData());
400
401             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
402             assertEquals("getPath", deletePath, delete.getPath());
403
404             InOrder inOrder = Mockito.inOrder(mockWriteTx);
405             inOrder.verify(mockWriteTx).write(writePath, writeData);
406             inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
407             inOrder.verify(mockWriteTx).delete(deletePath);
408         }};
409     }
410
411     @Test
412     public void testOnReceiveReadyTransaction() throws Exception {
413         new JavaTestKit(getSystem()) {{
414             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
415                     "testReadyTransaction");
416
417             watch(transaction);
418
419             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
420
421             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
422                     Terminated.class);
423             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
424                     Terminated.class);
425         }};
426
427         // test
428         new JavaTestKit(getSystem()) {{
429             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
430                     "testReadyTransaction2");
431
432             watch(transaction);
433
434             transaction.tell(new ReadyTransaction(), getRef());
435
436             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
437                     Terminated.class);
438             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
439                     Terminated.class);
440         }};
441     }
442
443     @Test
444     public void testOnReceiveCreateSnapshot() throws Exception {
445         new JavaTestKit(getSystem()) {{
446             ShardTest.writeToStore(store, TestModel.TEST_PATH,
447                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
448
449             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
450                     YangInstanceIdentifier.builder().build());
451
452             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
453                     "testOnReceiveCreateSnapshot");
454
455             watch(transaction);
456
457             transaction.tell(CreateSnapshot.INSTANCE, getRef());
458
459             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
460
461             assertNotNull("getSnapshot is null", reply.getSnapshot());
462
463             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
464                     reply.getSnapshot());
465
466             assertEquals("Root node", expectedRoot, actualRoot);
467
468             expectTerminated(duration("3 seconds"), transaction);
469         }};
470     }
471
472     @Test
473     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
474         new JavaTestKit(getSystem()) {{
475             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
476                     "testReadWriteTxOnReceiveCloseTransaction");
477
478             watch(transaction);
479
480             transaction.tell(new CloseTransaction().toSerializable(), getRef());
481
482             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
483             expectTerminated(duration("3 seconds"), transaction);
484         }};
485     }
486
487     @Test
488     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
489         new JavaTestKit(getSystem()) {{
490             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
491                     "testWriteTxOnReceiveCloseTransaction");
492
493             watch(transaction);
494
495             transaction.tell(new CloseTransaction().toSerializable(), getRef());
496
497             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
498             expectTerminated(duration("3 seconds"), transaction);
499         }};
500     }
501
502     @Test
503     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
504         new JavaTestKit(getSystem()) {{
505             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
506                     "testReadOnlyTxOnReceiveCloseTransaction");
507
508             watch(transaction);
509
510             transaction.tell(new CloseTransaction().toSerializable(), getRef());
511
512             expectMsgClass(duration("3 seconds"), Terminated.class);
513         }};
514     }
515
516     @Test(expected=UnknownMessageException.class)
517     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
518         final ActorRef shard = createShard();
519         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
520                 testSchemaContext, datastoreContext, shardStats, "txn",
521                 DataStoreVersions.CURRENT_VERSION);
522         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
523
524         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
525                 toSerializable(), ActorRef.noSender());
526     }
527
528     @Test
529     public void testShardTransactionInactivity() {
530
531         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
532                 500, TimeUnit.MILLISECONDS).build();
533
534         new JavaTestKit(getSystem()) {{
535             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
536                     "testShardTransactionInactivity");
537
538             watch(transaction);
539
540             expectMsgClass(duration("3 seconds"), Terminated.class);
541         }};
542     }
543 }