Handle leader change on ForwardedReadyTransaction in Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.doThrow;
16 import akka.actor.ActorRef;
17 import akka.actor.Props;
18 import akka.actor.Status.Failure;
19 import akka.actor.Terminated;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import java.util.concurrent.TimeUnit;
23 import org.junit.Test;
24 import org.mockito.InOrder;
25 import org.mockito.Mockito;
26 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
27 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
28 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
29 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
30 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
35 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
36 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
38 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
39 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
40 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
46 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
47 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
48 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
49 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
50 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
51 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
52 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
53 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
54 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
55 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
56 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
57 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
60 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
61 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
62 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64
65 public class ShardTransactionTest extends AbstractActorTest {
66
67     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
68     private static final TransactionType RO = TransactionType.READ_ONLY;
69     private static final TransactionType RW = TransactionType.READ_WRITE;
70     private static final TransactionType WO = TransactionType.WRITE_ONLY;
71
72     private static final ShardIdentifier SHARD_IDENTIFIER =
73         ShardIdentifier.builder().memberName("member-1")
74             .shardName("inventory").type("config").build();
75
76     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
77
78     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
79
80     private final ShardDataTree store = new ShardDataTree(testSchemaContext, TreeType.OPERATIONAL);
81
82     private int txCounter = 0;
83
84     private ActorRef createShard() {
85         ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext).
86                 schemaContext(TestModel.createTestContext()).props());
87         ShardTestKit.waitUntilLeader(shard);
88         return shard;
89     }
90
91     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
92         return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
93     }
94
95     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
96         return newTransactionActor(type, transaction, null, name, version);
97     }
98
99     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
100         return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
101     }
102
103     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
104             short version) {
105         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
106                 datastoreContext, shardStats, "txn", version);
107         return getSystem().actorOf(props, name);
108     }
109
110     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
111         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
112     }
113
114     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
115         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
116     }
117
118     @Test
119     public void testOnReceiveReadData() throws Exception {
120         new JavaTestKit(getSystem()) {{
121             final ActorRef shard = createShard();
122
123             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
124
125             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
126         }
127
128         private void testOnReceiveReadData(final ActorRef transaction) {
129             //serialized read
130             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
131                 getRef());
132
133             Object replySerialized =
134                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
135
136             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
137
138             // unserialized read
139             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
140
141             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
142
143             assertNotNull(reply.getNormalizedNode());
144         }};
145     }
146
147     @Test
148     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
149         new JavaTestKit(getSystem()) {{
150             final ActorRef shard = createShard();
151
152             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
153                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
154
155             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
156                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
157         }
158
159         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
160             // serialized read
161             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
162
163             Object replySerialized =
164                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
165
166             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
167
168             // unserialized read
169             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
170
171             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
172
173             assertTrue(reply.getNormalizedNode() == null);
174         }};
175     }
176
177     @Test
178     public void testOnReceiveReadDataHeliumR1() throws Exception {
179         new JavaTestKit(getSystem()) {{
180             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
181                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
182
183             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
184                     getRef());
185
186             ShardTransactionMessages.ReadDataReply replySerialized =
187                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
188
189             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
190         }};
191     }
192
193     @Test
194     public void testOnReceiveDataExistsPositive() throws Exception {
195         new JavaTestKit(getSystem()) {{
196             final ActorRef shard = createShard();
197
198             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
199                     "testDataExistsPositiveRO"));
200
201             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
202                     "testDataExistsPositiveRW"));
203         }
204
205         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
206             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
207                 getRef());
208
209             ShardTransactionMessages.DataExistsReply replySerialized =
210                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
211
212             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
213
214             // unserialized read
215             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
216
217             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
218
219             assertTrue(reply.exists());
220         }};
221     }
222
223     @Test
224     public void testOnReceiveDataExistsNegative() throws Exception {
225         new JavaTestKit(getSystem()) {{
226             final ActorRef shard = createShard();
227
228             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
229                     "testDataExistsNegativeRO"));
230
231             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
232                     "testDataExistsNegativeRW"));
233         }
234
235         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
236             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
237
238             ShardTransactionMessages.DataExistsReply replySerialized =
239                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
240
241             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
242
243             // unserialized read
244             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
245
246             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
247
248             assertFalse(reply.exists());
249         }};
250     }
251
252     @Test
253     public void testOnReceiveWriteData() {
254         new JavaTestKit(getSystem()) {{
255             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
256                     "testOnReceiveWriteData");
257
258             transaction.tell(new WriteData(TestModel.TEST_PATH,
259                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
260                         toSerializable(), getRef());
261
262             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
263
264             // unserialized write
265             transaction.tell(new WriteData(TestModel.TEST_PATH,
266                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
267                 getRef());
268
269             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
270         }};
271     }
272
273     @Test
274     public void testOnReceiveHeliumR1WriteData() {
275         new JavaTestKit(getSystem()) {{
276             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
277                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
278
279             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
280                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
281             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
282                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
283                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
284
285             transaction.tell(serialized, getRef());
286
287             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
288         }};
289     }
290
291     @Test
292     public void testOnReceiveMergeData() {
293         new JavaTestKit(getSystem()) {{
294             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
295                     "testMergeData");
296
297             transaction.tell(new MergeData(TestModel.TEST_PATH,
298                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
299                         toSerializable(), getRef());
300
301             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
302
303             //unserialized merge
304             transaction.tell(new MergeData(TestModel.TEST_PATH,
305                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
306                 getRef());
307
308             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
309         }};
310     }
311
312     @Test
313     public void testOnReceiveHeliumR1MergeData() throws Exception {
314         new JavaTestKit(getSystem()) {{
315             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
316                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
317
318             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
319                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
320             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
321                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
322                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
323
324             transaction.tell(serialized, getRef());
325
326             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
327         }};
328     }
329
330     @Test
331     public void testOnReceiveDeleteData() throws Exception {
332         new JavaTestKit(getSystem()) {{
333             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
334                     "testDeleteData");
335
336             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
337                     toSerializable(), getRef());
338
339             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
340
341             //unserialized
342             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
343
344             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
345         }};
346     }
347
348     @Test
349     public void testOnReceiveBatchedModifications() throws Exception {
350         new JavaTestKit(getSystem()) {{
351
352             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
353             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
354             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
355             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
356
357             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
358             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
359                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
360                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
361
362             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
363             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
364                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
365
366             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
367
368             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
369             batched.addModification(new WriteModification(writePath, writeData));
370             batched.addModification(new MergeModification(mergePath, mergeData));
371             batched.addModification(new DeleteModification(deletePath));
372
373             transaction.tell(batched, getRef());
374
375             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
376             assertEquals("getNumBatched", 3, reply.getNumBatched());
377
378             InOrder inOrder = Mockito.inOrder(mockModification);
379             inOrder.verify(mockModification).write(writePath, writeData);
380             inOrder.verify(mockModification).merge(mergePath, mergeData);
381             inOrder.verify(mockModification).delete(deletePath);
382         }};
383     }
384
385     @Test
386     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
387         new JavaTestKit(getSystem()) {{
388
389             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
390                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
391
392             JavaTestKit watcher = new JavaTestKit(getSystem());
393             watcher.watch(transaction);
394
395             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
396             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
397                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
398                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
399
400             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
401             batched.addModification(new WriteModification(writePath, writeData));
402
403             transaction.tell(batched, getRef());
404             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
405             assertEquals("getNumBatched", 1, reply.getNumBatched());
406
407             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
408             batched.setReady(true);
409             batched.setTotalMessagesSent(2);
410
411             transaction.tell(batched, getRef());
412             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
413             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
414         }};
415     }
416
417     @Test
418     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
419         new JavaTestKit(getSystem()) {{
420
421             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
422                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
423
424             JavaTestKit watcher = new JavaTestKit(getSystem());
425             watcher.watch(transaction);
426
427             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
428             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
429                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
430                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
431
432             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
433             batched.addModification(new WriteModification(writePath, writeData));
434             batched.setReady(true);
435             batched.setDoCommitOnReady(true);
436             batched.setTotalMessagesSent(1);
437
438             transaction.tell(batched, getRef());
439             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
440             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
441         }};
442     }
443
444     @Test(expected=TestException.class)
445     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
446         new JavaTestKit(getSystem()) {{
447
448             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
449             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
450             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
451             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
452                     "testOnReceiveBatchedModificationsFailure");
453
454             JavaTestKit watcher = new JavaTestKit(getSystem());
455             watcher.watch(transaction);
456
457             YangInstanceIdentifier path = TestModel.TEST_PATH;
458             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
459
460             doThrow(new TestException()).when(mockModification).write(path, node);
461
462             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
463             batched.addModification(new WriteModification(path, node));
464
465             transaction.tell(batched, getRef());
466             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
467
468             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
469             batched.setReady(true);
470             batched.setTotalMessagesSent(2);
471
472             transaction.tell(batched, getRef());
473             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
474             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
475
476             if(failure != null) {
477                 throw failure.cause();
478             }
479         }};
480     }
481
482     @Test(expected=IllegalStateException.class)
483     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
484         new JavaTestKit(getSystem()) {{
485
486             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
487                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
488
489             JavaTestKit watcher = new JavaTestKit(getSystem());
490             watcher.watch(transaction);
491
492             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
493             batched.setReady(true);
494             batched.setTotalMessagesSent(2);
495
496             transaction.tell(batched, getRef());
497
498             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
499             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
500
501             if(failure != null) {
502                 throw failure.cause();
503             }
504         }};
505     }
506
507     @Test
508     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
509         new JavaTestKit(getSystem()) {{
510             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
511                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
512
513             JavaTestKit watcher = new JavaTestKit(getSystem());
514             watcher.watch(transaction);
515
516             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
517
518             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
519             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
520         }};
521
522         // test
523         new JavaTestKit(getSystem()) {{
524             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
525                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
526
527             JavaTestKit watcher = new JavaTestKit(getSystem());
528             watcher.watch(transaction);
529
530             transaction.tell(new ReadyTransaction(), getRef());
531
532             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
533             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
534         }};
535     }
536
537     @Test
538     public void testOnReceiveCreateSnapshot() throws Exception {
539         new JavaTestKit(getSystem()) {{
540             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
541                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
542
543             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
544                     YangInstanceIdentifier.builder().build());
545
546             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
547                     "testOnReceiveCreateSnapshot");
548
549             watch(transaction);
550
551             transaction.tell(CreateSnapshot.INSTANCE, getRef());
552
553             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
554
555             assertNotNull("getSnapshot is null", reply.getSnapshot());
556
557             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
558                     reply.getSnapshot());
559
560             assertEquals("Root node", expectedRoot, actualRoot);
561
562             expectTerminated(duration("3 seconds"), transaction);
563         }};
564     }
565
566     @Test
567     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
568         new JavaTestKit(getSystem()) {{
569             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
570                     "testReadWriteTxOnReceiveCloseTransaction");
571
572             watch(transaction);
573
574             transaction.tell(new CloseTransaction().toSerializable(), getRef());
575
576             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
577             expectTerminated(duration("3 seconds"), transaction);
578         }};
579     }
580
581     @Test
582     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
583         new JavaTestKit(getSystem()) {{
584             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
585                     "testWriteTxOnReceiveCloseTransaction");
586
587             watch(transaction);
588
589             transaction.tell(new CloseTransaction().toSerializable(), getRef());
590
591             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
592             expectTerminated(duration("3 seconds"), transaction);
593         }};
594     }
595
596     @Test
597     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
598         new JavaTestKit(getSystem()) {{
599             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
600                     "testReadOnlyTxOnReceiveCloseTransaction");
601
602             watch(transaction);
603
604             transaction.tell(new CloseTransaction().toSerializable(), getRef());
605
606             expectMsgClass(duration("3 seconds"), Terminated.class);
607         }};
608     }
609
610     @Test(expected=UnknownMessageException.class)
611     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
612         final ActorRef shard = createShard();
613         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
614                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
615         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
616
617         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
618                 toSerializable(), ActorRef.noSender());
619     }
620
621     @Test
622     public void testShardTransactionInactivity() {
623
624         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
625                 500, TimeUnit.MILLISECONDS).build();
626
627         new JavaTestKit(getSystem()) {{
628             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
629                     "testShardTransactionInactivity");
630
631             watch(transaction);
632
633             expectMsgClass(duration("3 seconds"), Terminated.class);
634         }};
635     }
636
637     public static class TestException extends RuntimeException {
638         private static final long serialVersionUID = 1L;
639     }
640 }