package org.opendaylight.controller.cluster.datastore;
import static org.awaitility.Awaitility.await;
-import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
}
@Parameter(0)
- public Class<? extends AbstractDataStore> testParameter;
+ public Class<? extends ClientBackedDataStore> testParameter;
@Parameter(1)
public int commitTimeout;
private final TransactionIdentifier tx1 = nextTransactionId();
private final TransactionIdentifier tx2 = nextTransactionId();
- private AbstractDataStore followerDistributedDataStore;
- private AbstractDataStore leaderDistributedDataStore;
+ private ClientBackedDataStore followerDistributedDataStore;
+ private ClientBackedDataStore leaderDistributedDataStore;
private IntegrationTestKit followerTestKit;
private IntegrationTestKit leaderTestKit;
@After
public void tearDown() {
if (followerDistributedDataStore != null) {
- leaderDistributedDataStore.close();
+ followerDistributedDataStore.close();
}
if (leaderDistributedDataStore != null) {
leaderDistributedDataStore.close();
throws Exception {
leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout);
- leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
- testParameter, type, moduleShardsConfig, false, shards);
+ leaderDistributedDataStore = leaderTestKit.setupDataStore(testParameter, type, moduleShardsConfig, false,
+ shards);
followerTestKit = new IntegrationTestKit(followerSystem, followerBuilder, commitTimeout);
- followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
+ followerDistributedDataStore = followerTestKit.setupDataStore(
testParameter, type, moduleShardsConfig, false, shards);
leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(), shards);
final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
- try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder,
- commitTimeout)
- .setupAbstractDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
+ try (var member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder, commitTimeout)
+ .setupDataStore(testParameter, testName, "module-shards-member2", true, CARS)) {
verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
}
}
// wait to let the shard catch up with purged
await("Range set leak test").atMost(5, TimeUnit.SECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> {
- final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
- .orElseThrow();
- final var frontendMetadata =
- (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
- .executeOperation(localShard, new RequestFrontendMetadata());
-
- final var clientMeta = frontendMetadata.getClients().get(0);
- if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
- assertTellClientMetadata(clientMeta, numCars * 2);
- } else {
- assertAskClientMetadata(clientMeta);
- }
- });
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
+ final var frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard, new RequestFrontendMetadata());
+
+ assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2);
+ });
try (var tx = txChain.newReadOnlyTransaction()) {
- final var body = tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body();
- assertThat(body, instanceOf(Collection.class));
+ final var body = assertInstanceOf(Collection.class,
+ tx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS).orElseThrow().body());
assertEquals(numCars, ((Collection<?>) body).size());
}
}
- private static void assertAskClientMetadata(final FrontendClientMetadata clientMeta) {
- // ask based should track no metadata
- assertEquals(List.of(), clientMeta.getCurrentHistories());
- }
-
- private static void assertTellClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
+ private static void assertClientMetadata(final FrontendClientMetadata clientMeta, final long lastPurged) {
final var iterator = clientMeta.getCurrentHistories().iterator();
var metadata = iterator.next();
while (iterator.hasNext() && metadata.getHistoryId() != 1) {
@Test
public void testCloseTransactionMetadataLeak() throws Exception {
- // FIXME: CONTROLLER-2016: ask-based frontend triggers this:
- //
- // java.lang.IllegalStateException: Previous transaction
- // member-2-datastore-testCloseTransactionMetadataLeak-fe-0-chn-1-txn-1-0 is not ready yet
- // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy$Allocated.checkReady()
- // at org.opendaylight.controller.cluster.datastore.TransactionChainProxy.newReadOnlyTransaction()
- assumeTrue(testParameter.isAssignableFrom(ClientBackedDataStore.class));
-
initDatastoresWithCars("testCloseTransactionMetadataLeak");
- final DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+ final var txChain = followerDistributedDataStore.createTransactionChain();
- DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ var writeTx = txChain.newWriteOnlyTransaction();
writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
followerTestKit.doCommit(writeTx.ready());
// wait to let the shard catch up with purged
await("wait for purges to settle").atMost(5, TimeUnit.SECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .untilAsserted(() -> {
- final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars")
- .orElseThrow();
- final var frontendMetadata =
- (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
- .executeOperation(localShard, new RequestFrontendMetadata());
-
- final var clientMeta = frontendMetadata.getClients().get(0);
- if (leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
- assertTellClientMetadata(clientMeta, numCars * 2);
- } else {
- assertAskClientMetadata(clientMeta);
- }
- });
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> {
+ final var localShard = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
+ final var frontendMetadata =
+ (FrontendShardDataTreeSnapshotMetadata) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(localShard, new RequestFrontendMetadata());
+
+ assertClientMetadata(frontendMetadata.getClients().get(0), numCars * 2);
+ });
}
@Test
LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
MoreExecutors.directExecutor());
- final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class);
- final DOMTransactionChain txChain = broker.createTransactionChain(listener);
+ final var listener = mock(FutureCallback.class);
+ final DOMTransactionChain txChain = broker.createTransactionChain();
+ txChain.addCallback(listener);
final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
.withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
.build());
- final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
- .getCause();
- assertThat(ex, instanceOf(TransactionCommitFailedException.class));
+ final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
+ assertInstanceOf(TransactionCommitFailedException.class, ex.getCause());
- verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
+ verify(listener, timeout(5000)).onFailure(any());
txChain.close();
broker.close();
public void testChainedTransactionFailureWithMultipleShards() throws Exception {
initDatastoresWithCarsAndPeople("testChainedTransactionFailureWithMultipleShards");
- final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
- ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
- LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore).build(),
- MoreExecutors.directExecutor());
+ try (var broker = new ConcurrentDOMDataBroker(
+ Map.of(LogicalDatastoreType.CONFIGURATION, followerDistributedDataStore), MoreExecutors.directExecutor())) {
- final DOMTransactionChainListener listener = mock(DOMTransactionChainListener.class);
- final DOMTransactionChain txChain = broker.createTransactionChain(listener);
+ final var listener = mock(FutureCallback.class);
+ final DOMTransactionChain txChain = broker.createTransactionChain();
+ txChain.addCallback(listener);
- final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
- writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
- // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
- // done for put for performance reasons.
- writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
- .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
- .build());
+ // Note that merge will validate the data and fail but put succeeds b/c deep validation is not
+ // done for put for performance reasons.
+ writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+ .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+ .build());
- final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
- .getCause();
- assertThat(ex, instanceOf(TransactionCommitFailedException.class));
+ final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
+ .getCause();
+ assertThat(ex, instanceOf(TransactionCommitFailedException.class));
- verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx), any(Throwable.class));
+ verify(listener, timeout(5000)).onFailure(any());
- txChain.close();
- broker.close();
+ txChain.close();
+ }
}
@Test
.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder, commitTimeout);
- try (AbstractDataStore ds =
- newMember1TestKit.setupAbstractDataStore(
- testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
+ try (var ds = newMember1TestKit.setupDataStore(testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false,
+ CARS)) {
followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), CARS);
verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
}
- private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected)
+ private static void verifyCarsReadWriteTransactions(final ClientBackedDataStore datastore, final int expected)
throws Exception {
IntegrationTestKit.verifyShardStats(datastore, "cars",
stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount()));
final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500),
commitTimeout);
- try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
- testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
+ try (var follower2DistributedDataStore = follower2TestKit.setupDataStore(testParameter, testName,
+ MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
followerTestKit.waitForMembersUp("member-3");
follower2TestKit.waitForMembersUp("member-1", "member-2");
raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
final var noShardLeaderCohort = noShardLeaderWriteTx.ready();
- final ListenableFuture<Boolean> canCommit;
-
- // There is difference in behavior here:
- if (!leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
- // ask-based canCommit() times out and aborts
- final var ex = assertThrows(ExecutionException.class,
- () -> leaderTestKit.doCommit(noShardLeaderCohort)).getCause();
- assertThat(ex, instanceOf(NoShardLeaderException.class));
- assertThat(ex.getMessage(), containsString(
- "Shard member-1-shard-cars-testTransactionWithIsolatedLeader currently has no leader."));
- canCommit = null;
- } else {
- // tell-based canCommit() does not have a real timeout and hence continues
- canCommit = noShardLeaderCohort.canCommit();
- Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
- assertFalse(canCommit.isDone());
- }
+ // tell-based canCommit() does not have a real timeout and hence continues
+ final var canCommit = noShardLeaderCohort.canCommit();
+ Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
+ assertFalse(canCommit.isDone());
sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
.shardElectionTimeoutFactor(100));
final DOMStoreThreePhaseCommitCohort successTxCohort = successWriteTx.ready();
- followerDistributedDataStore = followerTestKit.setupAbstractDataStore(
- testParameter, testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
+ followerDistributedDataStore = followerTestKit.setupDataStore(testParameter, testName,
+ MODULE_SHARDS_CARS_ONLY_1_2, false, CARS);
leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
leaderTestKit.doCommit(successTxCohort);
- // continuation of tell-based protocol: readied transaction will complete commit, but will report an OLFE
- if (canCommit != null) {
- final var ex = assertThrows(ExecutionException.class,
- () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
- assertThat(ex, instanceOf(OptimisticLockFailedException.class));
- assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
- final var cause = ex.getCause();
- assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
- final var cmae = (ConflictingModificationAppliedException) cause;
- assertEquals("Node was created by other transaction.", cmae.getMessage());
- assertEquals(CarsModel.BASE_PATH, cmae.getPath());
- }
+ // continuation of canCommit(): readied transaction will complete commit, but will report an OLFE
+ final var ex = assertThrows(ExecutionException.class,
+ () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
+ assertThat(ex, instanceOf(OptimisticLockFailedException.class));
+ assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
+ final var cause = ex.getCause();
+ assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
+ final var cmae = (ConflictingModificationAppliedException) cause;
+ assertEquals("Node was created by other transaction.", cmae.getMessage());
+ assertEquals(CarsModel.BASE_PATH, cmae.getPath());
}
@Test
final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
follower2System, follower2DatastoreContextBuilder, commitTimeout);
- try (AbstractDataStore ds =
- follower2TestKit.setupAbstractDataStore(
- testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
+ try (var ds = follower2TestKit.setupDataStore(testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
followerTestKit.waitForMembersUp("member-1", "member-3");
follower2TestKit.waitForMembersUp("member-1", "member-2");
final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
follower2System, follower2DatastoreContextBuilder, commitTimeout);
- final AbstractDataStore ds2 =
- follower2TestKit.setupAbstractDataStore(
- testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
+ final var ds2 = follower2TestKit.setupDataStore(testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS);
followerTestKit.waitForMembersUp("member-1", "member-3");
follower2TestKit.waitForMembersUp("member-1", "member-2");
@Test
public void testReadWriteMessageSlicing() throws Exception {
- // The slicing is only implemented for tell-based protocol
- assumeTrue(ClientBackedDataStore.class.isAssignableFrom(testParameter));
-
leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
followerDatastoreContextBuilder.maximumMessageSliceSize(100);
initDatastoresWithCars("testLargeReadReplySlicing");
initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
leaderTestKit.doCommit(initialWriteTx.ready());
- try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
- testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
+ try (var follower2DistributedDataStore = follower2TestKit.setupDataStore(testParameter, testName,
+ MODULE_SHARDS_CARS_1_2_3, false)) {
final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
.getLocalShards().get("cars").getActor();
assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow());
}
- private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
+ private static void sendDatastoreContextUpdate(final ClientBackedDataStore dataStore, final Builder builder) {
final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
final DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
final Answer<DatastoreContext> answer = invocation -> newBuilder.build();