CDS: split TransactionType from TransactionProxy
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import static org.mockito.Mockito.doThrow;
8 import akka.actor.ActorRef;
9 import akka.actor.Props;
10 import akka.actor.Status.Failure;
11 import akka.actor.Terminated;
12 import akka.testkit.JavaTestKit;
13 import akka.testkit.TestActorRef;
14 import java.util.Collections;
15 import java.util.concurrent.TimeUnit;
16 import org.junit.Test;
17 import org.mockito.InOrder;
18 import org.mockito.Mockito;
19 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
20 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
23 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
33 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
34 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
40 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
41 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.Modification;
45 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
46 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
47 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
48 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
49 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
50 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
51 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
56 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
57 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
58 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
59
60 public class ShardTransactionTest extends AbstractActorTest {
61
62     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
63     private static final TransactionType RO = TransactionType.READ_ONLY;
64     private static final TransactionType RW = TransactionType.READ_WRITE;
65     private static final TransactionType WO = TransactionType.WRITE_ONLY;
66
67     private static final ShardIdentifier SHARD_IDENTIFIER =
68         ShardIdentifier.builder().memberName("member-1")
69             .shardName("inventory").type("config").build();
70
71     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
72
73     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
74
75     private final ShardDataTree store = new ShardDataTree(testSchemaContext);
76
77     private int txCounter = 0;
78
79     private ActorRef createShard() {
80         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
81             Collections.<String, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
82     }
83
84     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name) {
85         return newTransactionActor(type, transaction, name, DataStoreVersions.CURRENT_VERSION);
86     }
87
88     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, String name, short version) {
89         return newTransactionActor(type, transaction, null, name, version);
90     }
91
92     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name) {
93         return newTransactionActor(type, transaction, null, name, DataStoreVersions.CURRENT_VERSION);
94     }
95
96     private ActorRef newTransactionActor(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shard, String name,
97             short version) {
98         Props props = ShardTransaction.props(type, transaction, shard != null ? shard : createShard(),
99                 datastoreContext, shardStats, "txn", version);
100         return getSystem().actorOf(props, name);
101     }
102
103     private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
104         return store.newReadOnlyTransaction("test-ro-" + String.valueOf(txCounter++), null);
105     }
106
107     private ReadWriteShardDataTreeTransaction readWriteTransaction() {
108         return store.newReadWriteTransaction("test-rw-" + String.valueOf(txCounter++), null);
109     }
110
111     @Test
112     public void testOnReceiveReadData() throws Exception {
113         new JavaTestKit(getSystem()) {{
114             final ActorRef shard = createShard();
115
116             testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), shard, "testReadDataRO"));
117
118             testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), shard, "testReadDataRW"));
119         }
120
121         private void testOnReceiveReadData(final ActorRef transaction) {
122             //serialized read
123             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
124                 getRef());
125
126             Object replySerialized =
127                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
128
129             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
130
131             // unserialized read
132             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
133
134             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
135
136             assertNotNull(reply.getNormalizedNode());
137         }};
138     }
139
140     @Test
141     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
142         new JavaTestKit(getSystem()) {{
143             final ActorRef shard = createShard();
144
145             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
146                     RO, readOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
147
148             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
149                     RW, readWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
150         }
151
152         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
153             // serialized read
154             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
155
156             Object replySerialized =
157                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
158
159             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
160
161             // unserialized read
162             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
163
164             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
165
166             assertTrue(reply.getNormalizedNode() == null);
167         }};
168     }
169
170     @Test
171     public void testOnReceiveReadDataHeliumR1() throws Exception {
172         new JavaTestKit(getSystem()) {{
173             ActorRef transaction = newTransactionActor(RO, readOnlyTransaction(),
174                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
175
176             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
177                     getRef());
178
179             ShardTransactionMessages.ReadDataReply replySerialized =
180                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
181
182             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
183         }};
184     }
185
186     @Test
187     public void testOnReceiveDataExistsPositive() throws Exception {
188         new JavaTestKit(getSystem()) {{
189             final ActorRef shard = createShard();
190
191             testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), shard,
192                     "testDataExistsPositiveRO"));
193
194             testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), shard,
195                     "testDataExistsPositiveRW"));
196         }
197
198         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
199             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
200                 getRef());
201
202             ShardTransactionMessages.DataExistsReply replySerialized =
203                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
204
205             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
206
207             // unserialized read
208             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
209
210             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
211
212             assertTrue(reply.exists());
213         }};
214     }
215
216     @Test
217     public void testOnReceiveDataExistsNegative() throws Exception {
218         new JavaTestKit(getSystem()) {{
219             final ActorRef shard = createShard();
220
221             testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), shard,
222                     "testDataExistsNegativeRO"));
223
224             testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), shard,
225                     "testDataExistsNegativeRW"));
226         }
227
228         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
229             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
230
231             ShardTransactionMessages.DataExistsReply replySerialized =
232                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
233
234             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
235
236             // unserialized read
237             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
238
239             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
240
241             assertFalse(reply.exists());
242         }};
243     }
244
245     private void assertModification(final ActorRef subject,
246         final Class<? extends Modification> modificationType) {
247         new JavaTestKit(getSystem()) {{
248             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
249
250             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
251                     GetCompositeModificationReply.class).getModification();
252
253             assertTrue(compositeModification.getModifications().size() == 1);
254             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
255         }};
256     }
257
258     @Test
259     public void testOnReceiveWriteData() {
260         new JavaTestKit(getSystem()) {{
261             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
262                     "testOnReceiveWriteData");
263
264             transaction.tell(new WriteData(TestModel.TEST_PATH,
265                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
266                         toSerializable(), getRef());
267
268             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
269
270             assertModification(transaction, WriteModification.class);
271
272             // unserialized write
273             transaction.tell(new WriteData(TestModel.TEST_PATH,
274                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
275                 getRef());
276
277             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
278         }};
279     }
280
281     @Test
282     public void testOnReceiveHeliumR1WriteData() {
283         new JavaTestKit(getSystem()) {{
284             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
285                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
286
287             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
288                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
289             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
290                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
291                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
292
293             transaction.tell(serialized, getRef());
294
295             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
296
297             assertModification(transaction, WriteModification.class);
298         }};
299     }
300
301     @Test
302     public void testOnReceiveMergeData() {
303         new JavaTestKit(getSystem()) {{
304             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
305                     "testMergeData");
306
307             transaction.tell(new MergeData(TestModel.TEST_PATH,
308                     ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.HELIUM_2_VERSION).
309                         toSerializable(), getRef());
310
311             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
312
313             assertModification(transaction, MergeModification.class);
314
315             //unserialized merge
316             transaction.tell(new MergeData(TestModel.TEST_PATH,
317                 ImmutableNodes.containerNode(TestModel.TEST_QNAME), DataStoreVersions.CURRENT_VERSION),
318                 getRef());
319
320             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
321         }};
322     }
323
324     @Test
325     public void testOnReceiveHeliumR1MergeData() throws Exception {
326         new JavaTestKit(getSystem()) {{
327             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
328                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
329
330             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
331                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
332             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
333                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
334                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
335
336             transaction.tell(serialized, getRef());
337
338             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
339
340             assertModification(transaction, MergeModification.class);
341         }};
342     }
343
344     @Test
345     public void testOnReceiveDeleteData() throws Exception {
346         new JavaTestKit(getSystem()) {{
347             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
348                     "testDeleteData");
349
350             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.HELIUM_2_VERSION).
351                     toSerializable(), getRef());
352
353             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
354
355             assertModification(transaction, DeleteModification.class);
356
357             //unserialized
358             transaction.tell(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), getRef());
359
360             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
361         }};
362     }
363
364     @Test
365     public void testOnReceiveBatchedModifications() throws Exception {
366         new JavaTestKit(getSystem()) {{
367
368             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
369             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
370             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
371             final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
372
373             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
374             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
375                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
376                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
377
378             YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
379             NormalizedNode<?, ?> mergeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
380                     new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
381
382             YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
383
384             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
385             batched.addModification(new WriteModification(writePath, writeData));
386             batched.addModification(new MergeModification(mergePath, mergeData));
387             batched.addModification(new DeleteModification(deletePath));
388
389             transaction.tell(batched, getRef());
390
391             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
392             assertEquals("getNumBatched", 3, reply.getNumBatched());
393
394             JavaTestKit verification = new JavaTestKit(getSystem());
395             transaction.tell(new ShardWriteTransaction.GetCompositedModification(), verification.getRef());
396
397             CompositeModification compositeModification = verification.expectMsgClass(duration("5 seconds"),
398                         GetCompositeModificationReply.class).getModification();
399
400             assertEquals("CompositeModification size", 3, compositeModification.getModifications().size());
401
402             WriteModification write = (WriteModification)compositeModification.getModifications().get(0);
403             assertEquals("getPath", writePath, write.getPath());
404             assertEquals("getData", writeData, write.getData());
405
406             MergeModification merge = (MergeModification)compositeModification.getModifications().get(1);
407             assertEquals("getPath", mergePath, merge.getPath());
408             assertEquals("getData", mergeData, merge.getData());
409
410             DeleteModification delete = (DeleteModification)compositeModification.getModifications().get(2);
411             assertEquals("getPath", deletePath, delete.getPath());
412
413             InOrder inOrder = Mockito.inOrder(mockModification);
414             inOrder.verify(mockModification).write(writePath, writeData);
415             inOrder.verify(mockModification).merge(mergePath, mergeData);
416             inOrder.verify(mockModification).delete(deletePath);
417         }};
418     }
419
420     @Test
421     public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() throws Exception {
422         new JavaTestKit(getSystem()) {{
423
424             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
425                     "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
426
427             JavaTestKit watcher = new JavaTestKit(getSystem());
428             watcher.watch(transaction);
429
430             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
431             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
432                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
433                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
434
435             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
436             batched.addModification(new WriteModification(writePath, writeData));
437
438             transaction.tell(batched, getRef());
439             BatchedModificationsReply reply = expectMsgClass(duration("5 seconds"), BatchedModificationsReply.class);
440             assertEquals("getNumBatched", 1, reply.getNumBatched());
441
442             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
443             batched.setReady(true);
444             batched.setTotalMessagesSent(2);
445
446             transaction.tell(batched, getRef());
447             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
448             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
449         }};
450     }
451
452     @Test
453     public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() throws Exception {
454         new JavaTestKit(getSystem()) {{
455
456             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
457                     "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
458
459             JavaTestKit watcher = new JavaTestKit(getSystem());
460             watcher.watch(transaction);
461
462             YangInstanceIdentifier writePath = TestModel.TEST_PATH;
463             NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
464                     new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
465                     withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
466
467             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
468             batched.addModification(new WriteModification(writePath, writeData));
469             batched.setReady(true);
470             batched.setDoCommitOnReady(true);
471             batched.setTotalMessagesSent(1);
472
473             transaction.tell(batched, getRef());
474             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
475             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
476         }};
477     }
478
479     @Test(expected=TestException.class)
480     public void testOnReceiveBatchedModificationsFailure() throws Throwable {
481         new JavaTestKit(getSystem()) {{
482
483             ShardDataTreeTransactionParent parent = Mockito.mock(ShardDataTreeTransactionParent.class);
484             DataTreeModification mockModification = Mockito.mock(DataTreeModification.class);
485             ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent, "id", mockModification);
486             final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
487                     "testOnReceiveBatchedModificationsFailure");
488
489             JavaTestKit watcher = new JavaTestKit(getSystem());
490             watcher.watch(transaction);
491
492             YangInstanceIdentifier path = TestModel.TEST_PATH;
493             ContainerNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
494
495             doThrow(new TestException()).when(mockModification).write(path, node);
496
497             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
498             batched.addModification(new WriteModification(path, node));
499
500             transaction.tell(batched, getRef());
501             expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
502
503             batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
504             batched.setReady(true);
505             batched.setTotalMessagesSent(2);
506
507             transaction.tell(batched, getRef());
508             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
509             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
510
511             if(failure != null) {
512                 throw failure.cause();
513             }
514         }};
515     }
516
517     @Test(expected=IllegalStateException.class)
518     public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Throwable {
519         new JavaTestKit(getSystem()) {{
520
521             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
522                     "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
523
524             JavaTestKit watcher = new JavaTestKit(getSystem());
525             watcher.watch(transaction);
526
527             BatchedModifications batched = new BatchedModifications("tx1", DataStoreVersions.CURRENT_VERSION, null);
528             batched.setReady(true);
529             batched.setTotalMessagesSent(2);
530
531             transaction.tell(batched, getRef());
532
533             Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
534             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
535
536             if(failure != null) {
537                 throw failure.cause();
538             }
539         }};
540     }
541
542     @Test
543     public void testOnReceivePreLithiumReadyTransaction() throws Exception {
544         new JavaTestKit(getSystem()) {{
545             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
546                     "testReadyTransaction", DataStoreVersions.HELIUM_2_VERSION);
547
548             JavaTestKit watcher = new JavaTestKit(getSystem());
549             watcher.watch(transaction);
550
551             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
552
553             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS);
554             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
555         }};
556
557         // test
558         new JavaTestKit(getSystem()) {{
559             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
560                     "testReadyTransaction2", DataStoreVersions.HELIUM_2_VERSION);
561
562             JavaTestKit watcher = new JavaTestKit(getSystem());
563             watcher.watch(transaction);
564
565             transaction.tell(new ReadyTransaction(), getRef());
566
567             expectMsgClass(duration("5 seconds"), ReadyTransactionReply.class);
568             watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
569         }};
570     }
571
572     @Test
573     public void testOnReceiveCreateSnapshot() throws Exception {
574         new JavaTestKit(getSystem()) {{
575             ShardTest.writeToStore(store.getDataTree(), TestModel.TEST_PATH,
576                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
577
578             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store.getDataTree(),
579                     YangInstanceIdentifier.builder().build());
580
581             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
582                     "testOnReceiveCreateSnapshot");
583
584             watch(transaction);
585
586             transaction.tell(CreateSnapshot.INSTANCE, getRef());
587
588             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
589
590             assertNotNull("getSnapshot is null", reply.getSnapshot());
591
592             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
593                     reply.getSnapshot());
594
595             assertEquals("Root node", expectedRoot, actualRoot);
596
597             expectTerminated(duration("3 seconds"), transaction);
598         }};
599     }
600
601     @Test
602     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
603         new JavaTestKit(getSystem()) {{
604             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
605                     "testReadWriteTxOnReceiveCloseTransaction");
606
607             watch(transaction);
608
609             transaction.tell(new CloseTransaction().toSerializable(), getRef());
610
611             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
612             expectTerminated(duration("3 seconds"), transaction);
613         }};
614     }
615
616     @Test
617     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
618         new JavaTestKit(getSystem()) {{
619             final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
620                     "testWriteTxOnReceiveCloseTransaction");
621
622             watch(transaction);
623
624             transaction.tell(new CloseTransaction().toSerializable(), getRef());
625
626             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
627             expectTerminated(duration("3 seconds"), transaction);
628         }};
629     }
630
631     @Test
632     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
633         new JavaTestKit(getSystem()) {{
634             final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
635                     "testReadOnlyTxOnReceiveCloseTransaction");
636
637             watch(transaction);
638
639             transaction.tell(new CloseTransaction().toSerializable(), getRef());
640
641             expectMsgClass(duration("3 seconds"), Terminated.class);
642         }};
643     }
644
645     @Test(expected=UnknownMessageException.class)
646     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
647         final ActorRef shard = createShard();
648         final Props props = ShardTransaction.props(TransactionType.READ_ONLY, readOnlyTransaction(), shard,
649                 datastoreContext, shardStats, "txn", DataStoreVersions.CURRENT_VERSION);
650         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
651
652         transaction.receive(new DeleteData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION).
653                 toSerializable(), ActorRef.noSender());
654     }
655
656     @Test
657     public void testShardTransactionInactivity() {
658
659         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
660                 500, TimeUnit.MILLISECONDS).build();
661
662         new JavaTestKit(getSystem()) {{
663             final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
664                     "testShardTransactionInactivity");
665
666             watch(transaction);
667
668             expectMsgClass(duration("3 seconds"), Terminated.class);
669         }};
670     }
671
672     public static class TestException extends RuntimeException {
673         private static final long serialVersionUID = 1L;
674     }
675 }