/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.AddressFromURIString;
16 import akka.cluster.Cluster;
17 import akka.testkit.JavaTestKit;
18 import com.google.common.base.Optional;
19 import com.typesafe.config.ConfigFactory;
20 import java.math.BigInteger;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Before;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
27 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
28 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
30 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
31 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
32 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
33 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
47 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
/**
 * End-to-end distributed data store tests that exercise remote shards and transactions.
 *
 * @author Thomas Pantelis
 */
54 public class DistributedDataStoreRemotingIntegrationTest {
56 private static final String[] SHARD_NAMES = {"cars", "people"};
58 private ActorSystem leaderSystem;
59 private ActorSystem followerSystem;
61 private final DatastoreContext.Builder leaderDatastoreContextBuilder =
62 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
64 private final DatastoreContext.Builder followerDatastoreContextBuilder =
65 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(200);
67 private DistributedDataStore followerDistributedDataStore;
68 private DistributedDataStore leaderDistributedDataStore;
69 private IntegrationTestKit followerTestKit;
70 private IntegrationTestKit leaderTestKit;
73 public void setUpClass() {
74 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
75 Address member1Address = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
76 Cluster.get(leaderSystem).join(member1Address);
78 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
79 Cluster.get(followerSystem).join(member1Address);
83 public void tearDownClass() {
84 JavaTestKit.shutdownActorSystem(leaderSystem);
85 JavaTestKit.shutdownActorSystem(followerSystem);
88 private void initDatastores(String type) {
89 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
91 String moduleShardsConfig = "module-shards-member1-and-2.conf";
93 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
94 followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, SHARD_NAMES);
96 leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, true, SHARD_NAMES);
98 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
101 private void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
102 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
103 assertEquals("isPresent", true, optional.isPresent());
105 CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME);
106 for(NormalizedNode<?, ?> entry: entries) {
107 listBuilder.withChild((MapEntryNode) entry);
110 assertEquals("Car list node", listBuilder.build(), optional.get());
113 private void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path, NormalizedNode<?, ?> expNode)
115 Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
116 assertEquals("isPresent", true, optional.isPresent());
117 assertEquals("Data node", expNode, optional.get());
120 private void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
121 Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
122 assertEquals("exists", true, exists);
126 public void testWriteTransactionWithSingleShard() throws Exception {
127 String testName = "testWriteTransactionWithSingleShard";
128 initDatastores(testName);
130 String followerCarShardName = "member-2-shard-cars-" + testName;
131 InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
133 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
134 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
136 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
137 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
139 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
140 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
141 writeTx.merge(car1Path, car1);
143 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
144 YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
145 writeTx.merge(car2Path, car2);
147 followerTestKit.doCommit(writeTx.ready());
149 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
151 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
155 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
157 writeTx.delete(car1Path);
159 followerTestKit.doCommit(writeTx.ready());
161 verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
163 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
165 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
167 // Re-instate the follower member 2 as a single-node to verify replication and recovery.
169 InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
171 JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
172 JavaTestKit.shutdownActorSystem(followerSystem, null, true);
174 ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
176 DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder).
177 setupDistributedDataStore(testName, "module-shards-member2", true, SHARD_NAMES);
179 verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
181 JavaTestKit.shutdownActorSystem(newSystem);
185 public void testReadWriteTransactionWithSingleShard() throws Exception {
186 initDatastores("testReadWriteTransactionWithSingleShard");
188 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
189 assertNotNull("newReadWriteTransaction returned null", rwTx);
191 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
192 rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
194 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
195 rwTx.merge(CarsModel.newCarPath("optima"), car1);
197 verifyCars(rwTx, car1);
199 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
200 YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
201 rwTx.merge(car2Path, car2);
203 verifyExists(rwTx, car2Path);
205 followerTestKit.doCommit(rwTx.ready());
207 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
211 public void testWriteTransactionWithMultipleShards() throws Exception {
212 initDatastores("testWriteTransactionWithMultipleShards");
214 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
215 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
217 YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
218 NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
219 writeTx.write(carsPath, carsNode);
221 YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
222 NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
223 writeTx.write(peoplePath, peopleNode);
225 followerTestKit.doCommit(writeTx.ready());
227 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
229 verifyNode(readTx, carsPath, carsNode);
230 verifyNode(readTx, peoplePath, peopleNode);
234 public void testReadWriteTransactionWithMultipleShards() throws Exception {
235 initDatastores("testReadWriteTransactionWithMultipleShards");
237 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
238 assertNotNull("newReadWriteTransaction returned null", rwTx);
240 YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
241 NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
242 rwTx.write(carsPath, carsNode);
244 YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
245 NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
246 rwTx.write(peoplePath, peopleNode);
248 followerTestKit.doCommit(rwTx.ready());
250 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
252 verifyNode(readTx, carsPath, carsNode);
253 verifyNode(readTx, peoplePath, peopleNode);
257 public void testTransactionChainWithSingleShard() throws Exception {
258 initDatastores("testTransactionChainWithSingleShard");
260 DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
262 // Add the top-level cars container with write-only.
264 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
265 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
267 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
271 // Verify the top-level cars container with read-only.
273 verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
275 // Perform car operations with read-write.
277 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
279 verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
281 rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
283 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
284 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
285 rwTx.write(car1Path, car1);
287 verifyExists(rwTx, car1Path);
289 verifyCars(rwTx, car1);
291 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
292 rwTx.merge(CarsModel.newCarPath("sportage"), car2);
294 rwTx.delete(car1Path);
296 followerTestKit.doCommit(rwTx.ready());
300 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
304 public void testTransactionChainWithMultipleShards() throws Exception{
305 initDatastores("testTransactionChainWithMultipleShards");
307 DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
309 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
310 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
312 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
313 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
315 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
316 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
318 followerTestKit.doCommit(writeTx.ready());
320 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
322 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
323 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
324 readWriteTx.write(carPath, car);
326 MapEntryNode person = PeopleModel.newPersonEntry("jack");
327 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
328 readWriteTx.merge(personPath, person);
330 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
331 assertEquals("isPresent", true, optional.isPresent());
332 assertEquals("Data node", car, optional.get());
334 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
335 assertEquals("isPresent", true, optional.isPresent());
336 assertEquals("Data node", person, optional.get());
338 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
340 writeTx = txChain.newWriteOnlyTransaction();
342 writeTx.delete(personPath);
344 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
346 followerTestKit.doCommit(cohort2);
347 followerTestKit.doCommit(cohort3);
351 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
352 verifyCars(readTx, car);
354 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
355 assertEquals("isPresent", false, optional.isPresent());
359 public void testReadyLocalTransactionForwardedToLeader() throws Exception {
360 initDatastores("testReadyLocalTransactionForwardedToLeader");
362 Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
363 assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
365 TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
366 dataTree.setSchemaContext(SchemaContextHelper.full());
367 DataTreeModification modification = dataTree.takeSnapshot().newModification();
369 new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
370 new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
372 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
373 new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification);
375 String transactionID = "tx-1";
376 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true);
378 carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
379 followerTestKit.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
381 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car);