2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.fail;
13 import static org.mockito.Matchers.any;
14 import static org.mockito.Matchers.eq;
15 import static org.mockito.Mockito.timeout;
16 import static org.mockito.Mockito.verify;
17 import akka.actor.ActorRef;
18 import akka.actor.ActorSystem;
19 import akka.actor.Address;
20 import akka.actor.AddressFromURIString;
21 import akka.cluster.Cluster;
22 import akka.testkit.JavaTestKit;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.ImmutableMap;
25 import com.google.common.util.concurrent.MoreExecutors;
26 import com.typesafe.config.ConfigFactory;
27 import java.math.BigInteger;
28 import java.util.concurrent.TimeUnit;
29 import org.junit.After;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.mockito.Mockito;
33 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
35 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
36 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
37 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
38 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
39 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
40 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
41 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
42 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
43 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
44 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
45 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
46 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
47 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
48 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
49 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
51 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
52 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
53 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
54 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
55 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
56 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
57 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
58 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
60 import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
61 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
62 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
63 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
64 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
67 * End-to-end distributed data store tests that exercise remote shards and transactions.
69 * @author Thomas Pantelis
71 public class DistributedDataStoreRemotingIntegrationTest {
73 private static final String[] SHARD_NAMES = {"cars", "people"};
75 private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
76 private static final Address MEMBER_2_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2559");
78 private static final String MODULE_SHARDS_CONFIG = "module-shards-member1-and-2.conf";
80 private ActorSystem leaderSystem;
81 private ActorSystem followerSystem;
83 private final DatastoreContext.Builder leaderDatastoreContextBuilder =
84 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1);
86 private final DatastoreContext.Builder followerDatastoreContextBuilder =
87 DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
89 private DistributedDataStore followerDistributedDataStore;
90 private DistributedDataStore leaderDistributedDataStore;
91 private IntegrationTestKit followerTestKit;
92 private IntegrationTestKit leaderTestKit;
95 public void setUpClass() {
96 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
97 Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
99 followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
100 Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
104 public void tearDownClass() {
105 JavaTestKit.shutdownActorSystem(leaderSystem);
106 JavaTestKit.shutdownActorSystem(followerSystem);
109 private void initDatastores(String type) {
110 leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
112 followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
113 followerDistributedDataStore = followerTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
115 leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
117 leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), SHARD_NAMES);
120 private void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
121 Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
122 assertEquals("isPresent", true, optional.isPresent());
124 CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME);
125 for(NormalizedNode<?, ?> entry: entries) {
126 listBuilder.withChild((MapEntryNode) entry);
129 assertEquals("Car list node", listBuilder.build(), optional.get());
132 private void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path, NormalizedNode<?, ?> expNode)
134 Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
135 assertEquals("isPresent", true, optional.isPresent());
136 assertEquals("Data node", expNode, optional.get());
139 private void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
140 Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
141 assertEquals("exists", true, exists);
145 public void testWriteTransactionWithSingleShard() throws Exception {
146 String testName = "testWriteTransactionWithSingleShard";
147 initDatastores(testName);
149 String followerCarShardName = "member-2-shard-cars-" + testName;
150 InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 2, ApplyJournalEntries.class );
152 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
153 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
155 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
156 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
158 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
159 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
160 writeTx.merge(car1Path, car1);
162 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
163 YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
164 writeTx.merge(car2Path, car2);
166 followerTestKit.doCommit(writeTx.ready());
168 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
170 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
174 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
176 writeTx.delete(car1Path);
178 followerTestKit.doCommit(writeTx.ready());
180 verifyExists(followerDistributedDataStore.newReadOnlyTransaction(), car2Path);
182 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
184 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car2);
186 // Re-instate the follower member 2 as a single-node to verify replication and recovery.
188 InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
190 JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
191 JavaTestKit.shutdownActorSystem(followerSystem, null, true);
193 ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
195 DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder).
196 setupDistributedDataStore(testName, "module-shards-member2", true, SHARD_NAMES);
198 verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
200 JavaTestKit.shutdownActorSystem(newSystem);
204 public void testReadWriteTransactionWithSingleShard() throws Exception {
205 initDatastores("testReadWriteTransactionWithSingleShard");
207 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
208 assertNotNull("newReadWriteTransaction returned null", rwTx);
210 rwTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
211 rwTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
213 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
214 rwTx.merge(CarsModel.newCarPath("optima"), car1);
216 verifyCars(rwTx, car1);
218 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
219 YangInstanceIdentifier car2Path = CarsModel.newCarPath("sportage");
220 rwTx.merge(car2Path, car2);
222 verifyExists(rwTx, car2Path);
224 followerTestKit.doCommit(rwTx.ready());
226 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1, car2);
230 public void testWriteTransactionWithMultipleShards() throws Exception {
231 initDatastores("testWriteTransactionWithMultipleShards");
233 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
234 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
236 YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
237 NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
238 writeTx.write(carsPath, carsNode);
240 YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
241 NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
242 writeTx.write(peoplePath, peopleNode);
244 followerTestKit.doCommit(writeTx.ready());
246 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
248 verifyNode(readTx, carsPath, carsNode);
249 verifyNode(readTx, peoplePath, peopleNode);
253 public void testReadWriteTransactionWithMultipleShards() throws Exception {
254 initDatastores("testReadWriteTransactionWithMultipleShards");
256 DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
257 assertNotNull("newReadWriteTransaction returned null", rwTx);
259 YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
260 NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
261 rwTx.write(carsPath, carsNode);
263 YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
264 NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
265 rwTx.write(peoplePath, peopleNode);
267 followerTestKit.doCommit(rwTx.ready());
269 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
271 verifyNode(readTx, carsPath, carsNode);
272 verifyNode(readTx, peoplePath, peopleNode);
276 public void testTransactionChainWithSingleShard() throws Exception {
277 initDatastores("testTransactionChainWithSingleShard");
279 DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
281 // Add the top-level cars container with write-only.
283 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
284 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
286 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
290 // Verify the top-level cars container with read-only.
292 verifyNode(txChain.newReadOnlyTransaction(), CarsModel.BASE_PATH, CarsModel.emptyContainer());
294 // Perform car operations with read-write.
296 DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
298 verifyNode(rwTx, CarsModel.BASE_PATH, CarsModel.emptyContainer());
300 rwTx.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
302 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
303 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
304 rwTx.write(car1Path, car1);
306 verifyExists(rwTx, car1Path);
308 verifyCars(rwTx, car1);
310 MapEntryNode car2 = CarsModel.newCarEntry("sportage", BigInteger.valueOf(25000));
311 rwTx.merge(CarsModel.newCarPath("sportage"), car2);
313 rwTx.delete(car1Path);
315 followerTestKit.doCommit(rwTx.ready());
319 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
323 public void testTransactionChainWithMultipleShards() throws Exception{
324 initDatastores("testTransactionChainWithMultipleShards");
326 DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
328 DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
329 assertNotNull("newWriteOnlyTransaction returned null", writeTx);
331 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
332 writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
334 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
335 writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
337 followerTestKit.doCommit(writeTx.ready());
339 DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
341 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
342 YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
343 readWriteTx.write(carPath, car);
345 MapEntryNode person = PeopleModel.newPersonEntry("jack");
346 YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
347 readWriteTx.merge(personPath, person);
349 Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
350 assertEquals("isPresent", true, optional.isPresent());
351 assertEquals("Data node", car, optional.get());
353 optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
354 assertEquals("isPresent", true, optional.isPresent());
355 assertEquals("Data node", person, optional.get());
357 DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
359 writeTx = txChain.newWriteOnlyTransaction();
361 writeTx.delete(personPath);
363 DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
365 followerTestKit.doCommit(cohort2);
366 followerTestKit.doCommit(cohort3);
370 DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
371 verifyCars(readTx, car);
373 optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
374 assertEquals("isPresent", false, optional.isPresent());
378 public void testChainedTransactionFailureWithSingleShard() throws Exception {
379 initDatastores("testChainedTransactionFailureWithSingleShard");
381 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
382 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
383 LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
384 MoreExecutors.directExecutor());
386 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
387 DOMTransactionChain txChain = broker.createTransactionChain(listener);
389 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
391 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
392 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
393 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
395 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
398 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
399 fail("Expected TransactionCommitFailedException");
400 } catch (TransactionCommitFailedException e) {
404 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
411 public void testChainedTransactionFailureWithMultipleShards() throws Exception {
412 initDatastores("testChainedTransactionFailureWithMultipleShards");
414 ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
415 ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
416 LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
417 MoreExecutors.directExecutor());
419 TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
420 DOMTransactionChain txChain = broker.createTransactionChain(listener);
422 DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
424 writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
426 ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
427 new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)).
428 withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
430 // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
431 // done for put for performance reasons.
432 writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
435 writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
436 fail("Expected TransactionCommitFailedException");
437 } catch (TransactionCommitFailedException e) {
441 verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
448 public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
449 String testName = "testSingleShardTransactionsWithLeaderChanges";
450 initDatastores(testName);
452 String followerCarShardName = "member-2-shard-cars-" + testName;
453 InMemoryJournal.addWriteMessagesCompleteLatch(followerCarShardName, 1, ApplyJournalEntries.class );
455 // Write top-level car container from the follower so it uses a remote Tx.
457 DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
459 writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
460 writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
462 followerTestKit.doCommit(writeTx.ready());
464 InMemoryJournal.waitForWriteMessagesComplete(followerCarShardName);
466 // Switch the leader to the follower
468 followerDatastoreContextBuilder.shardElectionTimeoutFactor(1);
469 followerDistributedDataStore.onDatastoreContextUpdated(followerDatastoreContextBuilder.build());
471 JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
473 followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
475 leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
476 Cluster.get(leaderSystem).join(MEMBER_2_ADDRESS);
478 DatastoreContext.Builder newMember1Builder = DatastoreContext.newBuilder().
479 shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
480 IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
481 DistributedDataStore newMember1Datastore = newMember1TestKit.
482 setupDistributedDataStore(testName, MODULE_SHARDS_CONFIG, false, SHARD_NAMES);
484 followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), SHARD_NAMES);
486 // Write a car entry to the new leader - should switch to local Tx
488 writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
490 MapEntryNode car1 = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
491 YangInstanceIdentifier car1Path = CarsModel.newCarPath("optima");
492 writeTx.merge(car1Path, car1);
494 followerTestKit.doCommit(writeTx.ready());
496 verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car1);
500 public void testReadyLocalTransactionForwardedToLeader() throws Exception {
501 initDatastores("testReadyLocalTransactionForwardedToLeader");
503 Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext().findLocalShard("cars");
504 assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
506 TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
507 dataTree.setSchemaContext(SchemaContextHelper.full());
508 DataTreeModification modification = dataTree.takeSnapshot().newModification();
510 new WriteModification(CarsModel.BASE_PATH, CarsModel.emptyContainer()).apply(modification);
511 new MergeModification(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()).apply(modification);
513 MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
514 new WriteModification(CarsModel.newCarPath("optima"), car).apply(modification);
516 String transactionID = "tx-1";
517 ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(transactionID , modification, true);
519 carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
520 followerTestKit.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
522 verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car);