Revert "Revert "BUG-1425: Integrated new Binding to Normalized Node codec for write...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / DistributedDataStoreIntegrationTest.java
1 package org.opendaylight.controller.cluster.datastore;
2
3 import akka.actor.ActorSystem;
4 import akka.event.Logging;
5 import akka.testkit.JavaTestKit;
6 import com.google.common.base.Optional;
7 import com.google.common.util.concurrent.ListenableFuture;
8 import junit.framework.Assert;
9 import org.apache.commons.io.FileUtils;
10 import org.junit.After;
11 import org.junit.Before;
12 import org.junit.Test;
13 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
14 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
15 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
16 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
17 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
18 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
19 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
20 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
21 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
22 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
23
24 import java.io.File;
25 import java.io.IOException;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29
30 import static junit.framework.Assert.assertEquals;
31 import static junit.framework.Assert.assertTrue;
32 import static junit.framework.Assert.fail;
33
34 public class DistributedDataStoreIntegrationTest {
35
36     private static ActorSystem system;
37
38     @Before
39     public void setUp() throws IOException {
40         File journal = new File("journal");
41
42         if(journal.exists()) {
43             FileUtils.deleteDirectory(journal);
44         }
45
46
47         System.setProperty("shard.persistent", "false");
48         system = ActorSystem.create("test");
49     }
50
51     @After
52     public void tearDown() {
53         JavaTestKit.shutdownActorSystem(system);
54         system = null;
55     }
56
57     protected ActorSystem getSystem() {
58         return system;
59     }
60
61     @Test
62     public void integrationTest() throws Exception {
63         final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
64         ShardStrategyFactory.setConfiguration(configuration);
65
66
67
68         new JavaTestKit(getSystem()) {
69             {
70
71                 new Within(duration("10 seconds")) {
72                     protected void run() {
73                         try {
74                             final DistributedDataStore distributedDataStore =
75                                 new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration);
76
77                             distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
78
79                             // Wait for a specific log message to show up
80                             final boolean result =
81                                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
82                                     ) {
83                                     protected Boolean run() {
84                                         return true;
85                                     }
86                                 }.from("akka://test/user/shardmanager-config/member-1-shard-test-1-config")
87                                     .message("Switching from state Candidate to Leader")
88                                     .occurrences(1).exec();
89
90                             assertEquals(true, result);
91
92                             DOMStoreReadWriteTransaction transaction =
93                                 distributedDataStore.newReadWriteTransaction();
94
95                             transaction
96                                 .write(TestModel.TEST_PATH, ImmutableNodes
97                                     .containerNode(TestModel.TEST_QNAME));
98
99                             ListenableFuture<Optional<NormalizedNode<?, ?>>>
100                                 future =
101                                 transaction.read(TestModel.TEST_PATH);
102
103                             Optional<NormalizedNode<?, ?>> optional =
104                                 future.get();
105
106                             Assert.assertTrue("Node not found", optional.isPresent());
107
108                             NormalizedNode<?, ?> normalizedNode =
109                                 optional.get();
110
111                             assertEquals(TestModel.TEST_QNAME,
112                                 normalizedNode.getNodeType());
113
114                             DOMStoreThreePhaseCommitCohort ready =
115                                 transaction.ready();
116
117                             ListenableFuture<Boolean> canCommit =
118                                 ready.canCommit();
119
120                             assertTrue(canCommit.get(5, TimeUnit.SECONDS));
121
122                             ListenableFuture<Void> preCommit =
123                                 ready.preCommit();
124
125                             preCommit.get(5, TimeUnit.SECONDS);
126
127                             ListenableFuture<Void> commit = ready.commit();
128
129                             commit.get(5, TimeUnit.SECONDS);
130                         } catch (ExecutionException | TimeoutException | InterruptedException e){
131                             fail(e.getMessage());
132                         }
133                     }
134                 };
135             }
136         };
137
138     }
139
140
141     //FIXME : Disabling test because it's flaky
142     //@Test
143     public void integrationTestWithMultiShardConfiguration()
144         throws ExecutionException, InterruptedException, TimeoutException {
145         final Configuration configuration = new ConfigurationImpl("module-shards.conf", "modules.conf");
146
147         ShardStrategyFactory.setConfiguration(configuration);
148
149         new JavaTestKit(getSystem()) {
150             {
151
152                 new Within(duration("10 seconds")) {
153                     protected void run() {
154                         try {
155                             final DistributedDataStore distributedDataStore =
156                                 new DistributedDataStore(getSystem(), "config",
157                                     new MockClusterWrapper(), configuration);
158
159                             distributedDataStore.onGlobalContextUpdated(
160                                 SchemaContextHelper.full());
161
162                             // Wait for a specific log message to show up
163                             final boolean result =
164                                 new JavaTestKit.EventFilter<Boolean>(
165                                     Logging.Info.class
166                                 ) {
167                                     protected Boolean run() {
168                                         return true;
169                                     }
170                                 }.from(
171                                     "akka://test/user/shardmanager-config/member-1-shard-cars-1-config")
172                                     .message(
173                                         "Switching from state Candidate to Leader")
174                                     .occurrences(1)
175                                     .exec();
176
177                             Thread.sleep(1000);
178
179
180                             DOMStoreReadWriteTransaction transaction =
181                                 distributedDataStore.newReadWriteTransaction();
182
183                             transaction.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
184                             transaction.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
185
186                             DOMStoreThreePhaseCommitCohort ready = transaction.ready();
187
188                             ListenableFuture<Boolean> canCommit = ready.canCommit();
189
190                             assertTrue(canCommit.get(5, TimeUnit.SECONDS));
191
192                             ListenableFuture<Void> preCommit = ready.preCommit();
193
194                             preCommit.get(5, TimeUnit.SECONDS);
195
196                             ListenableFuture<Void> commit = ready.commit();
197
198                             commit.get(5, TimeUnit.SECONDS);
199
200                             assertEquals(true, result);
201                         } catch(ExecutionException | TimeoutException | InterruptedException e){
202                             fail(e.getMessage());
203                         }
204                     }
205                 };
206             }
207         };
208
209
210     }
211
212 }