package org.opendaylight.controller.cluster.datastore;
import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.common.Uint64;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.collection.Set;
import scala.concurrent.Await;
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { TestDistributedDataStore.class, 7}, { TestClientBackedDataStore.class, 12 }
+ { TestDistributedDataStore.class, 7 }, { TestClientBackedDataStore.class, 12 }
});
}
private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
throws Exception {
- final Optional<NormalizedNode> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
-
- final CollectionNodeBuilder<MapEntryNode, SystemMapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
- CarsModel.CAR_QNAME);
- for (final NormalizedNode entry: entries) {
- listBuilder.withChild((MapEntryNode) entry);
- }
-
- assertEquals("Car list node", listBuilder.build(), optional.get());
+ assertEquals("Car list node",
+ Optional.of(ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME).withValue(Arrays.asList(entries)).build()),
+ readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS));
}
private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
- final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
-
- writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+ writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+ .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+ .build());
final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
.getCause();
writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
- final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
- new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
-
// Note that merge will validate the data and fail but put succeeds b/c deep validation is not
// done for put for performance reasons.
- writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
+ writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+ .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+ .build());
final var ex = assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS))
.getCause();
}
}
- @SuppressWarnings("unchecked")
@Test
public void testReadyLocalTransactionForwardedToLeader() throws Exception {
initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
- carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+ carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
if (resp instanceof akka.actor.Status.Failure) {
throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
- carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+ carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
if (resp instanceof akka.actor.Status.Failure) {
throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
}
- @SuppressWarnings("unchecked")
@Test
public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
followerDistributedDataStore.getActorUtils().findLocalShard("cars");
assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
- carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
+ carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
// Send a tx with immediate commit.
new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification),
true, Optional.empty());
- carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+ carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
if (resp instanceof akka.actor.Status.Failure) {
throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification),
false, Optional.empty());
- carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+ carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
if (resp instanceof akka.actor.Status.Failure) {
throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
leaderDatastoreContextBuilder.shardBatchedModificationCount(2);
initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
+ // Verify backend statistics on start
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
+
// Do an initial write to get the primary shard info cached.
final DOMStoreWriteTransaction initialWriteTx = followerDistributedDataStore.newWriteOnlyTransaction();
writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
+ // At this point only leader should see the transactions
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
+
// Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
- // tx writes 5 cars so 2 BatchedModidifications messages will be sent initially and cached in the
- // leader shard (with shardBatchedModificationCount set to 2). The 3rd BatchedModidifications will be
- // sent on ready.
+ // tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard
+ // (with shardBatchedModificationCount set to 2). The 3rd BatchedModifications will be sent on ready.
final DOMStoreWriteTransaction writeTx3 = followerDistributedDataStore.newWriteOnlyTransaction();
for (int i = 1; i <= 5; i++, carIndex++) {
writeTx3.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
}
- // Prepare another WO that writes to a single shard. This will send a single BatchedModidifications
- // message on ready.
+ // Prepare another WO that writes to a single shard. This will send a single BatchedModifications message
+ // on ready.
final DOMStoreWriteTransaction writeTx4 = followerDistributedDataStore.newWriteOnlyTransaction();
cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
writeTx4.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
carIndex++;
- // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaciton message to the
- // leader shard on ready.
+ // Prepare a RW tx that will create a tx actor and send a ForwardedReadyTransaction message to the leader shard
+ // on ready.
final DOMStoreReadWriteTransaction readWriteTx = followerDistributedDataStore.newReadWriteTransaction();
cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
- readWriteTx.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
+ final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex);
+ readWriteTx.write(carPath, cars.getLast());
- // FIXME: CONTROLLER-2017: ClientBackedDataStore reports only 4 transactions
- assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
+ // There is a difference here between implementations: tell-based protocol enforces batching on per-transaction
+ // level whereas ask-based protocol has a global limit towards a shard -- and hence flushes out last two
+ // transactions eagerly.
+ final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3;
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
// Disable elections on the leader so it switches to follower.
followerTestKit.doCommit(writeTx4Cohort);
followerTestKit.doCommit(rwTxCohort);
+ // At this point everything is committed and the follower datastore should see 5 transactions, but leader should
+ // only see the initial transactions
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 5);
+
DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
}
+ private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected)
+ throws Exception {
+ IntegrationTestKit.verifyShardStats(datastore, "cars",
+ stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount()));
+ }
+
@Test
public void testLeadershipTransferOnShutdown() throws Exception {
- // FIXME: remove when test passes also for ClientBackedDataStore
- assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
-
leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
followerDatastoreContextBuilder.shardElectionTimeoutFactor(10).customRaftPolicyImplementation(null);
final String testName = "testLeadershipTransferOnShutdown";
writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
final DOMStoreThreePhaseCommitCohort cohort1 = writeTx.ready();
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
+ final var usesCohorts = DistributedDataStore.class.isAssignableFrom(testParameter);
+ if (usesCohorts) {
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+ stats -> assertEquals("getTxCohortCacheSize", 1, stats.getTxCohortCacheSize()));
+ }
writeTx = followerDistributedDataStore.newWriteOnlyTransaction();
final MapEntryNode car = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
writeTx.write(CarsModel.newCarPath("optima"), car);
final DOMStoreThreePhaseCommitCohort cohort2 = writeTx.ready();
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
+ if (usesCohorts) {
+ IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
+ stats -> assertEquals("getTxCohortCacheSize", 2, stats.getTxCohortCacheSize()));
+ }
// Gracefully stop the leader via a Shutdown message.
@Test
public void testTransactionWithIsolatedLeader() throws Exception {
- // FIXME: CONTROLLER-2018: remove when test passes also for ClientBackedDataStore
- assumeTrue(DistributedDataStore.class.isAssignableFrom(testParameter));
-
// Set the isolated leader check interval high so we can control the switch to IsolatedLeader.
leaderDatastoreContextBuilder.shardIsolatedLeaderCheckIntervalInMillis(10000000);
final String testName = "testTransactionWithIsolatedLeader";
MemberNode.verifyRaftState(leaderDistributedDataStore, "cars",
raftState -> assertEquals("getRaftState", "IsolatedLeader", raftState.getRaftState()));
- final var ex = assertThrows(ExecutionException.class,
- () -> leaderTestKit.doCommit(noShardLeaderWriteTx.ready()));
- assertEquals(NoShardLeaderException.class, Throwables.getRootCause(ex).getClass());
+ final var noShardLeaderCohort = noShardLeaderWriteTx.ready();
+ final ListenableFuture<Boolean> canCommit;
+
+ // There is difference in behavior here:
+ if (!leaderDistributedDataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
+ // ask-based canCommit() times out and aborts
+ final var ex = assertThrows(ExecutionException.class,
+ () -> leaderTestKit.doCommit(noShardLeaderCohort)).getCause();
+ assertThat(ex, instanceOf(NoShardLeaderException.class));
+ assertThat(ex.getMessage(), containsString(
+ "Shard member-1-shard-cars-testTransactionWithIsolatedLeader currently has no leader."));
+ canCommit = null;
+ } else {
+ // tell-based canCommit() does not have a real timeout and hence continues
+ canCommit = noShardLeaderCohort.canCommit();
+ Uninterruptibles.sleepUninterruptibly(commitTimeout, TimeUnit.SECONDS);
+ assertFalse(canCommit.isDone());
+ }
sendDatastoreContextUpdate(leaderDistributedDataStore, leaderDatastoreContextBuilder
.shardElectionTimeoutFactor(100));
leaderTestKit.doCommit(preIsolatedLeaderTxCohort);
leaderTestKit.doCommit(successTxCohort);
+
+ // continuation of tell-based protocol: readied transaction will complete commit, but will report an OLFE
+ if (canCommit != null) {
+ final var ex = assertThrows(ExecutionException.class,
+ () -> canCommit.get(commitTimeout, TimeUnit.SECONDS)).getCause();
+ assertThat(ex, instanceOf(OptimisticLockFailedException.class));
+ assertEquals("Optimistic lock failed for path " + CarsModel.BASE_PATH, ex.getMessage());
+ final var cause = ex.getCause();
+ assertThat(cause, instanceOf(ConflictingModificationAppliedException.class));
+ final var cmae = (ConflictingModificationAppliedException) cause;
+ assertEquals("Node was created by other transaction.", cmae.getMessage());
+ assertEquals(CarsModel.BASE_PATH, cmae.getPath());
+ }
}
@Test
// behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option
TestKit.shutdownActorSystem(follower2System, true);
- ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+ ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
.executeOperation(cars, GetOnDemandRaftState.INSTANCE);
await().atMost(10, TimeUnit.SECONDS)
.until(() -> containsUnreachable(followerCluster, follower2Member));
- ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+ ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
// to simulate a follower not being able to receive messages, but still being able to send messages and becoming
// candidate, we can just send a couple of RequestVotes to both leader and follower.
followerDatastoreContextBuilder.snapshotOnRootOverwrite(true));
leaderTestKit.waitForMembersUp("member-2");
- final ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
+ final ContainerNode rootNode = Builders.containerBuilder()
+ .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))
.withChild(CarsModel.create())
.build();
assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
MetadataShardDataTreeSnapshot shardSnapshot =
(MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
- assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
+ assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow());
}
private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {