1 package org.opendaylight.controller.cluster.datastore;
3 import akka.actor.ActorSystem;
4 import akka.event.Logging;
5 import akka.testkit.JavaTestKit;
7 import com.google.common.base.Optional;
8 import com.google.common.util.concurrent.ListenableFuture;
10 import junit.framework.Assert;
12 import org.apache.commons.io.FileUtils;
13 import org.junit.After;
14 import org.junit.Before;
15 import org.junit.Test;
16 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
17 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
18 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
19 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
20 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
21 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
24 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
25 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
28 import java.io.IOException;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.TimeUnit;
31 import java.util.concurrent.TimeoutException;
33 import static junit.framework.Assert.assertEquals;
34 import static junit.framework.Assert.assertTrue;
35 import static junit.framework.Assert.fail;
37 public class DistributedDataStoreIntegrationTest {
39 private static ActorSystem system;
// Test fixture setup: deletes the "journal" directory if it exists, sets the
// "shard.persistent" system property to "false", and creates the shared
// ActorSystem named "test" that is stored in the static `system` field.
// NOTE(review): this listing elides lines (the embedded numbers jump), so the
// annotation on this method and its closing braces are not visible here. An
// org.junit.Before import is present - confirm this method carries @Before.
42 public void setUp() throws IOException {
43 File journal = new File("journal");
// Remove the directory only when it is already present on disk.
45 if(journal.exists()) {
46 FileUtils.deleteDirectory(journal);
// NOTE(review): presumably turns off shard persistence for the test run -
// confirm where the "shard.persistent" property is read.
50 System.setProperty("shard.persistent", "false");
51 system = ActorSystem.create("test");
// Test fixture teardown: shuts down the shared ActorSystem via JavaTestKit.
// NOTE(review): an org.junit.After import is present, but the annotation line
// is not visible in this listing - confirm this method carries @After.
55 public void tearDown() {
56 JavaTestKit.shutdownActorSystem(system);
// Accessor the tests call to obtain the ActorSystem they run against.
// NOTE(review): the body is elided in this listing; presumably it returns the
// static `system` field created in setUp() - confirm.
60 protected ActorSystem getSystem() {
// End-to-end test against a DistributedDataStore named "config": waits for the
// "test" shard to log that it became Leader, writes a container node at
// TestModel.TEST_PATH in a read-write transaction, reads it back, and then
// drives the transaction through canCommit / preCommit / commit.
// NOTE(review): several lines of this method are elided in this listing
// (the embedded numbers jump), so some statements below appear only in part.
65 public void integrationTest() throws Exception {
// The shard layout comes from the two configuration resources named here and
// is handed to ShardStrategyFactory before the data store is created.
66 final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
67 ShardStrategyFactory.setConfiguration(configuration);
71 new JavaTestKit(getSystem()) {
// The body below runs inside a Within block given a "10 seconds" duration.
74 new Within(duration("10 seconds")) {
76 protected void run() {
78 final DistributedDataStore distributedDataStore =
79 new DistributedDataStore(getSystem(), "config",
80 new MockClusterWrapper(), configuration,
81 new DistributedDataStoreProperties());
// Hand the test schema context to the data store.
83 distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
85 // Wait for a specific log message to show up
// The filter targets Info-level events from the shard actor at the path below
// carrying the Candidate-to-Leader message, expecting exactly one occurrence.
86 final boolean result =
87 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
90 protected Boolean run() {
93 }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
94 .message("Switching from state Candidate to Leader")
95 .occurrences(1).exec();
97 assertEquals(true, result);
// Open a read-write transaction and write an empty container at TEST_PATH.
99 DOMStoreReadWriteTransaction transaction =
100 distributedDataStore.newReadWriteTransaction();
103 .write(TestModel.TEST_PATH, ImmutableNodes
104 .containerNode(TestModel.TEST_QNAME));
// Read the same path back through the same transaction.
106 ListenableFuture<Optional<NormalizedNode<?, ?>>>
108 transaction.read(TestModel.TEST_PATH);
110 Optional<NormalizedNode<?, ?>> optional =
113 Assert.assertTrue("Node not found", optional.isPresent());
115 NormalizedNode<?, ?> normalizedNode =
// The node read back must have the QName that was written.
118 assertEquals(TestModel.TEST_QNAME,
119 normalizedNode.getNodeType());
// Three-phase commit; each phase's future is awaited for at most 5 seconds.
// NOTE(review): the right-hand sides of the next three assignments are elided
// in this listing - presumably transaction.ready(), ready.canCommit() and
// ready.preCommit(), as in the multi-shard test - confirm.
121 DOMStoreThreePhaseCommitCohort ready =
124 ListenableFuture<Boolean> canCommit =
127 assertTrue(canCommit.get(5, TimeUnit.SECONDS));
129 ListenableFuture<Void> preCommit =
132 preCommit.get(5, TimeUnit.SECONDS);
134 ListenableFuture<Void> commit = ready.commit();
136 commit.get(5, TimeUnit.SECONDS);
// NOTE(review): failing with only e.getMessage() drops the exception type and
// stack trace (and the message may be null); consider preserving the cause.
137 } catch (ExecutionException | TimeoutException | InterruptedException e){
138 fail(e.getMessage());
148 //FIXME : Disabling test because it's flaky
// Multi-shard variant of the integration test: loads the full schema context,
// waits for the "cars" shard to log that it became Leader, writes empty
// containers at both CarsModel.BASE_PATH and PeopleModel.BASE_PATH in one
// read-write transaction, and drives it through canCommit / preCommit / commit.
// NOTE(review): lines are elided in this listing (the embedded numbers jump)
// and the method continues past the last visible line, so the annotation
// state and the closing braces are not visible here.
150 public void integrationTestWithMultiShardConfiguration()
151 throws ExecutionException, InterruptedException, TimeoutException {
152 final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
154 ShardStrategyFactory.setConfiguration(configuration);
156 new JavaTestKit(getSystem()) {
// The body below runs inside a Within block given a "10 seconds" duration.
159 new Within(duration("10 seconds")) {
161 protected void run() {
// NOTE(review): the last constructor argument is null here, whereas
// integrationTest() passes new DistributedDataStoreProperties() - confirm
// that null is accepted by the constructor.
163 final DistributedDataStore distributedDataStore =
164 new DistributedDataStore(getSystem(), "config",
165 new MockClusterWrapper(), configuration, null);
167 distributedDataStore.onGlobalContextUpdated(
168 SchemaContextHelper.full());
170 // Wait for a specific log message to show up
// Only the "cars" shard actor path appears in the visible filter lines.
171 final boolean result =
172 new JavaTestKit.EventFilter<Boolean>(
176 protected Boolean run() {
180 "akka://test/user/shardmanager-config/member-1-shard-cars-1-config")
182 "Switching from state Candidate to Leader")
// One transaction writes to two different model base paths.
189 DOMStoreReadWriteTransaction transaction =
190 distributedDataStore.newReadWriteTransaction();
192 transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
193 transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
// Three-phase commit; each phase's future is awaited for at most 5 seconds.
195 DOMStoreThreePhaseCommitCohort ready = transaction.ready();
197 ListenableFuture<Boolean> canCommit = ready.canCommit();
199 assertTrue(canCommit.get(5, TimeUnit.SECONDS));
201 ListenableFuture<Void> preCommit = ready.preCommit();
203 preCommit.get(5, TimeUnit.SECONDS);
205 ListenableFuture<Void> commit = ready.commit();
207 commit.get(5, TimeUnit.SECONDS);
// NOTE(review): the event-filter result is only asserted here, after the
// commit, whereas integrationTest() asserts it right after the filter runs.
209 assertEquals(true, result);
// NOTE(review): failing with only e.getMessage() drops the exception type and
// stack trace (and the message may be null); consider preserving the cause.
210 } catch(ExecutionException | TimeoutException | InterruptedException e){
211 fail(e.getMessage());