X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FDistributedDataStoreRemotingIntegrationTest.java;h=7fa19bc95f237900c3c26256865d6baea8085a01;hb=e66df4e9ae44728c178147fe2462b7138d74810a;hp=e403bbc20a2ccc5837121d6e502eaa72d2b53c2d;hpb=e66759266dc43d5f58b2837aca5047b42c205e4a;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java index e403bbc20a..7fa19bc95f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -46,7 +47,10 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import org.junit.After; @@ -67,6 +71,8 @@ import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker; import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore; import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata; +import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages; +import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -84,6 +90,7 @@ import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow; import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState; import org.opendaylight.controller.cluster.raft.client.messages.Shutdown; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; import org.opendaylight.controller.cluster.raft.messages.RequestVote; import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries; import org.opendaylight.controller.cluster.raft.persisted.Snapshot; @@ -117,6 +124,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -196,9 +204,9 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { leaderDistributedDataStore.close(); } - TestKit.shutdownActorSystem(leaderSystem); - TestKit.shutdownActorSystem(followerSystem); - TestKit.shutdownActorSystem(follower2System); + TestKit.shutdownActorSystem(leaderSystem, true); + TestKit.shutdownActorSystem(followerSystem, true); + TestKit.shutdownActorSystem(follower2System,true); InMemoryJournal.clear(); InMemorySnapshotStore.clear(); @@ -214,12 +222,19 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) throws Exception { - leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder, commitTimeout); + initDatastores(type, moduleShardsConfig, shards, leaderDatastoreContextBuilder, + followerDatastoreContextBuilder); + } + + private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards, + final DatastoreContext.Builder leaderBuilder, final DatastoreContext.Builder followerBuilder) + throws Exception { + leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout); leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); - followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder, commitTimeout); + followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout); followerDistributedDataStore = followerTestKit.setupAbstractDataStore( testParameter, type, moduleShardsConfig, false, shards); @@ -1271,10 +1286,11 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { followerTestKit.waitForMembersUp("member-1", "member-3"); follower2TestKit.waitForMembersUp("member-1", "member-2"); - TestKit.shutdownActorSystem(follower2System); + // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option + TestKit.shutdownActorSystem(follower2System, true); ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get(); - OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils() .executeOperation(cars, GetOnDemandRaftState.INSTANCE); Cluster leaderCluster = Cluster.get(leaderSystem); @@ -1364,6 +1380,137 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest { verifyNode(rwTx, CarsModel.BASE_PATH, carsNode); } + @SuppressWarnings("IllegalCatch") + @Test + public void testRaftCallbackDuringLeadershipDrop() throws Exception { + final String testName = "testRaftCallbackDuringLeadershipDrop"; + initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + + final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, + DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500) + .shardLeaderElectionTimeoutInSeconds(3600), + commitTimeout); + + final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer()); + leaderTestKit.doCommit(initialWriteTx.ready()); + + try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore( + testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) { + + final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + member2Cars.tell(new StartDropMessages(AppendEntries.class), null); + member3Cars.tell(new StartDropMessages(AppendEntries.class), null); + + final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction(); + newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode()); + final AtomicBoolean submitDone = new AtomicBoolean(false); + executor.submit(() -> { + try { + leaderTestKit.doCommit(newTx.ready()); + submitDone.set(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards() + .getLocalShards().get("cars").getActor(); + await().atMost(10, TimeUnit.SECONDS) + .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1); + + final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils() + .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE); + + // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with + // new term(switching to candidate after election timeout) + leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1, + "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1, + -1), member3Cars); + + member2Cars.tell(new StopDropMessages(AppendEntries.class), null); + member3Cars.tell(new StopDropMessages(AppendEntries.class), null); + + await("Is tx stuck in COMMIT_PENDING") + .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true)); + + } + + executor.shutdownNow(); + } + + @Test + public void testSnapshotOnRootOverwrite() throws Exception { + if (!DistributedDataStore.class.isAssignableFrom(testParameter)) { + // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate + return; + } + + final String testName = "testSnapshotOnRootOverwrite"; + final String[] shards = {"cars", "default"}; + initDatastores(testName, "module-shards-default-cars-member1-and-2.conf", shards, + leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true), + followerDatastoreContextBuilder.snapshotOnRootOverwrite(true)); + + leaderTestKit.waitForMembersUp("member-2"); + final ContainerNode rootNode = ImmutableContainerNodeBuilder.create() + .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME)) + .withChild((ContainerNode) CarsModel.create()) + .build(); + + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(1, state.getSnapshotIndex())); + + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(1, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); + + for (int i = 0; i < 10; i++) { + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i), + CarsModel.newCarEntry("car " + i, Uint64.ONE)); + } + + // fake snapshot causes the snapshotIndex to move + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(10, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(10, state.getSnapshotIndex())); + + // however the real snapshot still has not changed and was taken at index 1 + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1); + + // root overwrite so expect a snapshot + leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode); + + // this was a real snapshot so everything should be in it(1(DisableTrackingPayload) + 1 + 10 + 1) + IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars", + state -> assertEquals(12, state.getSnapshotIndex())); + IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars", + state -> assertEquals(12, state.getSnapshotIndex())); + + verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 12); + verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12); + } + + private void verifySnapshot(final String persistenceId, final long lastAppliedIndex) { + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + List snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class); + assertEquals(1, snap.size()); + assertEquals(lastAppliedIndex, snap.get(0).getLastAppliedIndex()); + } + ); + } + private static void verifySnapshot(final Snapshot actual, final Snapshot expected, final NormalizedNode expRoot) { assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());