Bug 1520 - Increased test coverage
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.event.Logging;
7 import akka.testkit.JavaTestKit;
8 import akka.testkit.TestActorRef;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import org.junit.Assert;
14 import org.junit.Test;
15 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
16 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChain;
18 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionChainReply;
19 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
20 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
21 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
22 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
23 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
24 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
25 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
26 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
27 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
30 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
33 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
34 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
41
42 import java.io.IOException;
43 import java.util.Collections;
44 import java.util.HashMap;
45 import java.util.Map;
46 import java.util.concurrent.ExecutionException;
47
48 import static org.junit.Assert.assertEquals;
49 import static org.junit.Assert.assertFalse;
50 import static org.junit.Assert.assertTrue;
51
52 public class ShardTest extends AbstractActorTest {
53
54     private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
55
56     @Test
57     public void testOnReceiveCreateTransactionChain() throws Exception {
58         new JavaTestKit(getSystem()) {{
59             final ShardIdentifier identifier =
60                 ShardIdentifier.builder().memberName("member-1")
61                     .shardName("inventory").type("config").build();
62
63             final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
64             final ActorRef subject =
65                 getSystem().actorOf(props, "testCreateTransactionChain");
66
67
68             // Wait for a specific log message to show up
69             final boolean result =
70                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
71                 ) {
72                     @Override
73                     protected Boolean run() {
74                         return true;
75                     }
76                 }.from(subject.path().toString())
77                     .message("Switching from state Candidate to Leader")
78                     .occurrences(1).exec();
79
80             Assert.assertEquals(true, result);
81
82             new Within(duration("3 seconds")) {
83                 @Override
84                 protected void run() {
85
86                     subject.tell(new CreateTransactionChain().toSerializable(), getRef());
87
88                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
89                         // do not put code outside this method, will run afterwards
90                         @Override
91                         protected String match(Object in) {
92                             if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
93                                 CreateTransactionChainReply reply =
94                                     CreateTransactionChainReply.fromSerializable(getSystem(),in);
95                                 return reply.getTransactionChainPath()
96                                     .toString();
97                             } else {
98                                 throw noMatch();
99                             }
100                         }
101                     }.get(); // this extracts the received message
102
103                     assertEquals("Unexpected transaction path " + out,
104                         "akka://test/user/testCreateTransactionChain/$a",
105                         out);
106
107                     expectNoMsg();
108                 }
109
110
111             };
112         }};
113     }
114
115     @Test
116     public void testOnReceiveRegisterListener() throws Exception {
117         new JavaTestKit(getSystem()) {{
118             final ShardIdentifier identifier =
119                 ShardIdentifier.builder().memberName("member-1")
120                     .shardName("inventory").type("config").build();
121
122             final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
123             final ActorRef subject =
124                 getSystem().actorOf(props, "testRegisterChangeListener");
125
126             new Within(duration("3 seconds")) {
127                 @Override
128                 protected void run() {
129
130                     subject.tell(
131                         new UpdateSchemaContext(SchemaContextHelper.full()),
132                         getRef());
133
134                     subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
135                         getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
136                         getRef());
137
138                     final Boolean notificationEnabled = new ExpectMsg<Boolean>(
139                                                    duration("3 seconds"), "enable notification") {
140                         // do not put code outside this method, will run afterwards
141                         @Override
142                         protected Boolean match(Object in) {
143                             if(in instanceof EnableNotification){
144                                 return ((EnableNotification) in).isEnabled();
145                             } else {
146                                 throw noMatch();
147                             }
148                         }
149                     }.get(); // this extracts the received message
150
151                     assertFalse(notificationEnabled);
152
153                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
154                         // do not put code outside this method, will run afterwards
155                         @Override
156                         protected String match(Object in) {
157                             if (in.getClass().equals(RegisterChangeListenerReply.class)) {
158                                 RegisterChangeListenerReply reply =
159                                     (RegisterChangeListenerReply) in;
160                                 return reply.getListenerRegistrationPath()
161                                     .toString();
162                             } else {
163                                 throw noMatch();
164                             }
165                         }
166                     }.get(); // this extracts the received message
167
168                     assertTrue(out.matches(
169                         "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
170                 }
171
172
173             };
174         }};
175     }
176
177     @Test
178     public void testCreateTransaction(){
179         new JavaTestKit(getSystem()) {{
180             final ShardIdentifier identifier =
181                 ShardIdentifier.builder().memberName("member-1")
182                     .shardName("inventory").type("config").build();
183
184             final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
185             final ActorRef subject =
186                 getSystem().actorOf(props, "testCreateTransaction");
187
188             // Wait for a specific log message to show up
189             final boolean result =
190                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
191                 ) {
192                     @Override
193                     protected Boolean run() {
194                         return true;
195                     }
196                 }.from(subject.path().toString())
197                     .message("Switching from state Candidate to Leader")
198                     .occurrences(1).exec();
199
200             Assert.assertEquals(true, result);
201
202             new Within(duration("3 seconds")) {
203                 @Override
204                 protected void run() {
205
206                     subject.tell(
207                         new UpdateSchemaContext(TestModel.createTestContext()),
208                         getRef());
209
210                     subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
211                         getRef());
212
213                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
214                         // do not put code outside this method, will run afterwards
215                         @Override
216                         protected String match(Object in) {
217                             if (in instanceof CreateTransactionReply) {
218                                 CreateTransactionReply reply =
219                                     (CreateTransactionReply) in;
220                                 return reply.getTransactionActorPath()
221                                     .toString();
222                             } else {
223                                 throw noMatch();
224                             }
225                         }
226                     }.get(); // this extracts the received message
227
228                     assertTrue("Unexpected transaction path " + out,
229                         out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
230                     expectNoMsg();
231                 }
232             };
233         }};
234     }
235
236     @Test
237     public void testPeerAddressResolved(){
238         new JavaTestKit(getSystem()) {{
239             Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
240
241             final ShardIdentifier identifier =
242                 ShardIdentifier.builder().memberName("member-1")
243                     .shardName("inventory").type("config").build();
244
245             peerAddresses.put(identifier, null);
246             final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
247             final ActorRef subject =
248                 getSystem().actorOf(props, "testPeerAddressResolved");
249
250             new Within(duration("3 seconds")) {
251                 @Override
252                 protected void run() {
253
254                     subject.tell(
255                         new PeerAddressResolved(identifier, "akka://foobar"),
256                         getRef());
257
258                     expectNoMsg();
259                 }
260             };
261         }};
262     }
263
264     @Test
265     public void testApplySnapshot() throws ExecutionException, InterruptedException {
266         Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
267
268         final ShardIdentifier identifier =
269             ShardIdentifier.builder().memberName("member-1")
270                 .shardName("inventory").type("config").build();
271
272         peerAddresses.put(identifier, null);
273         final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
274
275         TestActorRef<Shard> ref = TestActorRef.create(getSystem(), props);
276
277         ref.underlyingActor().updateSchemaContext(TestModel.createTestContext());
278
279         NormalizedNodeToNodeCodec codec =
280             new NormalizedNodeToNodeCodec(TestModel.createTestContext());
281
282         ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
283
284         NormalizedNode expected = ref.underlyingActor().readStore();
285
286         NormalizedNodeMessages.Container encode = codec
287             .encode(YangInstanceIdentifier.builder().build(), expected);
288
289
290         ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString());
291
292         NormalizedNode actual = ref.underlyingActor().readStore();
293
294         assertEquals(expected, actual);
295     }
296
297     private static class ShardTestKit extends JavaTestKit {
298
299         private ShardTestKit(ActorSystem actorSystem) {
300             super(actorSystem);
301         }
302
303         protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
304             // Wait for a specific log message to show up
305             final boolean result =
306                 new JavaTestKit.EventFilter<Boolean>(logLevel
307                 ) {
308                     @Override
309                     protected Boolean run() {
310                         return true;
311                     }
312                 }.from(subject.path().toString())
313                     .message(logMessage)
314                     .occurrences(1).exec();
315
316             Assert.assertEquals(true, result);
317
318         }
319
320     }
321
322     @Test
323     public void testCreateSnapshot() throws IOException, InterruptedException {
324         new ShardTestKit(getSystem()) {{
325             final ShardIdentifier identifier =
326                 ShardIdentifier.builder().memberName("member-1")
327                     .shardName("inventory").type("config").build();
328
329             final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
330             final ActorRef subject =
331                 getSystem().actorOf(props, "testCreateSnapshot");
332
333             // Wait for a specific log message to show up
334             this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader");
335
336
337             new Within(duration("3 seconds")) {
338                 @Override
339                 protected void run() {
340
341                     subject.tell(
342                         new UpdateSchemaContext(TestModel.createTestContext()),
343                         getRef());
344
345                     subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
346                         getRef());
347
348                     waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
349                 }
350             };
351
352             Thread.sleep(2000);
353             deletePersistenceFiles();
354         }};
355     }
356
357     /**
358      * This test simply verifies that the applySnapShot logic will work
359      * @throws ReadFailedException
360      */
361     @Test
362     public void testInMemoryDataStoreRestore() throws ReadFailedException {
363         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
364             MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
365
366         store.onGlobalContextUpdated(TestModel.createTestContext());
367
368         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
369         putTransaction.write(TestModel.TEST_PATH,
370             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
371         commitTransaction(putTransaction);
372
373
374         NormalizedNode expected = readStore(store);
375
376         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
377
378         writeTransaction.delete(YangInstanceIdentifier.builder().build());
379         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
380
381         commitTransaction(writeTransaction);
382
383         NormalizedNode actual = readStore(store);
384
385         assertEquals(expected, actual);
386
387     }
388
389     private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
390         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
391         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
392             transaction.read(YangInstanceIdentifier.builder().build());
393
394         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
395
396         NormalizedNode<?, ?> normalizedNode = optional.get();
397
398         transaction.close();
399
400         return normalizedNode;
401     }
402
403     private void commitTransaction(DOMStoreWriteTransaction transaction) {
404         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
405         ListenableFuture<Void> future =
406             commitCohort.preCommit();
407         try {
408             future.get();
409             future = commitCohort.commit();
410             future.get();
411         } catch (InterruptedException | ExecutionException e) {
412         }
413     }
414
415     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
416         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
417             @Override
418             public void onDataChanged(
419                 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
420
421             }
422         };
423     }
424 }