X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=526b1d47e11d2c2f1123f6597261392fed053691;hb=a6af137c30470b86d4bc624d4c48cb686495a182;hp=9b97ee783c553a5075dc807a69a34ef2747ff5bb;hpb=898b12df2bde612b152742677bc9739b277ab047;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index 9b97ee783c..526b1d47e1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * Copyright (c) 2015, 2017 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -24,8 +24,7 @@ import akka.actor.AddressFromURIString; import akka.cluster.Cluster; import akka.dispatch.Futures; import akka.pattern.Patterns; -import akka.testkit.JavaTestKit; -import com.google.common.base.Optional; +import akka.testkit.javadsl.TestKit; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Throwables; @@ -40,6 +39,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -53,6 +53,7 @@ import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; import org.mockito.Mockito; import org.mockito.stubbing.Answer; +import org.opendaylight.controller.cluster.access.client.RequestTimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore; import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; @@ -68,6 +69,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot; import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState; +import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; @@ -78,26 +80,24 @@ import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; -import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction; -import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; +import org.opendaylight.mdsal.common.api.TransactionChainListener; +import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction; +import org.opendaylight.mdsal.dom.api.DOMTransactionChain; +import org.opendaylight.mdsal.dom.spi.store.DOMStore; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain; +import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; -import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; @@ -117,7 +117,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Parameters(name = "{0}") public static Collection data() { return Arrays.asList(new Object[][] { - { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 60 } + { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 } }); } @@ -137,6 +137,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf"; private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf"; private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf"; + private static final String MODULE_SHARDS_CARS_1_2_3 = "module-shards-cars-member-1-and-2-and-3.conf"; private ActorSystem leaderSystem; private ActorSystem followerSystem; @@ -180,9 +181,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderDistributedDataStore.close(); } - JavaTestKit.shutdownActorSystem(leaderSystem); - JavaTestKit.shutdownActorSystem(followerSystem); - JavaTestKit.shutdownActorSystem(follower2System); + TestKit.shutdownActorSystem(leaderSystem); + TestKit.shutdownActorSystem(followerSystem); + TestKit.shutdownActorSystem(follower2System); InMemoryJournal.clear(); InMemorySnapshotStore.clear(); @@ -208,6 +209,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { testParameter, type, moduleShardsConfig, false, shards); leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards); + + leaderTestKit.waitForMembersUp("member-2"); + followerTestKit.waitForMembersUp("member-1"); } private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries) @@ -311,8 +315,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); } - JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE); - JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE); + TestKit.shutdownActorSystem(leaderSystem, Boolean.TRUE); + TestKit.shutdownActorSystem(followerSystem, Boolean.TRUE); final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2"); @@ -511,7 +515,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); - final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME)) @@ -520,9 +524,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); try { - writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + writeTx.commit().get(5, TimeUnit.SECONDS); fail("Expected TransactionCommitFailedException"); - } catch (final TransactionCommitFailedException e) { + } catch (final ExecutionException e) { // Expected } @@ -544,7 +548,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class); final DOMTransactionChain txChain = broker.createTransactionChain(listener); - final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); + final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction(); writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer()); @@ -557,9 +561,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData); try { - writeTx.submit().checkedGet(5, TimeUnit.SECONDS); + writeTx.commit().get(5, TimeUnit.SECONDS); fail("Expected TransactionCommitFailedException"); - } catch (final TransactionCommitFailedException e) { + } catch (final ExecutionException e) { // Expected } @@ -571,6 +575,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testSingleShardTransactionsWithLeaderChanges() throws Exception { + followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2); final String testName = "testSingleShardTransactionsWithLeaderChanges"; initDatastoresWithCars(testName); @@ -593,7 +598,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder .shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + TestKit.shutdownActorSystem(leaderSystem, true); Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS); @@ -605,7 +610,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout); - try (final AbstractDataStore ds = + try (AbstractDataStore ds = newMember1TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) { @@ -631,12 +636,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader"); followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); - final Optional carsFollowerShard = followerDistributedDataStore.getActorContext() - .findLocalShard("cars"); + final com.google.common.base.Optional carsFollowerShard = + followerDistributedDataStore.getActorContext().findLocalShard("cars"); assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent()); - final TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL); - dataTree.setSchemaContext(SchemaContextHelper.full()); + final DataTree dataTree = new InMemoryDataTreeFactory().create( + DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full()); // Send a tx with immediate commit. @@ -648,7 +653,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification); modification.ready(); - ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true); + ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, + java.util.Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -667,7 +673,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification); modification.ready(); - readyLocal = new ReadyLocalTransaction(tx2 , modification, false); + readyLocal = new ReadyLocalTransaction(tx2 , modification, false, java.util.Optional.empty()); carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -698,8 +704,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader"); followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars"); - final Optional carsFollowerShard = followerDistributedDataStore.getActorContext() - .findLocalShard("cars"); + final com.google.common.base.Optional carsFollowerShard = + followerDistributedDataStore.getActorContext().findLocalShard("cars"); assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent()); carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef()); @@ -716,7 +722,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true); + Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true, + java.util.Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); Object resp = followerTestKit.expectMsgClass(Object.class); @@ -736,7 +743,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction( - Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false); + Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false, + java.util.Optional.empty()); carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef()); resp = followerTestKit.expectMsgClass(Object.class); @@ -779,10 +787,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Wait for the commit to be replicated to the follower. MemberNode.verifyRaftState(followerDistributedDataStore, "cars", - raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied())); + raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied())); MemberNode.verifyRaftState(followerDistributedDataStore, "people", - raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied())); + raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied())); // Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in // the leader shard. @@ -842,7 +850,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()) .shardElectionTimeoutFactor(10)); - Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); leaderTestKit.waitUntilNoLeader(leaderDistributedDataStore.getActorContext(), "cars"); // Submit all tx's - the messages should get queued for retry. @@ -857,6 +864,10 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder .customRaftPolicyImplementation(null).shardElectionTimeoutFactor(1)); + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "cars") + .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); + IntegrationTestKit.findLocalShard(followerDistributedDataStore.getActorContext(), "people") + .tell(TimeoutNow.INSTANCE, ActorRef.noSender()); followerTestKit.doCommit(writeTx1CanCommit, writeTx1Cohort); followerTestKit.doCommit(writeTx2CanCommit, writeTx2Cohort); @@ -881,9 +892,12 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(100), commitTimeout); - try (final AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( + try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) { + followerTestKit.waitForMembersUp("member-3"); + follower2TestKit.waitForMembersUp("member-1", "member-2"); + // Create and submit a couple tx's so they're pending. DOMStoreWriteTransaction writeTx = followerDistributedDataStore.newWriteOnlyTransaction(); @@ -988,17 +1002,18 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { @Test public void testTransactionWithShardLeaderNotResponding() throws Exception { + followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2); followerDatastoreContextBuilder.shardElectionTimeoutFactor(50); initDatastoresWithCars("testTransactionWithShardLeaderNotResponding"); // Do an initial read to get the primary shard info cached. final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); - readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); // Shutdown the leader and try to create a new tx. - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + TestKit.shutdownActorSystem(leaderSystem, true); followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1); sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder); @@ -1012,23 +1027,28 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { fail("Exception expected"); } catch (final ExecutionException e) { final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); - assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException - || e.getCause() instanceof ShardLeaderNotRespondingException); + if (DistributedDataStore.class.equals(testParameter)) { + assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException + || e.getCause() instanceof ShardLeaderNotRespondingException); + } else { + assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); + } } } @Test public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception { + followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2); initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader"); // Do an initial read to get the primary shard info cached. final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); - readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); // Shutdown the leader and try to create a new tx. - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + TestKit.shutdownActorSystem(leaderSystem, true); Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); @@ -1045,25 +1065,29 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerTestKit.doCommit(rwTx.ready()); fail("Exception expected"); } catch (final ExecutionException e) { - final String msg = "Expected instance of NoShardLeaderException, actual: \n" - + Throwables.getStackTraceAsString(e.getCause()); - assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); + final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause()); + if (DistributedDataStore.class.equals(testParameter)) { + assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException); + } else { + assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException); + } } } @Test public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception { + followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2); String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx"; - initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS); + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder() - .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5); + .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10); final IntegrationTestKit follower2TestKit = new IntegrationTestKit( follower2System, follower2DatastoreContextBuilder, commitTimeout); - try (final AbstractDataStore ds = + try (AbstractDataStore ds = follower2TestKit.setupAbstractDataStore( - testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) { + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) { followerTestKit.waitForMembersUp("member-1", "member-3"); follower2TestKit.waitForMembersUp("member-1", "member-2"); @@ -1071,16 +1095,16 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Do an initial read to get the primary shard info cached. final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction(); - readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); // Shutdown the leader and try to create a new tx. - JavaTestKit.shutdownActorSystem(leaderSystem, null, true); + TestKit.shutdownActorSystem(leaderSystem, true); Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS); sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder - .operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null)); + .operationTimeoutInMillis(500).shardElectionTimeoutFactor(5).customRaftPolicyImplementation(null)); final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); @@ -1099,8 +1123,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { // Setup a saved snapshot on the leader. The follower will startup with no data and the leader should // install a snapshot to sync the follower. - TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION); - tree.setSchemaContext(SchemaContextHelper.full()); + DataTree tree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_CONFIGURATION, + SchemaContextHelper.full()); final ContainerNode carsNode = CarsModel.newCarsNode( CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000)))); @@ -1118,7 +1142,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initDatastoresWithCars(testName); final Optional> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read( - CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS); + CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS); assertEquals("isPresent", true, readOptional.isPresent()); assertEquals("Node", carsNode, readOptional.get()); @@ -1129,6 +1153,23 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { initialSnapshot, snapshotRoot); } + @Test + public void testReadWriteMessageSlicing() throws Exception { + // The slicing is only implemented for tell-based protocol + Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class)); + + leaderDatastoreContextBuilder.maximumMessageSliceSize(100); + followerDatastoreContextBuilder.maximumMessageSliceSize(100); + initDatastoresWithCars("testLargeReadReplySlicing"); + + final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction(); + + final NormalizedNode carsNode = CarsModel.create(); + rwTx.write(CarsModel.BASE_PATH, carsNode); + + verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());