Merge "Bug 2265: Use new NormalizedNode streaming in messages"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTransactionTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertFalse;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertTrue;
7 import akka.actor.ActorRef;
8 import akka.actor.Props;
9 import akka.actor.Terminated;
10 import akka.testkit.JavaTestKit;
11 import akka.testkit.TestActorRef;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import java.util.Collections;
14 import java.util.concurrent.TimeUnit;
15 import org.junit.BeforeClass;
16 import org.junit.Test;
17 import org.opendaylight.controller.cluster.datastore.ShardWriteTransaction.GetCompositeModificationReply;
18 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
26 import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
27 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
28 import org.opendaylight.controller.cluster.datastore.messages.MergeDataReply;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
33 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
34 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
35 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
36 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
37 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
38 import org.opendaylight.controller.cluster.datastore.modification.Modification;
39 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
40 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
41 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec.Encoded;
42 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
43 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
44 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
47 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
48 import scala.concurrent.duration.Duration;
49
50 public class ShardTransactionTest extends AbstractActorTest {
51     private static final InMemoryDOMDataStore store =
52         new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
53
54     private static final SchemaContext testSchemaContext = TestModel.createTestContext();
55
56     private static final ShardIdentifier SHARD_IDENTIFIER =
57         ShardIdentifier.builder().memberName("member-1")
58             .shardName("inventory").type("config").build();
59
60     private DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
61
62     private final ShardStats shardStats = new ShardStats(SHARD_IDENTIFIER.toString(), "DataStore");
63
64     @BeforeClass
65     public static void staticSetup() {
66         store.onGlobalContextUpdated(testSchemaContext);
67     }
68
69     private ActorRef createShard(){
70         return getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
71             Collections.<ShardIdentifier, String>emptyMap(), datastoreContext, TestModel.createTestContext()));
72     }
73
74     @Test
75     public void testOnReceiveReadData() throws Exception {
76         new JavaTestKit(getSystem()) {{
77             final ActorRef shard = createShard();
78             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
79                     testSchemaContext, datastoreContext, shardStats, "txn",
80                     DataStoreVersions.CURRENT_VERSION);
81
82             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRO"));
83
84             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
85                     testSchemaContext, datastoreContext, shardStats, "txn",
86                     DataStoreVersions.CURRENT_VERSION);
87
88             testOnReceiveReadData(getSystem().actorOf(props, "testReadDataRW"));
89         }
90
91         private void testOnReceiveReadData(final ActorRef transaction) {
92             //serialized read
93             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
94                 getRef());
95
96             Object replySerialized =
97                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
98
99             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
100
101             // unserialized read
102             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()),getRef());
103
104             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
105
106             assertNotNull(reply.getNormalizedNode());
107         }};
108     }
109
110     @Test
111     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
112         new JavaTestKit(getSystem()) {{
113             final ActorRef shard = createShard();
114             Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
115                     testSchemaContext, datastoreContext, shardStats, "txn",
116                     DataStoreVersions.CURRENT_VERSION);
117
118             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
119                     props, "testReadDataWhenDataNotFoundRO"));
120
121             props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
122                     testSchemaContext, datastoreContext, shardStats, "txn",
123                     DataStoreVersions.CURRENT_VERSION);
124
125             testOnReceiveReadDataWhenDataNotFound(getSystem().actorOf(
126                     props, "testReadDataWhenDataNotFoundRW"));
127         }
128
129         private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
130             // serialized read
131             transaction.tell(new ReadData(TestModel.TEST_PATH).toSerializable(), getRef());
132
133             Object replySerialized =
134                     expectMsgClass(duration("5 seconds"), ReadDataReply.SERIALIZABLE_CLASS);
135
136             assertTrue(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode() == null);
137
138             // unserialized read
139             transaction.tell(new ReadData(TestModel.TEST_PATH),getRef());
140
141             ReadDataReply reply = expectMsgClass(duration("5 seconds"), ReadDataReply.class);
142
143             assertTrue(reply.getNormalizedNode() == null);
144         }};
145     }
146
147     @Test
148     public void testOnReceiveReadDataHeliumR1() throws Exception {
149         new JavaTestKit(getSystem()) {{
150             final ActorRef shard = createShard();
151             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
152                     testSchemaContext, datastoreContext, shardStats, "txn",
153                     DataStoreVersions.HELIUM_1_VERSION);
154
155             ActorRef transaction = getSystem().actorOf(props, "testOnReceiveReadDataHeliumR1");
156
157             transaction.tell(new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(),
158                     getRef());
159
160             ShardTransactionMessages.ReadDataReply replySerialized =
161                     expectMsgClass(duration("5 seconds"), ShardTransactionMessages.ReadDataReply.class);
162
163             assertNotNull(ReadDataReply.fromSerializable(replySerialized).getNormalizedNode());
164         }};
165     }
166
167     @Test
168     public void testOnReceiveDataExistsPositive() throws Exception {
169         new JavaTestKit(getSystem()) {{
170             final ActorRef shard = createShard();
171             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
172                     testSchemaContext, datastoreContext, shardStats, "txn",
173                     DataStoreVersions.CURRENT_VERSION);
174
175             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRO"));
176
177             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
178                     testSchemaContext, datastoreContext, shardStats, "txn",
179                     DataStoreVersions.CURRENT_VERSION);
180
181             testOnReceiveDataExistsPositive(getSystem().actorOf(props, "testDataExistsPositiveRW"));
182         }
183
184         private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
185             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()).toSerializable(),
186                 getRef());
187
188             ShardTransactionMessages.DataExistsReply replySerialized =
189                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
190
191             assertTrue(DataExistsReply.fromSerializable(replySerialized).exists());
192
193             // unserialized read
194             transaction.tell(new DataExists(YangInstanceIdentifier.builder().build()),getRef());
195
196             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
197
198             assertTrue(reply.exists());
199         }};
200     }
201
202     @Test
203     public void testOnReceiveDataExistsNegative() throws Exception {
204         new JavaTestKit(getSystem()) {{
205             final ActorRef shard = createShard();
206             Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
207                     testSchemaContext, datastoreContext, shardStats, "txn",
208                     DataStoreVersions.CURRENT_VERSION);
209
210             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRO"));
211
212             props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
213                     testSchemaContext, datastoreContext, shardStats, "txn",
214                     DataStoreVersions.CURRENT_VERSION);
215
216             testOnReceiveDataExistsNegative(getSystem().actorOf(props, "testDataExistsNegativeRW"));
217         }
218
219         private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
220             transaction.tell(new DataExists(TestModel.TEST_PATH).toSerializable(), getRef());
221
222             ShardTransactionMessages.DataExistsReply replySerialized =
223                 expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DataExistsReply.class);
224
225             assertFalse(DataExistsReply.fromSerializable(replySerialized).exists());
226
227             // unserialized read
228             transaction.tell(new DataExists(TestModel.TEST_PATH),getRef());
229
230             DataExistsReply reply = expectMsgClass(duration("5 seconds"), DataExistsReply.class);
231
232             assertFalse(reply.exists());
233         }};
234     }
235
236     private void assertModification(final ActorRef subject,
237         final Class<? extends Modification> modificationType) {
238         new JavaTestKit(getSystem()) {{
239             subject.tell(new ShardWriteTransaction.GetCompositedModification(), getRef());
240
241             CompositeModification compositeModification = expectMsgClass(duration("3 seconds"),
242                     GetCompositeModificationReply.class).getModification();
243
244             assertTrue(compositeModification.getModifications().size() == 1);
245             assertEquals(modificationType, compositeModification.getModifications().get(0).getClass());
246         }};
247     }
248
249     @Test
250     public void testOnReceiveWriteData() throws Exception {
251         new JavaTestKit(getSystem()) {{
252             final ActorRef shard = createShard();
253             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
254                     testSchemaContext, datastoreContext, shardStats, "txn",
255                     DataStoreVersions.CURRENT_VERSION);
256             final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveWriteData");
257
258             transaction.tell(new WriteData(TestModel.TEST_PATH,
259                     ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
260                             DataStoreVersions.HELIUM_2_VERSION), getRef());
261
262             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
263
264             assertModification(transaction, WriteModification.class);
265
266             // unserialized write
267             transaction.tell(new WriteData(TestModel.TEST_PATH,
268                 ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
269                 getRef());
270
271             expectMsgClass(duration("5 seconds"), WriteDataReply.class);
272         }};
273     }
274
275     @Test
276     public void testOnReceiveHeliumR1WriteData() throws Exception {
277         new JavaTestKit(getSystem()) {{
278             final ActorRef shard = createShard();
279             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
280                     testSchemaContext, datastoreContext, shardStats, "txn",
281                     DataStoreVersions.HELIUM_1_VERSION);
282             final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1WriteData");
283
284             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
285                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
286             ShardTransactionMessages.WriteData serialized = ShardTransactionMessages.WriteData.newBuilder()
287                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
288                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
289
290             transaction.tell(serialized, getRef());
291
292             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.WriteDataReply.class);
293
294             assertModification(transaction, WriteModification.class);
295         }};
296     }
297
298     @Test
299     public void testOnReceiveMergeData() throws Exception {
300         new JavaTestKit(getSystem()) {{
301             final ActorRef shard = createShard();
302             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
303                     testSchemaContext, datastoreContext, shardStats, "txn",
304                     DataStoreVersions.CURRENT_VERSION);
305             final ActorRef transaction = getSystem().actorOf(props, "testMergeData");
306
307             transaction.tell(new MergeData(TestModel.TEST_PATH,
308                     ImmutableNodes.containerNode(TestModel.TEST_QNAME)).toSerializable(
309                             DataStoreVersions.HELIUM_2_VERSION), getRef());
310
311             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
312
313             assertModification(transaction, MergeModification.class);
314
315             //unserialized merge
316             transaction.tell(new MergeData(TestModel.TEST_PATH,
317                 ImmutableNodes.containerNode(TestModel.TEST_QNAME)),
318                 getRef());
319
320             expectMsgClass(duration("5 seconds"), MergeDataReply.class);
321         }};
322     }
323
324     @Test
325     public void testOnReceiveHeliumR1MergeData() throws Exception {
326         new JavaTestKit(getSystem()) {{
327             final ActorRef shard = createShard();
328             final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
329                     testSchemaContext, datastoreContext, shardStats, "txn",
330                     DataStoreVersions.HELIUM_1_VERSION);
331             final ActorRef transaction = getSystem().actorOf(props, "testOnReceiveHeliumR1MergeData");
332
333             Encoded encoded = new NormalizedNodeToNodeCodec(null).encode(TestModel.TEST_PATH,
334                     ImmutableNodes.containerNode(TestModel.TEST_QNAME));
335             ShardTransactionMessages.MergeData serialized = ShardTransactionMessages.MergeData.newBuilder()
336                     .setInstanceIdentifierPathArguments(encoded.getEncodedPath())
337                     .setNormalizedNode(encoded.getEncodedNode().getNormalizedNode()).build();
338
339             transaction.tell(serialized, getRef());
340
341             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.MergeDataReply.class);
342
343             assertModification(transaction, MergeModification.class);
344         }};
345     }
346
347     @Test
348     public void testOnReceiveDeleteData() throws Exception {
349         new JavaTestKit(getSystem()) {{
350             final ActorRef shard = createShard();
351             final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
352                     testSchemaContext, datastoreContext, shardStats, "txn",
353                     DataStoreVersions.CURRENT_VERSION);
354             final ActorRef transaction = getSystem().actorOf(props, "testDeleteData");
355
356             transaction.tell(new DeleteData(TestModel.TEST_PATH).toSerializable(), getRef());
357
358             expectMsgClass(duration("5 seconds"), ShardTransactionMessages.DeleteDataReply.class);
359
360             assertModification(transaction, DeleteModification.class);
361
362             //unserialized merge
363             transaction.tell(new DeleteData(TestModel.TEST_PATH), getRef());
364
365             expectMsgClass(duration("5 seconds"), DeleteDataReply.class);
366         }};
367     }
368
369
370     @Test
371     public void testOnReceiveReadyTransaction() throws Exception {
372         new JavaTestKit(getSystem()) {{
373             final ActorRef shard = createShard();
374             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
375                     testSchemaContext, datastoreContext, shardStats, "txn",
376                     DataStoreVersions.CURRENT_VERSION);
377             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction");
378
379             watch(transaction);
380
381             transaction.tell(new ReadyTransaction().toSerializable(), getRef());
382
383             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
384                     Terminated.class);
385             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.SERIALIZABLE_CLASS,
386                     Terminated.class);
387         }};
388
389         // test
390         new JavaTestKit(getSystem()) {{
391             final ActorRef shard = createShard();
392             final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
393                 testSchemaContext, datastoreContext, shardStats, "txn",
394                 DataStoreVersions.CURRENT_VERSION);
395             final ActorRef transaction = getSystem().actorOf(props, "testReadyTransaction2");
396
397             watch(transaction);
398
399             transaction.tell(new ReadyTransaction(), getRef());
400
401             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
402                     Terminated.class);
403             expectMsgAnyClassOf(duration("5 seconds"), ReadyTransactionReply.class,
404                     Terminated.class);
405         }};
406
407     }
408
409     @SuppressWarnings("unchecked")
410     @Test
411     public void testOnReceiveCloseTransaction() throws Exception {
412         new JavaTestKit(getSystem()) {{
413             final ActorRef shard = createShard();
414             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
415                     testSchemaContext, datastoreContext, shardStats, "txn",
416                     DataStoreVersions.CURRENT_VERSION);
417             final ActorRef transaction = getSystem().actorOf(props, "testCloseTransaction");
418
419             watch(transaction);
420
421             transaction.tell(new CloseTransaction().toSerializable(), getRef());
422
423             expectMsgClass(duration("3 seconds"), CloseTransactionReply.SERIALIZABLE_CLASS);
424             expectTerminated(duration("3 seconds"), transaction);
425         }};
426     }
427
428     @Test(expected=UnknownMessageException.class)
429     public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
430         final ActorRef shard = createShard();
431         final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
432                 testSchemaContext, datastoreContext, shardStats, "txn",
433                 DataStoreVersions.CURRENT_VERSION);
434         final TestActorRef<ShardTransaction> transaction = TestActorRef.apply(props,getSystem());
435
436         transaction.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
437     }
438
439     @Test
440     public void testShardTransactionInactivity() {
441
442         datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
443                 Duration.create(500, TimeUnit.MILLISECONDS)).build();
444
445         new JavaTestKit(getSystem()) {{
446             final ActorRef shard = createShard();
447             final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
448                     testSchemaContext, datastoreContext, shardStats, "txn",
449                     DataStoreVersions.CURRENT_VERSION);
450             final ActorRef transaction =
451                 getSystem().actorOf(props, "testShardTransactionInactivity");
452
453             watch(transaction);
454
455             expectMsgClass(duration("3 seconds"), Terminated.class);
456         }};
457     }
458 }