Merge "BUG 720 - YANG leaf as JSON input *<*:* couldn't be saved"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.event.Logging;
7 import akka.japi.Creator;
8 import akka.testkit.JavaTestKit;
9 import akka.testkit.TestActorRef;
10 import com.google.common.base.Optional;
11 import com.google.common.util.concurrent.CheckedFuture;
12 import com.google.common.util.concurrent.Futures;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import org.junit.After;
16 import org.junit.Assert;
17 import org.junit.Before;
18 import org.junit.Test;
19 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
20 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
21 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
23 import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
25 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
26 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
27 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
28 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
29 import org.opendaylight.controller.cluster.datastore.modification.Modification;
30 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
31 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
32 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
33 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
34 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
35 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
36 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
37 import org.opendaylight.controller.cluster.raft.Snapshot;
38 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
39 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
40 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
41 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
42 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
43 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
44 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
45 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
46 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
47 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
48 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
49 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
50 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
51 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
52 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
53 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
54 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
55 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
56 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
59 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
60 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
62 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
63 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
64 import scala.concurrent.duration.Duration;
65 import java.io.IOException;
66 import java.util.Collections;
67 import java.util.HashSet;
68 import java.util.Set;
69 import java.util.concurrent.CountDownLatch;
70 import java.util.concurrent.ExecutionException;
71 import java.util.concurrent.TimeUnit;
72 import static org.junit.Assert.assertEquals;
73 import static org.junit.Assert.assertNotNull;
74 import static org.junit.Assert.assertTrue;
75 import static org.junit.Assert.fail;
76 import static org.mockito.Mockito.mock;
77 import static org.mockito.Mockito.doReturn;
78 import static org.mockito.Mockito.verify;
79
80 public class ShardTest extends AbstractActorTest {
81
82     private static final DatastoreContext DATA_STORE_CONTEXT =
83             new DatastoreContext("", null, Duration.create(10, TimeUnit.MINUTES), 5, 3, 5000, 500);
84
85     private static final SchemaContext SCHEMA_CONTEXT = TestModel.createTestContext();
86
87     private static final ShardIdentifier IDENTIFIER = ShardIdentifier.builder().memberName("member-1")
88             .shardName("inventory").type("config").build();
89
90     @Before
91     public void setUp() {
92         System.setProperty("shard.persistent", "false");
93
94         InMemorySnapshotStore.clear();
95         InMemoryJournal.clear();
96     }
97
98     @After
99     public void tearDown() {
100         InMemorySnapshotStore.clear();
101         InMemoryJournal.clear();
102     }
103
104     private Props newShardProps() {
105         return Shard.props(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
106                 DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
107     }
108
109     @Test
110     public void testOnReceiveRegisterListener() throws Exception {
111         new JavaTestKit(getSystem()) {{
112             ActorRef subject = getSystem().actorOf(newShardProps(), "testRegisterChangeListener");
113
114             subject.tell(new UpdateSchemaContext(SchemaContextHelper.full()), getRef());
115
116             subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
117                     getRef().path(), AsyncDataBroker.DataChangeScope.BASE), getRef());
118
119             EnableNotification enable = expectMsgClass(duration("3 seconds"), EnableNotification.class);
120             assertEquals("isEnabled", false, enable.isEnabled());
121
122             RegisterChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
123                     RegisterChangeListenerReply.class);
124             assertTrue(reply.getListenerRegistrationPath().toString().matches(
125                     "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
126         }};
127     }
128
129     @Test
130     public void testCreateTransaction(){
131         new ShardTestKit(getSystem()) {{
132             ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransaction");
133
134             waitUntilLeader(subject);
135
136             subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
137
138             subject.tell(new CreateTransaction("txn-1",
139                     TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
140
141             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
142                     CreateTransactionReply.class);
143
144             String path = reply.getTransactionActorPath().toString();
145             assertTrue("Unexpected transaction path " + path,
146                     path.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
147             expectNoMsg();
148         }};
149     }
150
151     @Test
152     public void testCreateTransactionOnChain(){
153         new ShardTestKit(getSystem()) {{
154             final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateTransactionOnChain");
155
156             waitUntilLeader(subject);
157
158             subject.tell(new CreateTransaction("txn-1",
159                     TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
160                     getRef());
161
162             CreateTransactionReply reply = expectMsgClass(duration("3 seconds"),
163                     CreateTransactionReply.class);
164
165             String path = reply.getTransactionActorPath().toString();
166             assertTrue("Unexpected transaction path " + path,
167                     path.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
168             expectNoMsg();
169         }};
170     }
171
172     @Test
173     public void testPeerAddressResolved(){
174         new JavaTestKit(getSystem()) {{
175             final ShardIdentifier identifier =
176                 ShardIdentifier.builder().memberName("member-1")
177                     .shardName("inventory").type("config").build();
178
179             Props props = Shard.props(identifier,
180                     Collections.<ShardIdentifier, String>singletonMap(identifier, null),
181                     DATA_STORE_CONTEXT, SCHEMA_CONTEXT);
182             final ActorRef subject = getSystem().actorOf(props, "testPeerAddressResolved");
183
184             new Within(duration("3 seconds")) {
185                 @Override
186                 protected void run() {
187
188                     subject.tell(
189                         new PeerAddressResolved(identifier, "akka://foobar"),
190                         getRef());
191
192                     expectNoMsg();
193                 }
194             };
195         }};
196     }
197
198     @Test
199     public void testApplySnapshot() throws ExecutionException, InterruptedException {
200         TestActorRef<Shard> ref = TestActorRef.create(getSystem(), newShardProps());
201
202         NormalizedNodeToNodeCodec codec =
203             new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
204
205         ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(
206                 TestModel.TEST_QNAME));
207
208         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
209         NormalizedNode<?,?> expected = ref.underlyingActor().readStore(root);
210
211         NormalizedNodeMessages.Container encode = codec.encode(expected);
212
213         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
214                 encode.getNormalizedNode().toByteString().toByteArray(),
215                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
216
217         ref.underlyingActor().onReceiveCommand(applySnapshot);
218
219         NormalizedNode<?,?> actual = ref.underlyingActor().readStore(root);
220
221         assertEquals(expected, actual);
222     }
223
224     @Test
225     public void testApplyState() throws Exception {
226
227         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
228
229         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
230
231         MutableCompositeModification compMod = new MutableCompositeModification();
232         compMod.addModification(new WriteModification(TestModel.TEST_PATH, node, SCHEMA_CONTEXT));
233         Payload payload = new CompositeModificationPayload(compMod.toSerializable());
234         ApplyState applyState = new ApplyState(null, "test",
235                 new ReplicatedLogImplEntry(1, 2, payload));
236
237         shard.underlyingActor().onReceiveCommand(applyState);
238
239         NormalizedNode<?,?> actual = shard.underlyingActor().readStore(TestModel.TEST_PATH);
240         assertEquals("Applied state", node, actual);
241     }
242
243     @SuppressWarnings("serial")
244     @Test
245     public void testRecovery() throws Exception {
246
247         // Set up the InMemorySnapshotStore.
248
249         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
250         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
251
252         DOMStoreWriteTransaction writeTx = testStore.newWriteOnlyTransaction();
253         writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
254         DOMStoreThreePhaseCommitCohort commitCohort = writeTx.ready();
255         commitCohort.preCommit().get();
256         commitCohort.commit().get();
257
258         DOMStoreReadTransaction readTx = testStore.newReadOnlyTransaction();
259         NormalizedNode<?, ?> root = readTx.read(YangInstanceIdentifier.builder().build()).get().get();
260
261         InMemorySnapshotStore.addSnapshot(IDENTIFIER.toString(), Snapshot.create(
262                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(
263                         root).
264                                 getNormalizedNode().toByteString().toByteArray(),
265                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
266
267         // Set up the InMemoryJournal.
268
269         InMemoryJournal.addEntry(IDENTIFIER.toString(), 0, new ReplicatedLogImplEntry(0, 1, newPayload(
270                   new WriteModification(TestModel.OUTER_LIST_PATH,
271                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
272                           SCHEMA_CONTEXT))));
273
274         int nListEntries = 11;
275         Set<Integer> listEntryKeys = new HashSet<>();
276         for(int i = 1; i <= nListEntries; i++) {
277             listEntryKeys.add(Integer.valueOf(i));
278             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
279                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
280             Modification mod = new MergeModification(path,
281                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i),
282                     SCHEMA_CONTEXT);
283             InMemoryJournal.addEntry(IDENTIFIER.toString(), i, new ReplicatedLogImplEntry(i, 1,
284                     newPayload(mod)));
285         }
286
287         InMemoryJournal.addEntry(IDENTIFIER.toString(), nListEntries + 1,
288                 new ApplyLogEntries(nListEntries));
289
290         // Create the actor and wait for recovery complete.
291
292         final CountDownLatch recoveryComplete = new CountDownLatch(1);
293
294         Creator<Shard> creator = new Creator<Shard>() {
295             @Override
296             public Shard create() throws Exception {
297                 return new Shard(IDENTIFIER, Collections.<ShardIdentifier,String>emptyMap(),
298                         DATA_STORE_CONTEXT, SCHEMA_CONTEXT) {
299                     @Override
300                     protected void onRecoveryComplete() {
301                         try {
302                             super.onRecoveryComplete();
303                         } finally {
304                             recoveryComplete.countDown();
305                         }
306                     }
307                 };
308             }
309         };
310
311         TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
312                 Props.create(new DelegatingShardCreator(creator)), "testRecovery");
313
314         assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
315
316         // Verify data in the data store.
317
318         NormalizedNode<?, ?> outerList = shard.underlyingActor().readStore(TestModel.OUTER_LIST_PATH);
319         assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
320         assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
321                 outerList.getValue() instanceof Iterable);
322         for(Object entry: (Iterable<?>) outerList.getValue()) {
323             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
324                     entry instanceof MapEntryNode);
325             MapEntryNode mapEntry = (MapEntryNode)entry;
326             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
327                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
328             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
329             Object value = idLeaf.get().getValue();
330             assertTrue("Unexpected value for leaf "+ TestModel.ID_QNAME.getLocalName() + ": " + value,
331                     listEntryKeys.remove(value));
332         }
333
334         if(!listEntryKeys.isEmpty()) {
335             fail("Missing " + TestModel.OUTER_LIST_QNAME.getLocalName() + " entries with keys: " +
336                     listEntryKeys);
337         }
338
339         assertEquals("Last log index", nListEntries,
340                 shard.underlyingActor().getShardMBean().getLastLogIndex());
341         assertEquals("Commit index", nListEntries,
342                 shard.underlyingActor().getShardMBean().getCommitIndex());
343         assertEquals("Last applied", nListEntries,
344                 shard.underlyingActor().getShardMBean().getLastApplied());
345     }
346
347     private CompositeModificationPayload newPayload(Modification... mods) {
348         MutableCompositeModification compMod = new MutableCompositeModification();
349         for(Modification mod: mods) {
350             compMod.addModification(mod);
351         }
352
353         return new CompositeModificationPayload(compMod.toSerializable());
354     }
355
356     @SuppressWarnings("unchecked")
357     @Test
358     public void testForwardedCommitTransactionWithPersistence() throws IOException {
359         System.setProperty("shard.persistent", "true");
360
361         new ShardTestKit(getSystem()) {{
362             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps());
363
364             waitUntilLeader(shard);
365
366             NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
367
368             DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class);
369             doReturn(Futures.immediateFuture(null)).when(cohort).commit();
370
371             MutableCompositeModification modification = new MutableCompositeModification();
372             modification.addModification(new WriteModification(TestModel.TEST_PATH, node,
373                     SCHEMA_CONTEXT));
374
375             shard.tell(new ForwardedCommitTransaction(cohort, modification), getRef());
376
377             expectMsgClass(duration("5 seconds"), CommitTransactionReply.SERIALIZABLE_CLASS);
378
379             verify(cohort).commit();
380
381             assertEquals("Last log index", 0, shard.underlyingActor().getShardMBean().getLastLogIndex());
382         }};
383     }
384
385     @Test
386     public void testCreateSnapshot() throws IOException, InterruptedException {
387         new ShardTestKit(getSystem()) {{
388             final ActorRef subject = getSystem().actorOf(newShardProps(), "testCreateSnapshot");
389
390             waitUntilLeader(subject);
391
392             subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
393
394             waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
395
396             subject.tell(new CaptureSnapshot(-1,-1,-1,-1), getRef());
397
398             waitForLogMessage(Logging.Info.class, subject, "CaptureSnapshotReply received by actor");
399         }};
400     }
401
402     /**
403      * This test simply verifies that the applySnapShot logic will work
404      * @throws ReadFailedException
405      */
406     @Test
407     public void testInMemoryDataStoreRestore() throws ReadFailedException {
408         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
409             MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
410
411         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
412
413         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
414         putTransaction.write(TestModel.TEST_PATH,
415             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
416         commitTransaction(putTransaction);
417
418
419         NormalizedNode expected = readStore(store);
420
421         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
422
423         writeTransaction.delete(YangInstanceIdentifier.builder().build());
424         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
425
426         commitTransaction(writeTransaction);
427
428         NormalizedNode actual = readStore(store);
429
430         assertEquals(expected, actual);
431
432     }
433
434     private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
435         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
436         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
437             transaction.read(YangInstanceIdentifier.builder().build());
438
439         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
440
441         NormalizedNode<?, ?> normalizedNode = optional.get();
442
443         transaction.close();
444
445         return normalizedNode;
446     }
447
448     private void commitTransaction(DOMStoreWriteTransaction transaction) {
449         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
450         ListenableFuture<Void> future =
451             commitCohort.preCommit();
452         try {
453             future.get();
454             future = commitCohort.commit();
455             future.get();
456         } catch (InterruptedException | ExecutionException e) {
457         }
458     }
459
460     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
461         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
462             @Override
463             public void onDataChanged(
464                 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
465
466             }
467         };
468     }
469
470     private static final class DelegatingShardCreator implements Creator<Shard> {
471         private final Creator<Shard> delegate;
472
473         DelegatingShardCreator(Creator<Shard> delegate) {
474             this.delegate = delegate;
475         }
476
477         @Override
478         public Shard create() throws Exception {
479             return delegate.create();
480         }
481     }
482
483     private static class ShardTestKit extends JavaTestKit {
484
485         private ShardTestKit(ActorSystem actorSystem) {
486             super(actorSystem);
487         }
488
489         protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
490             // Wait for a specific log message to show up
491             final boolean result =
492                 new JavaTestKit.EventFilter<Boolean>(logLevel
493                 ) {
494                     @Override
495                     protected Boolean run() {
496                         return true;
497                     }
498                 }.from(subject.path().toString())
499                     .message(logMessage)
500                     .occurrences(1).exec();
501
502             Assert.assertEquals(true, result);
503
504         }
505
506         protected void waitUntilLeader(ActorRef subject) {
507             waitForLogMessage(Logging.Info.class, subject,
508                     "Switching from state Candidate to Leader");
509         }
510     }
511 }