2 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.junit.Assert.fail;
16 import akka.actor.ActorSystem;
17 import akka.actor.Address;
18 import akka.actor.AddressFromURIString;
19 import akka.cluster.Cluster;
20 import akka.testkit.javadsl.TestKit;
21 import com.google.common.base.Throwables;
22 import com.google.common.util.concurrent.FluentFuture;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.Arrays;
26 import java.util.Collection;
27 import java.util.Optional;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.atomic.AtomicReference;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35 import org.junit.runner.RunWith;
36 import org.junit.runners.Parameterized;
37 import org.junit.runners.Parameterized.Parameters;
38 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
41 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
42 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
43 import org.opendaylight.mdsal.common.api.ReadFailedException;
44 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
45 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
46 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
47 import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
52 @RunWith(Parameterized.class)
53 public class DistributedDataStoreIntegrationTest extends AbstractDistributedDataStoreIntegrationTest {
55 @Parameters(name = "{0}")
56 public static Collection<Object[]> data() {
57 return Arrays.asList(new Object[][] {
58 { DistributedDataStore.class }, { ClientBackedDataStore.class }
64 InMemorySnapshotStore.clear();
65 InMemoryJournal.clear();
66 system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
67 Address member1Address = AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
68 Cluster.get(system).join(member1Address);
72 public void tearDown() {
73 TestKit.shutdownActorSystem(system, true);
77 @SuppressWarnings("checkstyle:IllegalCatch")
78 private void testTransactionWritesWithShardNotInitiallyReady(final String testName, final boolean writeOnly)
80 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
81 final String shardName = "test-1";
83 // Setup the InMemoryJournal to block shard recovery to ensure
85 // initialized until we create and submit the write the Tx.
86 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
87 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
88 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
90 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
91 testParameter, testName, false, shardName)) {
93 // Create the write Tx
94 final DOMStoreWriteTransaction writeTx = writeOnly ? dataStore.newWriteOnlyTransaction()
95 : dataStore.newReadWriteTransaction();
96 assertNotNull("newReadWriteTransaction returned null", writeTx);
98 // Do some modification operations and ready the Tx on a
100 final YangInstanceIdentifier listEntryPath = YangInstanceIdentifier
101 .builder(TestModel.OUTER_LIST_PATH)
102 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build();
104 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
105 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
106 final CountDownLatch txReady = new CountDownLatch(1);
107 final Thread txThread = new Thread(() -> {
109 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
111 writeTx.merge(TestModel.OUTER_LIST_PATH,
112 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
113 .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 42))
116 writeTx.write(listEntryPath,
117 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
119 writeTx.delete(listEntryPath);
121 txCohort.set(writeTx.ready());
122 } catch (Exception e) {
131 // Wait for the Tx operations to complete.
132 final boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
133 if (caughtEx.get() != null) {
134 throw caughtEx.get();
137 assertTrue("Tx ready", done);
139 // At this point the Tx operations should be waiting for the
140 // shard to initialize so
141 // trigger the latch to let the shard recovery to continue.
142 blockRecoveryLatch.countDown();
144 // Wait for the Tx commit to complete.
145 testKit.doCommit(txCohort.get());
147 // Verify the data in the store
148 final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
150 Optional<NormalizedNode<?, ?>> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
151 assertTrue("isPresent", optional.isPresent());
153 optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
154 assertTrue("isPresent", optional.isPresent());
156 optional = readTx.read(listEntryPath).get(5, TimeUnit.SECONDS);
157 assertFalse("isPresent", optional.isPresent());
162 public void testWriteOnlyTransactionWithShardNotInitiallyReady() throws Exception {
163 datastoreContextBuilder.writeOnlyTransactionOptimizationsEnabled(true);
164 testTransactionWritesWithShardNotInitiallyReady("testWriteOnlyTransactionWithShardNotInitiallyReady", true);
168 public void testReadWriteTransactionWithShardNotInitiallyReady() throws Exception {
169 testTransactionWritesWithShardNotInitiallyReady("testReadWriteTransactionWithShardNotInitiallyReady", false);
173 @SuppressWarnings("checkstyle:IllegalCatch")
174 public void testTransactionReadsWithShardNotInitiallyReady() throws Exception {
175 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
176 final String testName = "testTransactionReadsWithShardNotInitiallyReady";
177 final String shardName = "test-1";
179 // Setup the InMemoryJournal to block shard recovery to ensure
181 // initialized until we create the Tx.
182 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
183 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
184 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
186 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
187 testParameter, testName, false, shardName)) {
189 // Create the read-write Tx
190 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
191 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
193 // Do some reads on the Tx on a separate thread.
194 final AtomicReference<FluentFuture<Boolean>> txExistsFuture = new AtomicReference<>();
195 final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>> txReadFuture = new AtomicReference<>();
196 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
197 final CountDownLatch txReadsDone = new CountDownLatch(1);
198 final Thread txThread = new Thread(() -> {
200 readWriteTx.write(TestModel.TEST_PATH,
201 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
203 txExistsFuture.set(readWriteTx.exists(TestModel.TEST_PATH));
205 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
206 } catch (Exception e) {
209 txReadsDone.countDown();
215 // Wait for the Tx operations to complete.
216 boolean done = Uninterruptibles.awaitUninterruptibly(txReadsDone, 5, TimeUnit.SECONDS);
217 if (caughtEx.get() != null) {
218 throw caughtEx.get();
221 assertTrue("Tx reads done", done);
223 // At this point the Tx operations should be waiting for the
224 // shard to initialize so
225 // trigger the latch to let the shard recovery to continue.
226 blockRecoveryLatch.countDown();
228 // Wait for the reads to complete and verify.
229 assertEquals("exists", Boolean.TRUE, txExistsFuture.get().get(5, TimeUnit.SECONDS));
230 assertTrue("read", txReadFuture.get().get(5, TimeUnit.SECONDS).isPresent());
236 @Test(expected = NotInitializedException.class)
237 @SuppressWarnings("checkstyle:IllegalCatch")
238 public void testTransactionCommitFailureWithShardNotInitialized() throws Exception {
239 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
240 final String testName = "testTransactionCommitFailureWithShardNotInitialized";
241 final String shardName = "test-1";
243 // Set the shard initialization timeout low for the test.
244 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
246 // Setup the InMemoryJournal to block shard recovery
248 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
249 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
250 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
252 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
254 final AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName);
256 // Create the write Tx
257 final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
258 assertNotNull("newReadWriteTransaction returned null", writeTx);
260 // Do some modifications and ready the Tx on a separate
262 final AtomicReference<DOMStoreThreePhaseCommitCohort> txCohort = new AtomicReference<>();
263 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
264 final CountDownLatch txReady = new CountDownLatch(1);
265 final Thread txThread = new Thread(() -> {
267 writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
269 txCohort.set(writeTx.ready());
270 } catch (Exception e) {
279 // Wait for the Tx operations to complete.
280 boolean done = Uninterruptibles.awaitUninterruptibly(txReady, 5, TimeUnit.SECONDS);
281 if (caughtEx.get() != null) {
282 throw caughtEx.get();
285 assertTrue("Tx ready", done);
287 // Wait for the commit to complete. Since the shard never
288 // initialized, the Tx should
289 // have timed out and throw an appropriate exception cause.
291 txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
292 fail("Expected NotInitializedException");
293 } catch (final Exception e) {
294 final Throwable root = Throwables.getRootCause(e);
295 Throwables.throwIfUnchecked(root);
296 throw new RuntimeException(root);
298 blockRecoveryLatch.countDown();
302 @Test(expected = NotInitializedException.class)
303 @SuppressWarnings("checkstyle:IllegalCatch")
304 public void testTransactionReadFailureWithShardNotInitialized() throws Exception {
305 final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
306 final String testName = "testTransactionReadFailureWithShardNotInitialized";
307 final String shardName = "test-1";
309 // Set the shard initialization timeout low for the test.
310 datastoreContextBuilder.shardInitializationTimeout(300, TimeUnit.MILLISECONDS);
312 // Setup the InMemoryJournal to block shard recovery
314 final String persistentID = String.format("member-1-shard-%s-%s", shardName, testName);
315 final CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
316 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
318 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
320 try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
322 // Create the read-write Tx
323 final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
324 assertNotNull("newReadWriteTransaction returned null", readWriteTx);
326 // Do a read on the Tx on a separate thread.
327 final AtomicReference<FluentFuture<Optional<NormalizedNode<?, ?>>>> txReadFuture = new AtomicReference<>();
328 final AtomicReference<Exception> caughtEx = new AtomicReference<>();
329 final CountDownLatch txReadDone = new CountDownLatch(1);
330 final Thread txThread = new Thread(() -> {
332 readWriteTx.write(TestModel.TEST_PATH,
333 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
335 txReadFuture.set(readWriteTx.read(TestModel.TEST_PATH));
338 } catch (Exception e) {
341 txReadDone.countDown();
347 // Wait for the Tx operations to complete.
348 boolean done = Uninterruptibles.awaitUninterruptibly(txReadDone, 5, TimeUnit.SECONDS);
349 if (caughtEx.get() != null) {
350 throw caughtEx.get();
353 assertTrue("Tx read done", done);
355 // Wait for the read to complete. Since the shard never
356 // initialized, the Tx should
357 // have timed out and throw an appropriate exception cause.
359 txReadFuture.get().get(5, TimeUnit.SECONDS);
360 } catch (ExecutionException e) {
361 assertTrue("Expected ReadFailedException cause: " + e.getCause(),
362 e.getCause() instanceof ReadFailedException);
363 final Throwable root = Throwables.getRootCause(e);
364 Throwables.throwIfUnchecked(root);
365 throw new RuntimeException(root);
367 blockRecoveryLatch.countDown();