BUG 1712 - Distributed DataStore does not work properly with Transaction Chains
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / ShardTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Props;
6 import akka.event.Logging;
7 import akka.testkit.JavaTestKit;
8 import akka.testkit.TestActorRef;
9 import com.google.common.base.Optional;
10 import com.google.common.util.concurrent.CheckedFuture;
11 import com.google.common.util.concurrent.ListenableFuture;
12 import com.google.common.util.concurrent.MoreExecutors;
13 import org.junit.Assert;
14 import org.junit.Test;
15 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
16 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
18 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
19 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
20 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
21 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
22 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
23 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
24 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
25 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
29 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
30 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
31 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
32 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
39
40 import java.io.IOException;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.Map;
44 import java.util.concurrent.ExecutionException;
45
46 import static org.junit.Assert.assertEquals;
47 import static org.junit.Assert.assertFalse;
48 import static org.junit.Assert.assertTrue;
49
50 public class ShardTest extends AbstractActorTest {
51
52     private static final DatastoreContext DATA_STORE_CONTEXT = new DatastoreContext();
53
54     @Test
55     public void testOnReceiveRegisterListener() throws Exception {
56         new JavaTestKit(getSystem()) {{
57             final ShardIdentifier identifier =
58                 ShardIdentifier.builder().memberName("member-1")
59                     .shardName("inventory").type("config").build();
60
61             final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
62             final ActorRef subject =
63                 getSystem().actorOf(props, "testRegisterChangeListener");
64
65             new Within(duration("3 seconds")) {
66                 @Override
67                 protected void run() {
68
69                     subject.tell(
70                         new UpdateSchemaContext(SchemaContextHelper.full()),
71                         getRef());
72
73                     subject.tell(new RegisterChangeListener(TestModel.TEST_PATH,
74                         getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
75                         getRef());
76
77                     final Boolean notificationEnabled = new ExpectMsg<Boolean>(
78                                                    duration("3 seconds"), "enable notification") {
79                         // do not put code outside this method, will run afterwards
80                         @Override
81                         protected Boolean match(Object in) {
82                             if(in instanceof EnableNotification){
83                                 return ((EnableNotification) in).isEnabled();
84                             } else {
85                                 throw noMatch();
86                             }
87                         }
88                     }.get(); // this extracts the received message
89
90                     assertFalse(notificationEnabled);
91
92                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
93                         // do not put code outside this method, will run afterwards
94                         @Override
95                         protected String match(Object in) {
96                             if (in.getClass().equals(RegisterChangeListenerReply.class)) {
97                                 RegisterChangeListenerReply reply =
98                                     (RegisterChangeListenerReply) in;
99                                 return reply.getListenerRegistrationPath()
100                                     .toString();
101                             } else {
102                                 throw noMatch();
103                             }
104                         }
105                     }.get(); // this extracts the received message
106
107                     assertTrue(out.matches(
108                         "akka:\\/\\/test\\/user\\/testRegisterChangeListener\\/\\$.*"));
109                 }
110
111
112             };
113         }};
114     }
115
116     @Test
117     public void testCreateTransaction(){
118         new JavaTestKit(getSystem()) {{
119             final ShardIdentifier identifier =
120                 ShardIdentifier.builder().memberName("member-1")
121                     .shardName("inventory").type("config").build();
122
123             final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
124             final ActorRef subject =
125                 getSystem().actorOf(props, "testCreateTransaction");
126
127             // Wait for a specific log message to show up
128             final boolean result =
129                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
130                 ) {
131                     @Override
132                     protected Boolean run() {
133                         return true;
134                     }
135                 }.from(subject.path().toString())
136                     .message("Switching from state Candidate to Leader")
137                     .occurrences(1).exec();
138
139             Assert.assertEquals(true, result);
140
141             new Within(duration("3 seconds")) {
142                 @Override
143                 protected void run() {
144
145                     subject.tell(
146                         new UpdateSchemaContext(TestModel.createTestContext()),
147                         getRef());
148
149                     subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
150                         getRef());
151
152                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
153                         // do not put code outside this method, will run afterwards
154                         @Override
155                         protected String match(Object in) {
156                             if (in instanceof CreateTransactionReply) {
157                                 CreateTransactionReply reply =
158                                     (CreateTransactionReply) in;
159                                 return reply.getTransactionActorPath()
160                                     .toString();
161                             } else {
162                                 throw noMatch();
163                             }
164                         }
165                     }.get(); // this extracts the received message
166
167                     assertTrue("Unexpected transaction path " + out,
168                         out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
169                     expectNoMsg();
170                 }
171             };
172         }};
173     }
174
175     @Test
176     public void testCreateTransactionOnChain(){
177         new JavaTestKit(getSystem()) {{
178             final ShardIdentifier identifier =
179                 ShardIdentifier.builder().memberName("member-1")
180                     .shardName("inventory").type("config").build();
181
182             final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
183             final ActorRef subject =
184                 getSystem().actorOf(props, "testCreateTransactionOnChain");
185
186             // Wait for a specific log message to show up
187             final boolean result =
188                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
189                 ) {
190                     @Override
191                     protected Boolean run() {
192                         return true;
193                     }
194                 }.from(subject.path().toString())
195                     .message("Switching from state Candidate to Leader")
196                     .occurrences(1).exec();
197
198             Assert.assertEquals(true, result);
199
200             new Within(duration("3 seconds")) {
201                 @Override
202                 protected void run() {
203
204                     subject.tell(
205                         new UpdateSchemaContext(TestModel.createTestContext()),
206                         getRef());
207
208                     subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() , "foobar").toSerializable(),
209                         getRef());
210
211                     final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
212                         // do not put code outside this method, will run afterwards
213                         @Override
214                         protected String match(Object in) {
215                             if (in instanceof CreateTransactionReply) {
216                                 CreateTransactionReply reply =
217                                     (CreateTransactionReply) in;
218                                 return reply.getTransactionActorPath()
219                                     .toString();
220                             } else {
221                                 throw noMatch();
222                             }
223                         }
224                     }.get(); // this extracts the received message
225
226                     assertTrue("Unexpected transaction path " + out,
227                         out.contains("akka://test/user/testCreateTransactionOnChain/shard-txn-1"));
228                     expectNoMsg();
229                 }
230             };
231         }};
232     }
233
234     @Test
235     public void testPeerAddressResolved(){
236         new JavaTestKit(getSystem()) {{
237             Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
238
239             final ShardIdentifier identifier =
240                 ShardIdentifier.builder().memberName("member-1")
241                     .shardName("inventory").type("config").build();
242
243             peerAddresses.put(identifier, null);
244             final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
245             final ActorRef subject =
246                 getSystem().actorOf(props, "testPeerAddressResolved");
247
248             new Within(duration("3 seconds")) {
249                 @Override
250                 protected void run() {
251
252                     subject.tell(
253                         new PeerAddressResolved(identifier, "akka://foobar"),
254                         getRef());
255
256                     expectNoMsg();
257                 }
258             };
259         }};
260     }
261
262     @Test
263     public void testApplySnapshot() throws ExecutionException, InterruptedException {
264         Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
265
266         final ShardIdentifier identifier =
267             ShardIdentifier.builder().memberName("member-1")
268                 .shardName("inventory").type("config").build();
269
270         peerAddresses.put(identifier, null);
271         final Props props = Shard.props(identifier, peerAddresses, DATA_STORE_CONTEXT, TestModel.createTestContext());
272
273         TestActorRef<Shard> ref = TestActorRef.create(getSystem(), props);
274
275         ref.underlyingActor().updateSchemaContext(TestModel.createTestContext());
276
277         NormalizedNodeToNodeCodec codec =
278             new NormalizedNodeToNodeCodec(TestModel.createTestContext());
279
280         ref.underlyingActor().writeToStore(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
281
282         NormalizedNode expected = ref.underlyingActor().readStore();
283
284         NormalizedNodeMessages.Container encode = codec
285             .encode(YangInstanceIdentifier.builder().build(), expected);
286
287
288         ref.underlyingActor().applySnapshot(encode.getNormalizedNode().toByteString());
289
290         NormalizedNode actual = ref.underlyingActor().readStore();
291
292         assertEquals(expected, actual);
293     }
294
295     private static class ShardTestKit extends JavaTestKit {
296
297         private ShardTestKit(ActorSystem actorSystem) {
298             super(actorSystem);
299         }
300
301         protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){
302             // Wait for a specific log message to show up
303             final boolean result =
304                 new JavaTestKit.EventFilter<Boolean>(logLevel
305                 ) {
306                     @Override
307                     protected Boolean run() {
308                         return true;
309                     }
310                 }.from(subject.path().toString())
311                     .message(logMessage)
312                     .occurrences(1).exec();
313
314             Assert.assertEquals(true, result);
315
316         }
317
318     }
319
320     @Test
321     public void testCreateSnapshot() throws IOException, InterruptedException {
322         new ShardTestKit(getSystem()) {{
323             final ShardIdentifier identifier =
324                 ShardIdentifier.builder().memberName("member-1")
325                     .shardName("inventory").type("config").build();
326
327             final Props props = Shard.props(identifier, Collections.EMPTY_MAP, DATA_STORE_CONTEXT, TestModel.createTestContext());
328             final ActorRef subject =
329                 getSystem().actorOf(props, "testCreateSnapshot");
330
331             // Wait for a specific log message to show up
332             this.waitForLogMessage(Logging.Info.class, subject, "Switching from state Candidate to Leader");
333
334
335             new Within(duration("3 seconds")) {
336                 @Override
337                 protected void run() {
338
339                     subject.tell(
340                         new UpdateSchemaContext(TestModel.createTestContext()),
341                         getRef());
342
343                     subject.tell(new CaptureSnapshot(-1,-1,-1,-1),
344                         getRef());
345
346                     waitForLogMessage(Logging.Debug.class, subject, "CaptureSnapshotReply received by actor");
347                 }
348             };
349
350             Thread.sleep(2000);
351             deletePersistenceFiles();
352         }};
353     }
354
355     /**
356      * This test simply verifies that the applySnapShot logic will work
357      * @throws ReadFailedException
358      */
359     @Test
360     public void testInMemoryDataStoreRestore() throws ReadFailedException {
361         InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.listeningDecorator(
362             MoreExecutors.sameThreadExecutor()), MoreExecutors.sameThreadExecutor());
363
364         store.onGlobalContextUpdated(TestModel.createTestContext());
365
366         DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
367         putTransaction.write(TestModel.TEST_PATH,
368             ImmutableNodes.containerNode(TestModel.TEST_QNAME));
369         commitTransaction(putTransaction);
370
371
372         NormalizedNode expected = readStore(store);
373
374         DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
375
376         writeTransaction.delete(YangInstanceIdentifier.builder().build());
377         writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
378
379         commitTransaction(writeTransaction);
380
381         NormalizedNode actual = readStore(store);
382
383         assertEquals(expected, actual);
384
385     }
386
387     private NormalizedNode readStore(InMemoryDOMDataStore store) throws ReadFailedException {
388         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
389         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
390             transaction.read(YangInstanceIdentifier.builder().build());
391
392         Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
393
394         NormalizedNode<?, ?> normalizedNode = optional.get();
395
396         transaction.close();
397
398         return normalizedNode;
399     }
400
401     private void commitTransaction(DOMStoreWriteTransaction transaction) {
402         DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
403         ListenableFuture<Void> future =
404             commitCohort.preCommit();
405         try {
406             future.get();
407             future = commitCohort.commit();
408             future.get();
409         } catch (InterruptedException | ExecutionException e) {
410         }
411     }
412
413     private AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> noOpDataChangeListener() {
414         return new AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>() {
415             @Override
416             public void onDataChanged(
417                 AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
418
419             }
420         };
421     }
422 }