Merge "Bug 2597: Batch modification operations in TransactionProxy"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.mockito.InOrder;
18 import org.mockito.Mockito;
19 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
20 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
28 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
30 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
39 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
40 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
41 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.Modification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
46 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
47 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
48 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
49 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
50 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
51 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59 import scala.concurrent.duration.Duration;
60
61 public class ShardTransactionTest extends AbstractActorTest {
62
63     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
64
65     private static final ShardIdentifier SHARD_IDENTIFIER =
66         ShardIdentifier.builder().memberName("member-1")
67             .shardName("inventory").type("config").build();
68
69     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
70
71     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
72
73     private final InMemoryDOMDataStore store =
74             new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
75
76     @Before
77     public void setup() {
78         store.onGlobalContextUpdated(testSchemaContext);
79     }
80
81     private ActorRef createShard(){
82         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
83             Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
84     }
85
86     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
87         return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
88     }
89
90     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
91         return newTransactionActor(transaction, null, name, version);
92     }
93
94     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
95         return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
96     }
97
98     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
99             short version) {
100         Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
101                 testSchemaContext, datastoreContext, shardStats, "txn", version);
102         return getSystem().actorOf(props, name);
103     }
104
105     @Test
106     public void testOnReceiveReadData() throws Exception {
107         new JavaTestKit(getSystem()) {{
108             final ActorRef shard = createShard();
109
110             testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
111
112             testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
113         }
114
115         private void testOnReceiveReadData(final ActorRef transaction) {
116             //serialized read
117             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
118                 getRef());
119
120             Object replySerialized =
121                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
122
123             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
124
125             // unserialized read
126             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
127
128             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
129
130             assertNotNull(reply.getNormalizedNode());
131         }};
132     }
133
134     @Test
135     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
136         new JavaTestKit(getSystem()) {{
137             final ActorRef shard = createShard();
138
139             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
140                     store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
141
142             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
143                     store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
144         }
145
146         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
147             // serialized read
148             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
149
150             Object replySerialized =
151                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
152
153             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
154
155             // unserialized read
156             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
157
158             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
159
160             assertTrue(reply.getNormalizedNode() == null);
161         }};
162     }
163
164     @Test
165     public void testOnReceiveReadDataHeliumR1() throws Exception {
166         new JavaTestKit(getSystem()) {{
167             ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
168                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
169
170             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
171                     getRef());
172
173             ShardTransactionMessages.ReadDataReply replySerialized =
174                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
175
176             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
177         }};
178     }
179
180     @Test
181     public void testOnReceiveDataExistsPositive() throws Exception {
182         new JavaTestKit(getSystem()) {{
183             final ActorRef shard = createShard();
184
185             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
186                     "testDataExistsPositiveRO"));
187
188             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
189                     "testDataExistsPositiveRW"));
190         }
191
192         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
193             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
194                 getRef());
195
196             ShardTransactionMessages.DataExistsReply replySerialized =
197                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
198
199             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
200
201             // unserialized read
202             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
203
204             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
205
206             assertTrue(reply.exists());
207         }};
208     }
209
210     @Test
211     public void testOnReceiveDataExistsNegative() throws Exception {
212         new JavaTestKit(getSystem()) {{
213             final ActorRef shard = createShard();
214
215             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
216                     "testDataExistsNegativeRO"));
217
218             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
219                     "testDataExistsNegativeRW"));
220         }
221
222         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
223             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
224
225             ShardTransactionMessages.DataExistsReply replySerialized =
226                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
227
228             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
229
230             // unserialized read
231             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
232
233             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
234
235             assertFalse(reply.exists());
236         }};
237     }
238
239     private void assertModification(final ActorRef subject,
240         final Class<? extends Modification> modificationType) {
241         new JavaTestKit(getSystem()) {{
242             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
243
244             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
245                     GetCompositeModificationReply.class).getModification();
246
247             assertTrue(compositeModification.getModifications().size() == 1);
248             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
249         }};
250     }
251
252     @Test
253     public void testOnReceiveWriteData() throws Exception {
254         new JavaTestKit(getSystem()) {{
255             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
256                     "testOnReceiveWriteData");
257
258             transaction.tell(new WriteData(TestModel.TEST_PATH,
259                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
260                         toSerializable(), getRef());
261
262             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
263
264             assertModification(transaction, WriteModification.class);
265
266             // unserialized write
267             transaction.tell(new WriteData(TestModel.TEST_PATH,
268                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
269                 getRef());
270
271             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
272         }};
273     }
274
275     @Test
276     public void testOnReceiveHeliumR1WriteData() throws Exception {
277         new JavaTestKit(getSystem()) {{
278             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
279                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
280
281             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
282                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
283             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
284                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
285                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
286
287             transaction.tell(serialized, getRef());
288
289             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
290
291             assertModification(transaction, WriteModification.class);
292         }};
293     }
294
295     @Test
296     public void testOnReceiveMergeData() throws Exception {
297         new JavaTestKit(getSystem()) {{
298             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
299                     "testMergeData");
300
301             transaction.tell(new MergeData(TestModel.TEST_PATH,
302                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
303                         toSerializable(), getRef());
304
305             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
306
307             assertModification(transaction, MergeModification.class);
308
309             //unserialized merge
310             transaction.tell(new MergeData(TestModel.TEST_PATH,
311                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
312                 getRef());
313
314             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
315         }};
316     }
317
318     @Test
319     public void testOnReceiveHeliumR1MergeData() throws Exception {
320         new JavaTestKit(getSystem()) {{
321             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
322                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
323
324             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
325                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
326             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
327                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
328                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
329
330             transaction.tell(serialized, getRef());
331
332             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
333
334             assertModification(transaction, MergeModification.class);
335         }};
336     }
337
338     @Test
339     public void testOnReceiveDeleteData() throws Exception {
340         new JavaTestKit(getSystem()) {{
341             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
342                     "testDeleteData");
343
344             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
345                     toSerializable(), getRef());
346
347             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
348
349             assertModification(transaction, DeleteModification.class);
350
351             //unserialized
352             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
353
354             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
355         }};
356     }
357
358     @Test
359     public void testOnReceiveBatchedModifications() throws Exception {
360         new JavaTestKit(getSystem()) {{
361
362             DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
363             final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
364
365             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
366             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
367                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
368                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
369
370             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
371             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
372                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
373
374             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
375
376             BatchedModifications batched = new BatchedModifications(DataStoreVersions.CURRENT_VERSION);
377             batched.addModification(new WriteModification(writePath, writeData));
378             batched.addModification(new MergeModification(mergePath, mergeData));
379             batched.addModification(new DeleteModification(deletePath));
380
381             transaction.tell(batched, getRef());
382
383             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
384             assertEquals("getNumBatched", 3, reply.getNumBatched());
385
386             JavaTestKit verification = new JavaTestKit(getSystem());
387             transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
388
389             CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
390                         GetCompositeModificationReply.class).getModification();
391
392             assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
393
394             WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
395             assertEquals("getPath", writePath, write.getPath());
396             assertEquals("getData", writeData, write.getData());
397
398             MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
399             assertEquals("getPath", mergePath, merge.getPath());
400             assertEquals("getData", mergeData, merge.getData());
401
402             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
403             assertEquals("getPath", deletePath, delete.getPath());
404
405             InOrder inOrder = Mockito.inOrder(mockWriteTx);
406             inOrder.verify(mockWriteTx).write(writePath, writeData);
407             inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
408             inOrder.verify(mockWriteTx).delete(deletePath);
409         }};
410     }
411
412     @Test
413     public void testOnReceiveReadyTransaction() throws Exception {
414         new JavaTestKit(getSystem()) {{
415             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
416                     "testReadyTransaction");
417
418             watch(transaction);
419
420             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
421
422             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
423                     Terminated.class);
424             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
425                     Terminated.class);
426         }};
427
428         // test
429         new JavaTestKit(getSystem()) {{
430             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
431                     "testReadyTransaction2");
432
433             watch(transaction);
434
435             transaction.tell(new ReadyTransaction(), getRef());
436
437             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
438                     Terminated.class);
439             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
440                     Terminated.class);
441         }};
442     }
443
444     @Test
445     public void testOnReceiveCreateSnapshot() throws Exception {
446         new JavaTestKit(getSystem()) {{
447             ShardTest.writeToStore(store, TestModel.TEST_PATH,
448                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
449
450             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
451                     YangInstanceIdentifier.builder().build());
452
453             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
454                     "testOnReceiveCreateSnapshot");
455
456             watch(transaction);
457
458             transaction.tell(CreateSnapshot.INSTANCE, getRef());
459
460             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
461
462             assertNotNull("getSnapshot is null", reply.getSnapshot());
463
464             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
465                     reply.getSnapshot());
466
467             assertEquals("Root node", expectedRoot, actualRoot);
468
469             expectTerminated(duration("3 seconds"), transaction);
470         }};
471     }
472
473     @Test
474     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
475         new JavaTestKit(getSystem()) {{
476             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
477                     "testReadWriteTxOnReceiveCloseTransaction");
478
479             watch(transaction);
480
481             transaction.tell(new CloseTransaction().toSerializable(), getRef());
482
483             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
484             expectTerminated(duration("3 seconds"), transaction);
485         }};
486     }
487
488     @Test
489     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
490         new JavaTestKit(getSystem()) {{
491             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
492                     "testWriteTxOnReceiveCloseTransaction");
493
494             watch(transaction);
495
496             transaction.tell(new CloseTransaction().toSerializable(), getRef());
497
498             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
499             expectTerminated(duration("3 seconds"), transaction);
500         }};
501     }
502
503     @Test
504     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
505         new JavaTestKit(getSystem()) {{
506             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
507                     "testReadOnlyTxOnReceiveCloseTransaction");
508
509             watch(transaction);
510
511             transaction.tell(new CloseTransaction().toSerializable(), getRef());
512
513             expectMsgClass(duration("3 seconds"), Terminated.class);
514         }};
515     }
516
517     @Test(expected=UnknownMessageException.class)
518     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
519         final ActorRef shard = createShard();
520         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
521                 testSchemaContext, datastoreContext, shardStats, "txn",
522                 DataStoreVersions.CURRENT_VERSION);
523         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
524
525         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
526                 toSerializable(), ActorRef.noSender());
527     }
528
529     @Test
530     public void testShardTransactionInactivity() {
531
532         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
533                 Duration.create(500, TimeUnit.MILLISECONDS)).build();
534
535         new JavaTestKit(getSystem()) {{
536             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
537                     "testShardTransactionInactivity");
538
539             watch(transaction);
540
541             expectMsgClass(duration("3 seconds"), Terminated.class);
542         }};
543     }
544 }