1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorSystem;
4 import akka.event.Logging;
5 import akka.testkit.JavaTestKit;
6 import com.google.common.base.Optional;
7 import com.google.common.util.concurrent.ListenableFuture;
8 import junit.framework.Assert;
9 import org.apache.commons.io.FileUtils;
10 import org.junit.After;
11 import org.junit.Before;
12 import org.junit.Test;
13 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
14 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
15 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
16 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
17 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
18 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
19 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
21 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
22 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
25 import java.io.IOException;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
30 import static junit.framework.Assert.assertEquals;
31 import static junit.framework.Assert.assertTrue;
32 import static junit.framework.Assert.fail;
34 public class DistributedDataStoreIntegrationTest {
36 private static ActorSystem system;
39 public void setUp() throws IOException {
40 File journal = new File("journal");
42 if(journal.exists()) {
43 FileUtils.deleteDirectory(journal);
47 System.setProperty("shard.persistent", "false");
48 system = ActorSystem.create("test");
52 public void tearDown() {
53 JavaTestKit.shutdownActorSystem(system);
57 protected ActorSystem getSystem() {
62 public void integrationTest() throws Exception {
63 final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
64 ShardStrategyFactory.setConfiguration(configuration);
68 new JavaTestKit(getSystem()) {
71 new Within(duration("10 seconds")) {
72 protected void run() {
74 final DistributedDataStore distributedDataStore =
75 new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
77 distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
79 // Wait for a specific log message to show up
80 final boolean result =
81 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
83 protected Boolean run() {
86 }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
87 .message("Switching from state Candidate to Leader")
88 .occurrences(1).exec();
90 assertEquals(true, result);
92 DOMStoreReadWriteTransaction transaction =
93 distributedDataStore.newReadWriteTransaction();
96 .write(TestModel.TEST_PATH, ImmutableNodes
97 .containerNode(TestModel.TEST_QNAME));
99 ListenableFuture<Optional<NormalizedNode<?, ?>>>
101 transaction.read(TestModel.TEST_PATH);
103 Optional<NormalizedNode<?, ?>> optional =
106 Assert.assertTrue("Node not found", optional.isPresent());
108 NormalizedNode<?, ?> normalizedNode =
111 assertEquals(TestModel.TEST_QNAME,
112 normalizedNode.getNodeType());
114 DOMStoreThreePhaseCommitCohort ready =
117 ListenableFuture<Boolean> canCommit =
120 assertTrue(canCommit.get(5, TimeUnit.SECONDS));
122 ListenableFuture<Void> preCommit =
125 preCommit.get(5, TimeUnit.SECONDS);
127 ListenableFuture<Void> commit = ready.commit();
129 commit.get(5, TimeUnit.SECONDS);
130 } catch (ExecutionException | TimeoutException | InterruptedException e){
131 fail(e.getMessage());
141 //FIXME : Disabling test because it's flaky
143 public void integrationTestWithMultiShardConfiguration()
144 throws ExecutionException, InterruptedException, TimeoutException {
145 final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
147 ShardStrategyFactory.setConfiguration(configuration);
149 new JavaTestKit(getSystem()) {
152 new Within(duration("10 seconds")) {
153 protected void run() {
155 final DistributedDataStore distributedDataStore =
156 new DistributedDataStore(getSystem(), "config",
157 new MockClusterWrapper(), configuration);
159 distributedDataStore.onGlobalContextUpdated(
160 SchemaContextHelper.full());
162 // Wait for a specific log message to show up
163 final boolean result =
164 new JavaTestKit.EventFilter<Boolean>(
167 protected Boolean run() {
171 "akka://test/user/shardmanager-config/member-1-shard-cars-1-config")
173 "Switching from state Candidate to Leader")
180 DOMStoreReadWriteTransaction transaction =
181 distributedDataStore.newReadWriteTransaction();
183 transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
184 transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
186 DOMStoreThreePhaseCommitCohort ready = transaction.ready();
188 ListenableFuture<Boolean> canCommit = ready.canCommit();
190 assertTrue(canCommit.get(5, TimeUnit.SECONDS));
192 ListenableFuture<Void> preCommit = ready.preCommit();
194 preCommit.get(5, TimeUnit.SECONDS);
196 ListenableFuture<Void> commit = ready.commit();
198 commit.get(5, TimeUnit.SECONDS);
200 assertEquals(true, result);
201 } catch(ExecutionException | TimeoutException | InterruptedException e){
202 fail(e.getMessage());