Merge "BUG-2310 - widen yang model for netconf-node."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.Before;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
18 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
25 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
27 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
28 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
29 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
35 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
36 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
37 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
38 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
39 import org.opendaylight.controller.cluster.datastore.modification.Modification;
40 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
41 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
42 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
43 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
44 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
45 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
46 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
47 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
48 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
52 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 import scala.concurrent.duration.Duration;
54
55 public class ShardTransactionTest extends AbstractActorTest {
56
57     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
58
59     private static final ShardIdentifier SHARD_IDENTIFIER =
60         ShardIdentifier.builder().memberName("member-1")
61             .shardName("inventory").type("config").build();
62
63     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
64
65     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
66
67     private final InMemoryDOMDataStore store =
68             new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
69
70     @Before
71     public void setup() {
72         store.onGlobalContextUpdated(testSchemaContext);
73     }
74
75     private ActorRef createShard(){
76         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
77             Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
78     }
79
80     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name) {
81         return newTransactionActor(transaction, name, DataStoreVersions.CURRENT_VERSION);
82     }
83
84     private ActorRef newTransactionActor(DOMStoreTransaction transaction, String name, short version) {
85         return newTransactionActor(transaction, null, name, version);
86     }
87
88     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name) {
89         return newTransactionActor(transaction, null, name, DataStoreVersions.CURRENT_VERSION);
90     }
91
92     private ActorRef newTransactionActor(DOMStoreTransaction transaction, ActorRef shard, String name,
93             short version) {
94         Props props = ShardTransaction.props(transaction, shard != null ? shard : createShard(),
95                 testSchemaContext, datastoreContext, shardStats, "txn", version);
96         return getSystem().actorOf(props, name);
97     }
98
99     @Test
100     public void testOnReceiveReadData() throws Exception {
101         new JavaTestKit(getSystem()) {{
102             final ActorRef shard = createShard();
103
104             testOnReceiveReadData(newTransactionActor(store.newReadOnlyTransaction(), shard, "testReadDataRO"));
105
106             testOnReceiveReadData(newTransactionActor(store.newReadWriteTransaction(), shard, "testReadDataRW"));
107         }
108
109         private void testOnReceiveReadData(final ActorRef transaction) {
110             //serialized read
111             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
112                 getRef());
113
114             Object replySerialized =
115                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
116
117             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
118
119             // unserialized read
120             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
121
122             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
123
124             assertNotNull(reply.getNormalizedNode());
125         }};
126     }
127
128     @Test
129     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
130         new JavaTestKit(getSystem()) {{
131             final ActorRef shard = createShard();
132
133             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
134                     store.newReadOnlyTransaction(), shard, "testReadDataWhenDataNotFoundRO"));
135
136             testOnReceiveReadDataWhenDataNotFound(newTransactionActor(
137                     store.newReadWriteTransaction(), shard, "testReadDataWhenDataNotFoundRW"));
138         }
139
140         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
141             // serialized read
142             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
143
144             Object replySerialized =
145                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
146
147             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
148
149             // unserialized read
150             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
151
152             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
153
154             assertTrue(reply.getNormalizedNode() == null);
155         }};
156     }
157
158     @Test
159     public void testOnReceiveReadDataHeliumR1() throws Exception {
160         new JavaTestKit(getSystem()) {{
161             ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
162                     "testOnReceiveReadDataHeliumR1", DataStoreVersions.HELIUM_1_VERSION);
163
164             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
165                     getRef());
166
167             ShardTransactionMessages.ReadDataReply replySerialized =
168                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
169
170             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
171         }};
172     }
173
174     @Test
175     public void testOnReceiveDataExistsPositive() throws Exception {
176         new JavaTestKit(getSystem()) {{
177             final ActorRef shard = createShard();
178
179             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadOnlyTransaction(), shard,
180                     "testDataExistsPositiveRO"));
181
182             testOnReceiveDataExistsPositive(newTransactionActor(store.newReadWriteTransaction(), shard,
183                     "testDataExistsPositiveRW"));
184         }
185
186         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
187             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
188                 getRef());
189
190             ShardTransactionMessages.DataExistsReply replySerialized =
191                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
192
193             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
194
195             // unserialized read
196             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
197
198             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
199
200             assertTrue(reply.exists());
201         }};
202     }
203
204     @Test
205     public void testOnReceiveDataExistsNegative() throws Exception {
206         new JavaTestKit(getSystem()) {{
207             final ActorRef shard = createShard();
208
209             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadOnlyTransaction(), shard,
210                     "testDataExistsNegativeRO"));
211
212             testOnReceiveDataExistsNegative(newTransactionActor(store.newReadWriteTransaction(), shard,
213                     "testDataExistsNegativeRW"));
214         }
215
216         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
217             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
218
219             ShardTransactionMessages.DataExistsReply replySerialized =
220                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
221
222             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
223
224             // unserialized read
225             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
226
227             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
228
229             assertFalse(reply.exists());
230         }};
231     }
232
233     private void assertModification(final ActorRef subject,
234         final Class<? extends Modification> modificationType) {
235         new JavaTestKit(getSystem()) {{
236             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
237
238             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
239                     GetCompositeModificationReply.class).getModification();
240
241             assertTrue(compositeModification.getModifications().size() == 1);
242             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
243         }};
244     }
245
246     @Test
247     public void testOnReceiveWriteData() throws Exception {
248         new JavaTestKit(getSystem()) {{
249             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
250                     "testOnReceiveWriteData");
251
252             transaction.tell(new WriteData(TestModel.TEST_PATH,
253                     ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
254                             DataStoreVersions.HELIUM_2_VERSION), getRef());
255
256             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
257
258             assertModification(transaction, WriteModification.class);
259
260             // unserialized write
261             transaction.tell(new WriteData(TestModel.TEST_PATH,
262                 ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
263                 getRef());
264
265             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
266         }};
267     }
268
269     @Test
270     public void testOnReceiveHeliumR1WriteData() throws Exception {
271         new JavaTestKit(getSystem()) {{
272             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
273                     "testOnReceiveHeliumR1WriteData", DataStoreVersions.HELIUM_1_VERSION);
274
275             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
276                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
277             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
278                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
279                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
280
281             transaction.tell(serialized, getRef());
282
283             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
284
285             assertModification(transaction, WriteModification.class);
286         }};
287     }
288
289     @Test
290     public void testOnReceiveMergeData() throws Exception {
291         new JavaTestKit(getSystem()) {{
292             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
293                     "testMergeData");
294
295             transaction.tell(new MergeData(TestModel.TEST_PATH,
296                     ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
297                             DataStoreVersions.HELIUM_2_VERSION), getRef());
298
299             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
300
301             assertModification(transaction, MergeModification.class);
302
303             //unserialized merge
304             transaction.tell(new MergeData(TestModel.TEST_PATH,
305                 ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
306                 getRef());
307
308             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
309         }};
310     }
311
312     @Test
313     public void testOnReceiveHeliumR1MergeData() throws Exception {
314         new JavaTestKit(getSystem()) {{
315             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
316                     "testOnReceiveHeliumR1MergeData", DataStoreVersions.HELIUM_1_VERSION);
317
318             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
319                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
320             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
321                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
322                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
323
324             transaction.tell(serialized, getRef());
325
326             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
327
328             assertModification(transaction, MergeModification.class);
329         }};
330     }
331
332     @Test
333     public void testOnReceiveDeleteData() throws Exception {
334         new JavaTestKit(getSystem()) {{
335             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
336                     "testDeleteData");
337
338             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(
339                     DataStoreVersions.HELIUM_2_VERSION), getRef());
340
341             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
342
343             assertModification(transaction, DeleteModification.class);
344
345             //unserialized
346             transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
347
348             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
349         }};
350     }
351
352
353     @Test
354     public void testOnReceiveReadyTransaction() throws Exception {
355         new JavaTestKit(getSystem()) {{
356             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
357                     "testReadyTransaction");
358
359             watch(transaction);
360
361             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
362
363             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
364                     Terminated.class);
365             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
366                     Terminated.class);
367         }};
368
369         // test
370         new JavaTestKit(getSystem()) {{
371             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
372                     "testReadyTransaction2");
373
374             watch(transaction);
375
376             transaction.tell(new ReadyTransaction(), getRef());
377
378             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
379                     Terminated.class);
380             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
381                     Terminated.class);
382         }};
383     }
384
385     @Test
386     public void testOnReceiveCreateSnapshot() throws Exception {
387         new JavaTestKit(getSystem()) {{
388             ShardTest.writeToStore(store, TestModel.TEST_PATH,
389                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
390
391             NormalizedNode<?,?> expectedRoot = ShardTest.readStore(store,
392                     YangInstanceIdentifier.builder().build());
393
394             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
395                     "testOnReceiveCreateSnapshot");
396
397             watch(transaction);
398
399             transaction.tell(CreateSnapshot.INSTANCE, getRef());
400
401             CaptureSnapshotReply reply = expectMsgClass(duration("3 seconds"), CaptureSnapshotReply.class);
402
403             assertNotNull("getSnapshot is null", reply.getSnapshot());
404
405             NormalizedNode<?,?> actualRoot = SerializationUtils.deserializeNormalizedNode(
406                     reply.getSnapshot());
407
408             assertEquals("Root node", expectedRoot, actualRoot);
409
410             expectTerminated(duration("3 seconds"), transaction);
411         }};
412     }
413
414     @Test
415     public void testReadWriteTxOnReceiveCloseTransaction() throws Exception {
416         new JavaTestKit(getSystem()) {{
417             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
418                     "testReadWriteTxOnReceiveCloseTransaction");
419
420             watch(transaction);
421
422             transaction.tell(new CloseTransaction().toSerializable(), getRef());
423
424             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
425             expectTerminated(duration("3 seconds"), transaction);
426         }};
427     }
428
429     @Test
430     public void testWriteOnlyTxOnReceiveCloseTransaction() throws Exception {
431         new JavaTestKit(getSystem()) {{
432             final ActorRef transaction = newTransactionActor(store.newWriteOnlyTransaction(),
433                     "testWriteTxOnReceiveCloseTransaction");
434
435             watch(transaction);
436
437             transaction.tell(new CloseTransaction().toSerializable(), getRef());
438
439             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
440             expectTerminated(duration("3 seconds"), transaction);
441         }};
442     }
443
444     @Test
445     public void testReadOnlyTxOnReceiveCloseTransaction() throws Exception {
446         new JavaTestKit(getSystem()) {{
447             final ActorRef transaction = newTransactionActor(store.newReadOnlyTransaction(),
448                     "testReadOnlyTxOnReceiveCloseTransaction");
449
450             watch(transaction);
451
452             transaction.tell(new CloseTransaction().toSerializable(), getRef());
453
454             expectMsgClass(duration("3 seconds"), Terminated.class);
455         }};
456     }
457
458     @Test(expected=UnknownMessageException.class)
459     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
460         final ActorRef shard = createShard();
461         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
462                 testSchemaContext, datastoreContext, shardStats, "txn",
463                 DataStoreVersions.CURRENT_VERSION);
464         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
465
466         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(
467                 DataStoreVersions.CURRENT_VERSION), ActorRef.noSender());
468     }
469
470     @Test
471     public void testShardTransactionInactivity() {
472
473         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
474                 Duration.create(500, TimeUnit.MILLISECONDS)).build();
475
476         new JavaTestKit(getSystem()) {{
477             final ActorRef transaction = newTransactionActor(store.newReadWriteTransaction(),
478                     "testShardTransactionInactivity");
479
480             watch(transaction);
481
482             expectMsgClass(duration("3 seconds"), Terminated.class);
483         }};
484     }
485 }