b22001a4da6c234f6ac7b7167e4f4a4e18d25771
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.doThrow;
8 import akka.actor.ActorRef;
9 import akka.actor.Props;
10 import akka.actor.Status.Failure;
11 import akka.actor.Terminated;
12 import akka.testkit.JavaTestKit;
13 import akka.testkit.TestActorRef;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.Collections;
16 import java.util.concurrent.TimeUnit;
17 import org.junit.Before;
18 import org.junit.Test;
19 import org.mockito.InOrder;
20 import org.mockito.Mockito;
21 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
22 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
23 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
32 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
33 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
34 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
36 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
42 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
43 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
45 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.Modification;
47 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
49 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
50 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
51 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
52 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
53 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
54 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
60 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
61 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
62 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
63
64 public class ShardTransactionTest extends AbstractActorTest {
65
66     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
67
68     private static final ShardIdentifier SHARD_IDENTIFIER =
69         ShardIdentifier.builder().memberName("member-1")
70             .shardName("inventory").type("config").build();
71
72     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
73
74     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
75
76     private final InMemoryDOMDataStore store =
77             new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
78
79     @Before
80     public void setup() {
81         store.onGlobalContextUpdated(testSchemaContext);
82     }
83
84     private ActorRef createShard(){
85         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
86             Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
87     }
88
89     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
90         return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
91     }
92
93     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
94         return newTransactionActor(transaction, null, name, version);
95     }
96
97     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
98         return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
99     }
100
101     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
102             short version) {
103         Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
104                 datastoreContext, shardStats, "txn", version);
105         return getSystem().actorOf(props, name);
106     }
107
108     @Test
109     public void testOnReceiveReadData() throws Exception {
110         new JavaTestKit(getSystem()) {{
111             final ActorRef shard = createShard();
112
113             testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
114
115             testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
116         }
117
118         private void testOnReceiveReadData(final ActorRef transaction) {
119             //serialized read
120             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
121                 getRef());
122
123             Object replySerialized =
124                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
125
126             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
127
128             // unserialized read
129             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
130
131             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
132
133             assertNotNull(reply.getNormalizedNode());
134         }};
135     }
136
137     @Test
138     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
139         new JavaTestKit(getSystem()) {{
140             final ActorRef shard = createShard();
141
142             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
143                     store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
144
145             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
146                     store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
147         }
148
149         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
150             // serialized read
151             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
152
153             Object replySerialized =
154                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
155
156             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
157
158             // unserialized read
159             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
160
161             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
162
163             assertTrue(reply.getNormalizedNode() == null);
164         }};
165     }
166
167     @Test
168     public void testOnReceiveReadDataHeliumR1() throws Exception {
169         new JavaTestKit(getSystem()) {{
170             ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
171                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
172
173             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
174                     getRef());
175
176             ShardTransactionMessages.ReadDataReply replySerialized =
177                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
178
179             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
180         }};
181     }
182
183     @Test
184     public void testOnReceiveDataExistsPositive() throws Exception {
185         new JavaTestKit(getSystem()) {{
186             final ActorRef shard = createShard();
187
188             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
189                     "testDataExistsPositiveRO"));
190
191             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
192                     "testDataExistsPositiveRW"));
193         }
194
195         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
196             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
197                 getRef());
198
199             ShardTransactionMessages.DataExistsReply replySerialized =
200                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
201
202             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
203
204             // unserialized read
205             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
206
207             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
208
209             assertTrue(reply.exists());
210         }};
211     }
212
213     @Test
214     public void testOnReceiveDataExistsNegative() throws Exception {
215         new JavaTestKit(getSystem()) {{
216             final ActorRef shard = createShard();
217
218             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
219                     "testDataExistsNegativeRO"));
220
221             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
222                     "testDataExistsNegativeRW"));
223         }
224
225         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
226             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
227
228             ShardTransactionMessages.DataExistsReply replySerialized =
229                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
230
231             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
232
233             // unserialized read
234             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
235
236             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
237
238             assertFalse(reply.exists());
239         }};
240     }
241
242     private void assertModification(final ActorRef subject,
243         final Class<? extends Modification> modificationType) {
244         new JavaTestKit(getSystem()) {{
245             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
246
247             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
248                     GetCompositeModificationReply.class).getModification();
249
250             assertTrue(compositeModification.getModifications().size() == 1);
251             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
252         }};
253     }
254
255     @Test
256     public void testOnReceiveWriteData() throws Exception {
257         new JavaTestKit(getSystem()) {{
258             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
259                     "testOnReceiveWriteData");
260
261             transaction.tell(new WriteData(TestModel.TEST_PATH,
262                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
263                         toSerializable(), getRef());
264
265             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
266
267             assertModification(transaction, WriteModification.class);
268
269             // unserialized write
270             transaction.tell(new WriteData(TestModel.TEST_PATH,
271                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
272                 getRef());
273
274             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
275         }};
276     }
277
278     @Test
279     public void testOnReceiveHeliumR1WriteData() throws Exception {
280         new JavaTestKit(getSystem()) {{
281             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
282                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
283
284             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
285                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
286             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
287                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
288                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
289
290             transaction.tell(serialized, getRef());
291
292             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
293
294             assertModification(transaction, WriteModification.class);
295         }};
296     }
297
298     @Test
299     public void testOnReceiveMergeData() throws Exception {
300         new JavaTestKit(getSystem()) {{
301             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
302                     "testMergeData");
303
304             transaction.tell(new MergeData(TestModel.TEST_PATH,
305                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
306                         toSerializable(), getRef());
307
308             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
309
310             assertModification(transaction, MergeModification.class);
311
312             //unserialized merge
313             transaction.tell(new MergeData(TestModel.TEST_PATH,
314                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
315                 getRef());
316
317             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
318         }};
319     }
320
321     @Test
322     public void testOnReceiveHeliumR1MergeData() throws Exception {
323         new JavaTestKit(getSystem()) {{
324             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
325                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
326
327             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
328                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
329             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
330                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
331                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
332
333             transaction.tell(serialized, getRef());
334
335             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
336
337             assertModification(transaction, MergeModification.class);
338         }};
339     }
340
341     @Test
342     public void testOnReceiveDeleteData() throws Exception {
343         new JavaTestKit(getSystem()) {{
344             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
345                     "testDeleteData");
346
347             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
348                     toSerializable(), getRef());
349
350             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
351
352             assertModification(transaction, DeleteModification.class);
353
354             //unserialized
355             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
356
357             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
358         }};
359     }
360
361     @Test
362     public void testOnReceiveBatchedModifications() throws Exception {
363         new JavaTestKit(getSystem()) {{
364
365             DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
366             final ActorRef transaction = newTransactionActor(mockWriteTx, "testOnReceiveBatchedModifications");
367
368             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
369             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
370                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
371                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
372
373             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
374             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
375                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
376
377             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
378
379             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
380             batched.addModification(new WriteModification(writePath, writeData));
381             batched.addModification(new MergeModification(mergePath, mergeData));
382             batched.addModification(new DeleteModification(deletePath));
383
384             transaction.tell(batched, getRef());
385
386             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
387             assertEquals("getNumBatched", 3, reply.getNumBatched());
388
389             JavaTestKit verification = new JavaTestKit(getSystem());
390             transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
391
392             CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
393                         GetCompositeModificationReply.class).getModification();
394
395             assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
396
397             WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
398             assertEquals("getPath", writePath, write.getPath());
399             assertEquals("getData", writeData, write.getData());
400
401             MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
402             assertEquals("getPath", mergePath, merge.getPath());
403             assertEquals("getData", mergeData, merge.getData());
404
405             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
406             assertEquals("getPath", deletePath, delete.getPath());
407
408             InOrder inOrder = Mockito.inOrder(mockWriteTx);
409             inOrder.verify(mockWriteTx).write(writePath, writeData);
410             inOrder.verify(mockWriteTx).merge(mergePath, mergeData);
411             inOrder.verify(mockWriteTx).delete(deletePath);
412         }};
413     }
414
415     @Test
416     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
417         new JavaTestKit(getSystem()) {{
418
419             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
420                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
421
422             JavaTestKit watcher = new JavaTestKit(getSystem());
423             watcher.watch(transaction);
424
425             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
426             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
427                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
428                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
429
430             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
431             batched.addModification(new WriteModification(writePath, writeData));
432
433             transaction.tell(batched, getRef());
434             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
435             assertEquals("getNumBatched", 1, reply.getNumBatched());
436
437             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
438             batched.setReady(true);
439             batched.setTotalMessagesSent(2);
440
441             transaction.tell(batched, getRef());
442             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
443             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
444         }};
445     }
446
447     @Test
448     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
449         new JavaTestKit(getSystem()) {{
450
451             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
452                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
453
454             JavaTestKit watcher = new JavaTestKit(getSystem());
455             watcher.watch(transaction);
456
457             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
458             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
459                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
460                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
461
462             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
463             batched.addModification(new WriteModification(writePath, writeData));
464             batched.setReady(true);
465             batched.setDoCommitOnReady(true);
466             batched.setTotalMessagesSent(1);
467
468             transaction.tell(batched, getRef());
469             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
470             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
471         }};
472     }
473
474     @Test(expected=TestException.class)
475     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
476         new JavaTestKit(getSystem()) {{
477
478             DOMStoreWriteTransaction mockWriteTx = Mockito.mock(DOMStoreWriteTransaction.class);
479             final ActorRef transaction = newTransactionActor(mockWriteTx,
480                     "testOnReceiveBatchedModificationsFailure");
481
482             JavaTestKit watcher = new JavaTestKit(getSystem());
483             watcher.watch(transaction);
484
485             YangInstanceIdentifier path = TestModel.TEST_PATH;
486             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
487
488             doThrow(new TestException()).when(mockWriteTx).write(path, node);
489
490             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
491             batched.addModification(new WriteModification(path, node));
492
493             transaction.tell(batched, getRef());
494             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
495
496             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
497             batched.setReady(true);
498             batched.setTotalMessagesSent(2);
499
500             transaction.tell(batched, getRef());
501             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
502             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
503
504             if(failure != null) {
505                 throw failure.cause();
506             }
507         }};
508     }
509
510     @Test(expected=IllegalStateException.class)
511     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
512         new JavaTestKit(getSystem()) {{
513
514             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
515                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
516
517             JavaTestKit watcher = new JavaTestKit(getSystem());
518             watcher.watch(transaction);
519
520             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
521             batched.setReady(true);
522             batched.setTotalMessagesSent(2);
523
524             transaction.tell(batched, getRef());
525
526             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
527             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
528
529             if(failure != null) {
530                 throw failure.cause();
531             }
532         }};
533     }
534
535     @Test
536     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
537         new JavaTestKit(getSystem()) {{
538             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
539                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
540
541             JavaTestKit watcher = new JavaTestKit(getSystem());
542             watcher.watch(transaction);
543
544             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
545
546             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
547             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
548         }};
549
550         // test
551         new JavaTestKit(getSystem()) {{
552             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
553                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
554
555             JavaTestKit watcher = new JavaTestKit(getSystem());
556             watcher.watch(transaction);
557
558             transaction.tell(new ReadyTransaction(), getRef());
559
560             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
561             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
562         }};
563     }
564
565     @Test
566     public void testOnReceiveCreateSnapshot() throws Exception {
567         new JavaTestKit(getSystem()) {{
568             ShardTest.writeToStore(store, TestModel.TEST_PATH,
569                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
570
571             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
572                     YangInstanceIdentifier.builder().build());
573
574             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
575                     "testOnReceiveCreateSnapshot");
576
577             watch(transaction);
578
579             transaction.tell(CreateSnapshot.INSTANCE, getRef());
580
581             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
582
583             assertNotNull("getSnapshot is null", reply.getSnapshot());
584
585             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
586                     reply.getSnapshot());
587
588             assertEquals("Root node", expectedRoot, actualRoot);
589
590             expectTerminated(duration("3 seconds"), transaction);
591         }};
592     }
593
594     @Test
595     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
596         new JavaTestKit(getSystem()) {{
597             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
598                     "testReadWriteTxOnReceiveCloseTransaction");
599
600             watch(transaction);
601
602             transaction.tell(new CloseTransaction().toSerializable(), getRef());
603
604             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
605             expectTerminated(duration("3 seconds"), transaction);
606         }};
607     }
608
609     @Test
610     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
611         new JavaTestKit(getSystem()) {{
612             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
613                     "testWriteTxOnReceiveCloseTransaction");
614
615             watch(transaction);
616
617             transaction.tell(new CloseTransaction().toSerializable(), getRef());
618
619             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
620             expectTerminated(duration("3 seconds"), transaction);
621         }};
622     }
623
624     @Test
625     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
626         new JavaTestKit(getSystem()) {{
627             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
628                     "testReadOnlyTxOnReceiveCloseTransaction");
629
630             watch(transaction);
631
632             transaction.tell(new CloseTransaction().toSerializable(), getRef());
633
634             expectMsgClass(duration("3 seconds"), Terminated.class);
635         }};
636     }
637
638     @Test(expected=UnknownMessageException.class)
639     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
640         final ActorRef shard = createShard();
641         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
642                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
643         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
644
645         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
646                 toSerializable(), ActorRef.noSender());
647     }
648
649     @Test
650     public void testShardTransactionInactivity() {
651
652         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
653                 500, TimeUnit.MILLISECONDS).build();
654
655         new JavaTestKit(getSystem()) {{
656             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
657                     "testShardTransactionInactivity");
658
659             watch(transaction);
660
661             expectMsgClass(duration("3 seconds"), Terminated.class);
662         }};
663     }
664
665     public static class TestException extends RuntimeException {
666         private static final long serialVersionUID = 1L;
667     }
668 }