Bump odlparent/yangtools/mdsal
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15
16 import akka.actor.ActorSystem;
17 import akka.actor.Address;
18 import akka.actor.AddressFromURIString;
19 import akka.cluster.Cluster;
20 import akka.testkit.javadsl.TestKit;
21 import com.google.common.base.Throwables;
22 import com.google.common.util.concurrent.FluentFuture;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Optional;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.junit.runner.RunWith;
36 import org.junit.runners.Parameterized;
37 import org.junit.runners.Parameterized.Parameters;
38 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
41 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
42 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
43 import org.opendaylight.mdsal.common.api.ReadFailedException;
44 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
45 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
46 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
47 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
51
52 @RunWith(Parameterized.class)
53 public class DistributedDataStoreIntegrationTest extends AbstractDistributedDataStoreIntegrationTest {
54
55     @Parameters(name = "{0}")
56     public static Collection<Object[]> data() {
57         return Arrays.asList(new Object[][] {
58                 { TestDistributedDataStore.class }, { TestClientBackedDataStore.class }
59         });
60     }
61
62     @Before
63     public void setUp() {
64         InMemorySnapshotStore.clear();
65         InMemoryJournal.clear();
66         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
67         Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
68         Cluster.get(system).join(member1Address);
69     }
70
71     @After
72     public void tearDown() {
73         TestKit.shutdownActorSystem(system, true);
74         system = null;
75     }
76
77     @SuppressWarnings("checkstyle:IllegalCatch")
78     private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
79             throws Exception {
80         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
81         final String shardName = "test-1";
82
83         // Setup the InMemoryJournal to block shard recovery to ensure
84         // the shard isn't
85         // initialized until we create and submit the write the Tx.
86         final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
87         final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
88         InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
89
90         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
91             testParameter, testName, false, shardName)) {
92
93             // Create the write Tx
94             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
95                     : dataStore.newReadWriteTransaction();
96             assertNotNull("newReadWriteTransaction returned null", writeTx);
97
98             // Do some modification operations and ready the Tx on a
99             // separate thread.
100             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
101                     .builder(TestModel.OUTER_LIST_PATH)
102                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
103
104             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
105             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
106             final CountDownLatch txReady = new CountDownLatch(1);
107             final Thread txThread = new Thread(() -> {
108                 try {
109                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
110
111                     writeTx.merge(TestModel.OUTER_LIST_PATH,
112                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
113                         .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
114                         .build());
115
116                     writeTx.write(listEntryPath,
117                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
118
119                     writeTx.delete(listEntryPath);
120
121                     txCohort.set(writeTx.ready());
122                 } catch (Exception e) {
123                     caughtEx.set(e);
124                 } finally {
125                     txReady.countDown();
126                 }
127             });
128
129             txThread.start();
130
131             // Wait for the Tx operations to complete.
132             final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
133             if (caughtEx.get() != null) {
134                 throw caughtEx.get();
135             }
136
137             assertTrue("Tx ready", done);
138
139             // At this point the Tx operations should be waiting for the
140             // shard to initialize so
141             // trigger the latch to let the shard recovery to continue.
142             blockRecoveryLatch.countDown();
143
144             // Wait for the Tx commit to complete.
145             testKit.doCommit(txCohort.get());
146
147             // Verify the data in the store
148             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
149
150             Optional<NormalizedNode> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
151             assertTrue("isPresent", optional.isPresent());
152
153             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
154             assertTrue("isPresent", optional.isPresent());
155
156             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
157             assertFalse("isPresent", optional.isPresent());
158         }
159     }
160
161     @Test
162     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
163         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
164         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
165     }
166
167     @Test
168     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
169         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
170     }
171
172     @Test
173     @SuppressWarnings("checkstyle:IllegalCatch")
174     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
175         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
176         final String testName = "testTransactionReadsWithShardNotInitiallyReady";
177         final String shardName = "test-1";
178
179         // Setup the InMemoryJournal to block shard recovery to ensure
180         // the shard isn't
181         // initialized until we create the Tx.
182         final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
183         final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
184         InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
185
186         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
187             testParameter, testName, false, shardName)) {
188
189             // Create the read-write Tx
190             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
191             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
192
193             // Do some reads on the Tx on a separate thread.
194             final AtomicReference<FluentFuture<Boolean>> txExistsFuture = new AtomicReference<>();
195             final AtomicReference<FluentFuture<Optional<NormalizedNode>>> txReadFuture = new AtomicReference<>();
196             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
197             final CountDownLatch txReadsDone = new CountDownLatch(1);
198             final Thread txThread = new Thread(() -> {
199                 try {
200                     readWriteTx.write(TestModel.TEST_PATH,
201                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
202
203                     txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
204
205                     txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
206                 } catch (Exception e) {
207                     caughtEx.set(e);
208                 } finally {
209                     txReadsDone.countDown();
210                 }
211             });
212
213             txThread.start();
214
215             // Wait for the Tx operations to complete.
216             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
217             if (caughtEx.get() != null) {
218                 throw caughtEx.get();
219             }
220
221             assertTrue("Tx reads done", done);
222
223             // At this point the Tx operations should be waiting for the
224             // shard to initialize so
225             // trigger the latch to let the shard recovery to continue.
226             blockRecoveryLatch.countDown();
227
228             // Wait for the reads to complete and verify.
229             assertEquals("exists", Boolean.TRUE, txExistsFuture.get().get(5, TimeUnit.SECONDS));
230             assertTrue("read", txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent());
231
232             readWriteTx.close();
233         }
234     }
235
236     @Test(expected = NotInitializedException.class)
237     @SuppressWarnings("checkstyle:IllegalCatch")
238     public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
239         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
240         final String testName = "testTransactionCommitFailureWithShardNotInitialized";
241         final String shardName = "test-1";
242
243         // Set the shard initialization timeout low for the test.
244         datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
245
246         // Setup the InMemoryJournal to block shard recovery
247         // indefinitely.
248         final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
249         final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
250         InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
251
252         InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
253
254         final AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName);
255
256         // Create the write Tx
257         final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
258         assertNotNull("newReadWriteTransaction returned null", writeTx);
259
260         // Do some modifications and ready the Tx on a separate
261         // thread.
262         final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
263         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
264         final CountDownLatch txReady = new CountDownLatch(1);
265         final Thread txThread = new Thread(() -> {
266             try {
267                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
268
269                 txCohort.set(writeTx.ready());
270             } catch (Exception e) {
271                 caughtEx.set(e);
272             } finally {
273                 txReady.countDown();
274             }
275         });
276
277         txThread.start();
278
279         // Wait for the Tx operations to complete.
280         boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
281         if (caughtEx.get() != null) {
282             throw caughtEx.get();
283         }
284
285         assertTrue("Tx ready", done);
286
287         // Wait for the commit to complete. Since the shard never
288         // initialized, the Tx should
289         // have timed out and throw an appropriate exception cause.
290         try {
291             txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
292             fail("Expected NotInitializedException");
293         } catch (final Exception e) {
294             final Throwable root = Throwables.getRootCause(e);
295             Throwables.throwIfUnchecked(root);
296             throw new RuntimeException(root);
297         } finally {
298             blockRecoveryLatch.countDown();
299         }
300     }
301
302     @Test(expected = NotInitializedException.class)
303     @SuppressWarnings("checkstyle:IllegalCatch")
304     public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
305         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
306         final String testName = "testTransactionReadFailureWithShardNotInitialized";
307         final String shardName = "test-1";
308
309         // Set the shard initialization timeout low for the test.
310         datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
311
312         // Setup the InMemoryJournal to block shard recovery
313         // indefinitely.
314         final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
315         final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
316         InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
317
318         InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
319
320         try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
321
322             // Create the read-write Tx
323             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
324             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
325
326             // Do a read on the Tx on a separate thread.
327             final AtomicReference<FluentFuture<Optional<NormalizedNode>>> txReadFuture = new AtomicReference<>();
328             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
329             final CountDownLatch txReadDone = new CountDownLatch(1);
330             final Thread txThread = new Thread(() -> {
331                 try {
332                     readWriteTx.write(TestModel.TEST_PATH,
333                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
334
335                     txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
336
337                     readWriteTx.close();
338                 } catch (Exception e) {
339                     caughtEx.set(e);
340                 } finally {
341                     txReadDone.countDown();
342                 }
343             });
344
345             txThread.start();
346
347             // Wait for the Tx operations to complete.
348             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
349             if (caughtEx.get() != null) {
350                 throw caughtEx.get();
351             }
352
353             assertTrue("Tx read done", done);
354
355             // Wait for the read to complete. Since the shard never
356             // initialized, the Tx should
357             // have timed out and throw an appropriate exception cause.
358             try {
359                 txReadFuture.get().get(5, TimeUnit.SECONDS);
360             } catch (ExecutionException e) {
361                 assertTrue("Expected ReadFailedException cause: " + e.getCause(),
362                     e.getCause() instanceof ReadFailedException);
363                 final Throwable root = Throwables.getRootCause(e);
364                 Throwables.throwIfUnchecked(root);
365                 throw new RuntimeException(root);
366             } finally {
367                 blockRecoveryLatch.countDown();
368             }
369         }
370     }
371
372 }