2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.AddressFromURIString;
16 import akka.cluster.Cluster;
17 import akka.testkit.JavaTestKit;
18 import com.google.common.base.Optional;
19 import com.typesafe.config.ConfigFactory;
20 import java.math.BigInteger;
21 import java.util.concurrent.TimeUnit;
22 import org.junit.After;
23 import org.junit.Before;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
27 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
28 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
29 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
30 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
31 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
32 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
33 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
41 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
45 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
46 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
47 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
50 * End-to-end distributed data store tests that exercise remote shards and transactions.
52 * @author Thomas Pantelis
54 public class DistributedDataStoreRemotingIntegrationTest {
56 private static final String[] SHARD_NAMES = {"cars", "people"};
58 private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
59 private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
61 private static final String MODULE_SHARDS_CONFIG = "module-shards-member1-and-2.conf";
63 private ActorSystem leaderSystem;
64 private ActorSystem followerSystem;
66 private final DatastoreContext.Builder leaderDatastoreContextBuilder =
67 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
69 private final DatastoreContext.Builder followerDatastoreContextBuilder =
70 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
72 private DistributedDataStore followerDistributedDataStore;
73 private DistributedDataStore leaderDistributedDataStore;
74 private IntegrationTestKit followerTestKit;
75 private IntegrationTestKit leaderTestKit;
78 public void setUpClass() {
79 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
80 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
82 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
83 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
87 public void tearDownClass() {
88 JavaTestKit.shutdownActorSystem(leaderSystem);
89 JavaTestKit.shutdownActorSystem(followerSystem);
92 private void initDatastores(String type) {
93 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
95 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
96 followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
98 leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
100 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
103 private void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
104 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
105 assertEquals("isPresent", true, optional.isPresent());
107 CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME);
108 for(NormalizedNode<?, ?> entry: entries) {
109 listBuilder.withChild((MapEntryNode) entry);
112 assertEquals("Car list node", listBuilder.build(), optional.get());
115 private void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path, NormalizedNode<?, ?> expNode)
117 Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
118 assertEquals("isPresent", true, optional.isPresent());
119 assertEquals("Data node", expNode, optional.get());
122 private void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
123 Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
124 assertEquals("exists", true, exists);
128 public void testWriteTransactionWithSingleShard() throws Exception {
129 String testName = "testWriteTransactionWithSingleShard";
130 initDatastores(testName);
132 String followerCarShardName = "member-2-shard-cars-" + testName;
133 InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
135 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
136 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
138 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
139 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
141 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
142 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
143 writeTx.merge(car1Path, car1);
145 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
146 YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
147 writeTx.merge(car2Path, car2);
149 followerTestKit.doCommit(writeTx.ready());
151 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
153 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
157 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
159 writeTx.delete(car1Path);
161 followerTestKit.doCommit(writeTx.ready());
163 verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
165 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
167 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
169 // Re-instate the follower member 2 as a single-node to verify replication and recovery.
171 InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
173 JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
174 JavaTestKit.shutdownActorSystem(followerSystem, null, true);
176 ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
178 DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder).
179 setupDistributedDataStore(testName, "module-shards-member2", true, SHARD_NAMES);
181 verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
183 JavaTestKit.shutdownActorSystem(newSystem);
187 public void testReadWriteTransactionWithSingleShard() throws Exception {
188 initDatastores("testReadWriteTransactionWithSingleShard");
190 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
191 assertNotNull("newReadWriteTransaction returned null", rwTx);
193 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
194 rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
196 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
197 rwTx.merge(CarsModel.newCarPath("optima"), car1);
199 verifyCars(rwTx, car1);
201 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
202 YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
203 rwTx.merge(car2Path, car2);
205 verifyExists(rwTx, car2Path);
207 followerTestKit.doCommit(rwTx.ready());
209 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
213 public void testWriteTransactionWithMultipleShards() throws Exception {
214 initDatastores("testWriteTransactionWithMultipleShards");
216 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
217 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
219 YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
220 NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
221 writeTx.write(carsPath, carsNode);
223 YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
224 NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
225 writeTx.write(peoplePath, peopleNode);
227 followerTestKit.doCommit(writeTx.ready());
229 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
231 verifyNode(readTx, carsPath, carsNode);
232 verifyNode(readTx, peoplePath, peopleNode);
236 public void testReadWriteTransactionWithMultipleShards() throws Exception {
237 initDatastores("testReadWriteTransactionWithMultipleShards");
239 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
240 assertNotNull("newReadWriteTransaction returned null", rwTx);
242 YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
243 NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
244 rwTx.write(carsPath, carsNode);
246 YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
247 NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
248 rwTx.write(peoplePath, peopleNode);
250 followerTestKit.doCommit(rwTx.ready());
252 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
254 verifyNode(readTx, carsPath, carsNode);
255 verifyNode(readTx, peoplePath, peopleNode);
259 public void testTransactionChainWithSingleShard() throws Exception {
260 initDatastores("testTransactionChainWithSingleShard");
262 DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
264 // Add the top-level cars container with write-only.
266 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
267 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
269 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
273 // Verify the top-level cars container with read-only.
275 verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
277 // Perform car operations with read-write.
279 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
281 verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
283 rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
285 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
286 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
287 rwTx.write(car1Path, car1);
289 verifyExists(rwTx, car1Path);
291 verifyCars(rwTx, car1);
293 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
294 rwTx.merge(CarsModel.newCarPath("sportage"), car2);
296 rwTx.delete(car1Path);
298 followerTestKit.doCommit(rwTx.ready());
302 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
306 public void testTransactionChainWithMultipleShards() throws Exception{
307 initDatastores("testTransactionChainWithMultipleShards");
309 DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
311 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
312 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
314 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
315 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
317 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
318 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
320 followerTestKit.doCommit(writeTx.ready());
322 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
324 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
325 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
326 readWriteTx.write(carPath, car);
328 MapEntryNode person = PeopleModel.newPersonEntry("jack");
329 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
330 readWriteTx.merge(personPath, person);
332 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
333 assertEquals("isPresent", true, optional.isPresent());
334 assertEquals("Data node", car, optional.get());
336 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
337 assertEquals("isPresent", true, optional.isPresent());
338 assertEquals("Data node", person, optional.get());
340 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
342 writeTx = txChain.newWriteOnlyTransaction();
344 writeTx.delete(personPath);
346 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
348 followerTestKit.doCommit(cohort2);
349 followerTestKit.doCommit(cohort3);
353 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
354 verifyCars(readTx, car);
356 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
357 assertEquals("isPresent", false, optional.isPresent());
361 public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
362 String testName = "testSingleShardTransactionsWithLeaderChanges";
363 initDatastores(testName);
365 String followerCarShardName = "member-2-shard-cars-" + testName;
366 InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
368 // Write top-level car container from the follower so it uses a remote Tx.
370 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
372 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
373 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
375 followerTestKit.doCommit(writeTx.ready());
377 InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
379 // Switch the leader to the follower
381 followerDatastoreContextBuilder.shardElectionTimeoutFactor(1);
382 followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
384 JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
386 followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
388 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
389 Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
391 DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
392 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
393 IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
394 DistributedDataStore newMember1Datastore = newMember1TestKit.
395 setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
397 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
399 // Write a car entry to the new leader - should switch to local Tx
401 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
403 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
404 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
405 writeTx.merge(car1Path, car1);
407 followerTestKit.doCommit(writeTx.ready());
409 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
413 public void testReadyLocalTransactionForwardedToLeader() throws Exception {
414 initDatastores("testReadyLocalTransactionForwardedToLeader");
416 Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
417 assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
419 TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
420 dataTree.setSchemaContext(SchemaContextHelper.full());
421 DataTreeModification modification = dataTree.takeSnapshot().newModification();
423 new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
424 new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
426 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
427 new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification);
429 String transactionID = "tx-1";
430 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true);
432 carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
433 followerTestKit.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
435 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car);