*/
package org.opendaylight.controller.cluster.datastore;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import akka.pattern.Patterns;
import akka.testkit.javadsl.TestKit;
import com.google.common.base.Stopwatch;
-import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
+import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
+import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 }
+ { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
});
}
}
}
+ @Test
+ public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
+ final String testName = "testWriteTransactionWithSingleShard";
+ initDatastoresWithCars(testName);
+
+ final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx.ready());
+
+ int numCars = 5;
+ for (int i = 0; i < numCars; i++) {
+ writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.newCarPath("car" + i),
+ CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+
+ followerTestKit.doCommit(writeTx.ready());
+
+ DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+ domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+ domStoreReadTransaction.close();
+ }
+
+ // wait to let the shard catch up with purged
+ await("Range set leak test").atMost(5, TimeUnit.SECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ Optional<ActorRef> localShard =
+ leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+ FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+ if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+ Iterator<FrontendHistoryMetadata> iterator =
+ frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+ FrontendHistoryMetadata metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
+ }
+
+ assertEquals(0, metadata.getClosedTransactions().size());
+ assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
+ metadata.getPurgedTransactions().asRanges().iterator().next());
+ } else {
+ // ask based should track no metadata
+ assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+ }
+ });
+
+ final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+ .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertTrue("isPresent", optional.isPresent());
+ assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+ }
+
+ @Test
+ @Ignore("Flushes out tell based leak needs to be handled separately")
+ public void testCloseTransactionMetadataLeak() throws Exception {
+ // Ask based frontend seems to have some issues with back to back close
+ Assume.assumeTrue(testParameter.isAssignableFrom(TestClientBackedDataStore.class));
+
+ final String testName = "testWriteTransactionWithSingleShard";
+ initDatastoresWithCars(testName);
+
+ final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx.ready());
+
+ int numCars = 5;
+ for (int i = 0; i < numCars; i++) {
+ writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.close();
+
+ DOMStoreReadTransaction domStoreReadTransaction = txChain.newReadOnlyTransaction();
+ domStoreReadTransaction.read(CarsModel.BASE_PATH).get();
+
+ domStoreReadTransaction.close();
+ }
+
+ writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ followerTestKit.doCommit(writeTx.ready());
+
+ // wait to let the shard catch up with purged
+ await("Close transaction purge leak test.").atMost(5, TimeUnit.SECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ Optional<ActorRef> localShard =
+ leaderDistributedDataStore.getActorUtils().findLocalShard("cars");
+ FrontendShardDataTreeSnapshotMetadata frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard.get(), new RequestFrontendMetadata());
+
+ if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+ Iterator<FrontendHistoryMetadata> iterator =
+ frontendMetadata.getClients().get(0).getCurrentHistories().iterator();
+ FrontendHistoryMetadata metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
+ }
+
+ Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
+
+ assertEquals(0, metadata.getClosedTransactions().size());
+ assertEquals(1, ranges.size());
+ } else {
+ // ask based should track no metadata
+ assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
+ }
+ });
+
+ final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+ .read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertTrue("isPresent", optional.isPresent());
+ assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+ }
+
@Test
public void testReadWriteTransactionWithSingleShard() throws Exception {
initDatastoresWithCars("testReadWriteTransactionWithSingleShard");
initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
- final com.google.common.base.Optional<ActorRef> carsFollowerShard =
+ final Optional<ActorRef> carsFollowerShard =
followerDistributedDataStore.getActorUtils().findLocalShard("cars");
assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
- final com.google.common.base.Optional<ActorRef> carsFollowerShard =
+ final Optional<ActorRef> carsFollowerShard =
followerDistributedDataStore.getActorUtils().findLocalShard("cars");
assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
@Test
public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+ // FIXME: remove when test passes also for ClientBackedDataStore
+ Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
followerDatastoreContextBuilder.shardBatchedModificationCount(2);
leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 1, stats.getReadWriteTransactionCount()));
+ stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
// Disable elections on the leader so it switches to follower.
@Test
public void testLeadershipTransferOnShutdown() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+ // FIXME: remove when test passes also for ClientBackedDataStore
+ Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
final String testName = "testLeadershipTransferOnShutdown";
@Test
public void testTransactionWithIsolatedLeader() throws Exception {
- //TODO remove when test passes also for ClientBackedDataStore
- Assume.assumeTrue(testParameter.equals(DistributedDataStore.class));
+ // FIXME: remove when test passes also for ClientBackedDataStore
+ Assume.assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
// Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
final String testName = "testTransactionWithIsolatedLeader";
fail("Exception expected");
} catch (final ExecutionException e) {
final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.equals(testParameter)) {
+ if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
|| e.getCause() instanceof ShardLeaderNotRespondingException);
} else {
fail("Exception expected");
} catch (final ExecutionException e) {
final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.equals(testParameter)) {
+ if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
} else {
assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
@Test
public void testReadWriteMessageSlicing() throws Exception {
// The slicing is only implemented for tell-based protocol
- Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+ Assume.assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
followerDatastoreContextBuilder.maximumMessageSliceSize(100);