2 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
16 import akka.actor.ActorSystem;
17 import akka.actor.Address;
18 import akka.actor.AddressFromURIString;
19 import akka.cluster.Cluster;
20 import akka.testkit.javadsl.TestKit;
21 import com.google.common.base.Throwables;
22 import com.google.common.util.concurrent.FluentFuture;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Optional;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.junit.runner.RunWith;
36 import org.junit.runners.Parameterized;
37 import org.junit.runners.Parameterized.Parameters;
38 import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
41 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
42 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
43 import org.opendaylight.mdsal.common.api.ReadFailedException;
44 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
45 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
46 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
47 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
52 @RunWith(Parameterized.class)
53 public class DistributedDataStoreIntegrationTest extends AbstractDistributedDataStoreIntegrationTest {
55 @Parameters(name = "{0}")
56 public static Collection<Object[]> data() {
57 return Arrays.asList(new Object[][] {
58 { TestClientBackedDataStore.class }
64 InMemorySnapshotStore.clear();
65 InMemoryJournal.clear();
66 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
67 Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
68 Cluster.get(system).join(member1Address);
72 public void tearDown() {
73 TestKit.shutdownActorSystem(system, true);
77 @SuppressWarnings("checkstyle:IllegalCatch")
78 private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
80 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
81 final String shardName = "test-1";
83 // Setup the InMemoryJournal to block shard recovery to ensure
85 // initialized until we create and submit the write the Tx.
86 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
87 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
88 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
90 try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) {
91 // Create the write Tx
92 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
93 : dataStore.newReadWriteTransaction();
94 assertNotNull("newReadWriteTransaction returned null", writeTx);
96 // Do some modification operations and ready the Tx on a
98 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
99 .builder(TestModel.OUTER_LIST_PATH)
100 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
102 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
103 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
104 final CountDownLatch txReady = new CountDownLatch(1);
105 final Thread txThread = new Thread(() -> {
107 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
109 writeTx.merge(TestModel.OUTER_LIST_PATH,
110 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
111 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
114 writeTx.write(listEntryPath,
115 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
117 writeTx.delete(listEntryPath);
119 txCohort.set(writeTx.ready());
120 } catch (Exception e) {
129 // Wait for the Tx operations to complete.
130 final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
131 if (caughtEx.get() != null) {
132 throw caughtEx.get();
135 assertTrue("Tx ready", done);
137 // At this point the Tx operations should be waiting for the
138 // shard to initialize so
139 // trigger the latch to let the shard recovery to continue.
140 blockRecoveryLatch.countDown();
142 // Wait for the Tx commit to complete.
143 testKit.doCommit(txCohort.get());
145 // Verify the data in the store
146 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
148 Optional<NormalizedNode> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
149 assertTrue("isPresent", optional.isPresent());
151 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
152 assertTrue("isPresent", optional.isPresent());
154 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
155 assertFalse("isPresent", optional.isPresent());
160 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
161 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
162 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
166 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
167 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
171 @SuppressWarnings("checkstyle:IllegalCatch")
172 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
173 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
174 final String testName = "testTransactionReadsWithShardNotInitiallyReady";
175 final String shardName = "test-1";
177 // Setup the InMemoryJournal to block shard recovery to ensure
179 // initialized until we create the Tx.
180 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
181 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
182 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
184 try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) {
185 // Create the read-write Tx
186 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
187 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
189 // Do some reads on the Tx on a separate thread.
190 final AtomicReference<FluentFuture<Boolean>> txExistsFuture = new AtomicReference<>();
191 final AtomicReference<FluentFuture<Optional<NormalizedNode>>> txReadFuture = new AtomicReference<>();
192 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
193 final CountDownLatch txReadsDone = new CountDownLatch(1);
194 final Thread txThread = new Thread(() -> {
196 readWriteTx.write(TestModel.TEST_PATH,
197 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
199 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
201 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
202 } catch (Exception e) {
205 txReadsDone.countDown();
211 // Wait for the Tx operations to complete.
212 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
213 if (caughtEx.get() != null) {
214 throw caughtEx.get();
217 assertTrue("Tx reads done", done);
219 // At this point the Tx operations should be waiting for the
220 // shard to initialize so
221 // trigger the latch to let the shard recovery to continue.
222 blockRecoveryLatch.countDown();
224 // Wait for the reads to complete and verify.
225 assertEquals("exists", Boolean.TRUE, txExistsFuture.get().get(5, TimeUnit.SECONDS));
226 assertTrue("read", txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent());
232 @Test(expected = NotInitializedException.class)
233 @SuppressWarnings("checkstyle:IllegalCatch")
234 public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
235 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
236 final String testName = "testTransactionCommitFailureWithShardNotInitialized";
237 final String shardName = "test-1";
239 // Set the shard initialization timeout low for the test.
240 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
242 // Setup the InMemoryJournal to block shard recovery
244 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
245 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
246 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
248 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
250 final var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName);
252 // Create the write Tx
253 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
254 assertNotNull("newReadWriteTransaction returned null", writeTx);
256 // Do some modifications and ready the Tx on a separate
258 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
259 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
260 final CountDownLatch txReady = new CountDownLatch(1);
261 final Thread txThread = new Thread(() -> {
263 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
265 txCohort.set(writeTx.ready());
266 } catch (Exception e) {
275 // Wait for the Tx operations to complete.
276 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
277 if (caughtEx.get() != null) {
278 throw caughtEx.get();
281 assertTrue("Tx ready", done);
283 // Wait for the commit to complete. Since the shard never
284 // initialized, the Tx should
285 // have timed out and throw an appropriate exception cause.
287 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
288 fail("Expected NotInitializedException");
289 } catch (final Exception e) {
290 final Throwable root = Throwables.getRootCause(e);
291 Throwables.throwIfUnchecked(root);
292 throw new RuntimeException(root);
294 blockRecoveryLatch.countDown();
298 @Test(expected = NotInitializedException.class)
299 @SuppressWarnings("checkstyle:IllegalCatch")
300 public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
301 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
302 final String testName = "testTransactionReadFailureWithShardNotInitialized";
303 final String shardName = "test-1";
305 // Set the shard initialization timeout low for the test.
306 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
308 // Setup the InMemoryJournal to block shard recovery
310 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
311 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
312 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
314 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
316 try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) {
317 // Create the read-write Tx
318 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
319 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
321 // Do a read on the Tx on a separate thread.
322 final AtomicReference<FluentFuture<Optional<NormalizedNode>>> txReadFuture = new AtomicReference<>();
323 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
324 final CountDownLatch txReadDone = new CountDownLatch(1);
325 final Thread txThread = new Thread(() -> {
327 readWriteTx.write(TestModel.TEST_PATH,
328 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
330 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
333 } catch (Exception e) {
336 txReadDone.countDown();
342 // Wait for the Tx operations to complete.
343 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
344 if (caughtEx.get() != null) {
345 throw caughtEx.get();
348 assertTrue("Tx read done", done);
350 // Wait for the read to complete. Since the shard never
351 // initialized, the Tx should
352 // have timed out and throw an appropriate exception cause.
354 txReadFuture.get().get(5, TimeUnit.SECONDS);
355 } catch (ExecutionException e) {
356 assertTrue("Expected ReadFailedException cause: " + e.getCause(),
357 e.getCause() instanceof ReadFailedException);
358 final Throwable root = Throwables.getRootCause(e);
359 Throwables.throwIfUnchecked(root);
360 throw new RuntimeException(root);
362 blockRecoveryLatch.countDown();