Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
15
16 import akka.actor.ActorSystem;
17 import akka.actor.Address;
18 import akka.actor.AddressFromURIString;
19 import akka.cluster.Cluster;
20 import akka.testkit.javadsl.TestKit;
21 import com.google.common.base.Throwables;
22 import com.google.common.util.concurrent.FluentFuture;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Optional;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.junit.runner.RunWith;
36 import org.junit.runners.Parameterized;
37 import org.junit.runners.Parameterized.Parameters;
38 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
41 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
42 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
43 import org.opendaylight.mdsal.common.api.ReadFailedException;
44 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
45 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
46 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
47 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
51
52 @RunWith(Parameterized.class)
53 public class DistributedDataStoreIntegrationTest extends AbstractDistributedDataStoreIntegrationTest {
54
55     @Parameters(name = "{0}")
56     public static Collection<Object[]> data() {
57         return Arrays.asList(new Object[][] {
58             { TestClientBackedDataStore.class }
59         });
60     }
61
62     @Before
63     public void setUp() {
64         InMemorySnapshotStore.clear();
65         InMemoryJournal.clear();
66         system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
67         Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
68         Cluster.get(system).join(member1Address);
69     }
70
71     @After
72     public void tearDown() {
73         TestKit.shutdownActorSystem(system, true);
74         system = null;
75     }
76
77     @SuppressWarnings("checkstyle:IllegalCatch")
78     private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
79             throws Exception {
80         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
81         final String shardName = "test-1";
82
83         // Setup the InMemoryJournal to block shard recovery to ensure
84         // the shard isn't
85         // initialized until we create and submit the write the Tx.
86         final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
87         final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
88         InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
89
90         try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) {
91             // Create the write Tx
92             final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
93                     : dataStore.newReadWriteTransaction();
94             assertNotNull("newReadWriteTransaction returned null", writeTx);
95
96             // Do some modification operations and ready the Tx on a
97             // separate thread.
98             final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
99                     .builder(TestModel.OUTER_LIST_PATH)
100                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
101
102             final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
103             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
104             final CountDownLatch txReady = new CountDownLatch(1);
105             final Thread txThread = new Thread(() -> {
106                 try {
107                     writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
108
109                     writeTx.merge(TestModel.OUTER_LIST_PATH,
110                         ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
111                         .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
112                         .build());
113
114                     writeTx.write(listEntryPath,
115                         ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
116
117                     writeTx.delete(listEntryPath);
118
119                     txCohort.set(writeTx.ready());
120                 } catch (Exception e) {
121                     caughtEx.set(e);
122                 } finally {
123                     txReady.countDown();
124                 }
125             });
126
127             txThread.start();
128
129             // Wait for the Tx operations to complete.
130             final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
131             if (caughtEx.get() != null) {
132                 throw caughtEx.get();
133             }
134
135             assertTrue("Tx ready", done);
136
137             // At this point the Tx operations should be waiting for the
138             // shard to initialize so
139             // trigger the latch to let the shard recovery to continue.
140             blockRecoveryLatch.countDown();
141
142             // Wait for the Tx commit to complete.
143             testKit.doCommit(txCohort.get());
144
145             // Verify the data in the store
146             final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
147
148             Optional<NormalizedNode> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
149             assertTrue("isPresent", optional.isPresent());
150
151             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
152             assertTrue("isPresent", optional.isPresent());
153
154             optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
155             assertFalse("isPresent", optional.isPresent());
156         }
157     }
158
159     @Test
160     public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
161         datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
162         testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
163     }
164
165     @Test
166     public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
167         testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
168     }
169
170     @Test
171     @SuppressWarnings("checkstyle:IllegalCatch")
172     public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
173         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
174         final String testName = "testTransactionReadsWithShardNotInitiallyReady";
175         final String shardName = "test-1";
176
177         // Setup the InMemoryJournal to block shard recovery to ensure
178         // the shard isn't
179         // initialized until we create the Tx.
180         final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
181         final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
182         InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
183
184         try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) {
185             // Create the read-write Tx
186             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
187             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
188
189             // Do some reads on the Tx on a separate thread.
190             final AtomicReference<FluentFuture<Boolean>> txExistsFuture = new AtomicReference<>();
191             final AtomicReference<FluentFuture<Optional<NormalizedNode>>> txReadFuture = new AtomicReference<>();
192             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
193             final CountDownLatch txReadsDone = new CountDownLatch(1);
194             final Thread txThread = new Thread(() -> {
195                 try {
196                     readWriteTx.write(TestModel.TEST_PATH,
197                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
198
199                     txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
200
201                     txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
202                 } catch (Exception e) {
203                     caughtEx.set(e);
204                 } finally {
205                     txReadsDone.countDown();
206                 }
207             });
208
209             txThread.start();
210
211             // Wait for the Tx operations to complete.
212             boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
213             if (caughtEx.get() != null) {
214                 throw caughtEx.get();
215             }
216
217             assertTrue("Tx reads done", done);
218
219             // At this point the Tx operations should be waiting for the
220             // shard to initialize so
221             // trigger the latch to let the shard recovery to continue.
222             blockRecoveryLatch.countDown();
223
224             // Wait for the reads to complete and verify.
225             assertEquals("exists", Boolean.TRUE, txExistsFuture.get().get(5, TimeUnit.SECONDS));
226             assertTrue("read", txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent());
227
228             readWriteTx.close();
229         }
230     }
231
232     @Test(expected = NotInitializedException.class)
233     @SuppressWarnings("checkstyle:IllegalCatch")
234     public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
235         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
236         final String testName = "testTransactionCommitFailureWithShardNotInitialized";
237         final String shardName = "test-1";
238
239         // Set the shard initialization timeout low for the test.
240         datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
241
242         // Setup the InMemoryJournal to block shard recovery
243         // indefinitely.
244         final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
245         final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
246         InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
247
248         InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
249
250         final var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName);
251
252         // Create the write Tx
253         final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
254         assertNotNull("newReadWriteTransaction returned null", writeTx);
255
256         // Do some modifications and ready the Tx on a separate
257         // thread.
258         final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
259         final AtomicReference<Exception> caughtEx = new AtomicReference<>();
260         final CountDownLatch txReady = new CountDownLatch(1);
261         final Thread txThread = new Thread(() -> {
262             try {
263                 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
264
265                 txCohort.set(writeTx.ready());
266             } catch (Exception e) {
267                 caughtEx.set(e);
268             } finally {
269                 txReady.countDown();
270             }
271         });
272
273         txThread.start();
274
275         // Wait for the Tx operations to complete.
276         boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
277         if (caughtEx.get() != null) {
278             throw caughtEx.get();
279         }
280
281         assertTrue("Tx ready", done);
282
283         // Wait for the commit to complete. Since the shard never
284         // initialized, the Tx should
285         // have timed out and throw an appropriate exception cause.
286         try {
287             txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
288             fail("Expected NotInitializedException");
289         } catch (final Exception e) {
290             final Throwable root = Throwables.getRootCause(e);
291             Throwables.throwIfUnchecked(root);
292             throw new RuntimeException(root);
293         } finally {
294             blockRecoveryLatch.countDown();
295         }
296     }
297
298     @Test(expected = NotInitializedException.class)
299     @SuppressWarnings("checkstyle:IllegalCatch")
300     public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
301         final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
302         final String testName = "testTransactionReadFailureWithShardNotInitialized";
303         final String shardName = "test-1";
304
305         // Set the shard initialization timeout low for the test.
306         datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
307
308         // Setup the InMemoryJournal to block shard recovery
309         // indefinitely.
310         final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
311         final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
312         InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
313
314         InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
315
316         try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) {
317             // Create the read-write Tx
318             final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
319             assertNotNull("newReadWriteTransaction returned null", readWriteTx);
320
321             // Do a read on the Tx on a separate thread.
322             final AtomicReference<FluentFuture<Optional<NormalizedNode>>> txReadFuture = new AtomicReference<>();
323             final AtomicReference<Exception> caughtEx = new AtomicReference<>();
324             final CountDownLatch txReadDone = new CountDownLatch(1);
325             final Thread txThread = new Thread(() -> {
326                 try {
327                     readWriteTx.write(TestModel.TEST_PATH,
328                         ImmutableNodes.containerNode(TestModel.TEST_QNAME));
329
330                     txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
331
332                     readWriteTx.close();
333                 } catch (Exception e) {
334                     caughtEx.set(e);
335                 } finally {
336                     txReadDone.countDown();
337                 }
338             });
339
340             txThread.start();
341
342             // Wait for the Tx operations to complete.
343             boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
344             if (caughtEx.get() != null) {
345                 throw caughtEx.get();
346             }
347
348             assertTrue("Tx read done", done);
349
350             // Wait for the read to complete. Since the shard never
351             // initialized, the Tx should
352             // have timed out and throw an appropriate exception cause.
353             try {
354                 txReadFuture.get().get(5, TimeUnit.SECONDS);
355             } catch (ExecutionException e) {
356                 assertTrue("Expected ReadFailedException cause: " + e.getCause(),
357                     e.getCause() instanceof ReadFailedException);
358                 final Throwable root = Throwables.getRootCause(e);
359                 Throwables.throwIfUnchecked(root);
360                 throw new RuntimeException(root);
361             } finally {
362                 blockRecoveryLatch.countDown();
363             }
364         }
365     }
366
367 }