import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
+ // Reads the full car list from the given transaction and asserts (within 5 seconds)
+ // that it is present and equals a map node built from exactly the supplied entries.
private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
throws Exception {
- final Optional<NormalizedNode> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
-
- final CollectionNodeBuilder<MapEntryNode, SystemMapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
- CarsModel.CAR_QNAME);
- for (final NormalizedNode entry: entries) {
- listBuilder.withChild((MapEntryNode) entry);
- }
-
- assertEquals("Car list node", listBuilder.build(), optional.get());
+ // Comparing Optional.of(expected) against the raw read() result also covers the
+ // presence check previously done via assertTrue(optional.isPresent()).
+ assertEquals("Car list node",
+ Optional.of(ImmutableNodes.mapNodeBuilder(CarsModel.CAR_QNAME).withValue(Arrays.asList(entries)).build()),
+ readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS));
}
private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
- carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+ carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
if (resp instanceof akka.actor.Status.Failure) {
throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
- carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
+ carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
if (resp instanceof akka.actor.Status.Failure) {
throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
followerDistributedDataStore.getActorUtils().findLocalShard("cars");
assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
- carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
+ carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
// Send a tx with immediate commit.
new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification),
true, Optional.empty());
- carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+ carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
if (resp instanceof akka.actor.Status.Failure) {
throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification),
false, Optional.empty());
- carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
+ carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
if (resp instanceof akka.actor.Status.Failure) {
throw new AssertionError("Unexpected failure response", ((akka.actor.Status.Failure)resp).cause());
initDatastoresWithCarsAndPeople("testTransactionForwardedToLeaderAfterRetry");
// Verify backend statistics on start
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
- IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, 0);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
// Do an initial write to get the primary shard info cached.
final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
// At this point only leader should see the transactions
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 2, stats.getReadWriteTransactionCount()));
- IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 0, stats.getReadWriteTransactionCount()));
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, 2);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
// Prepare another WO that writes to a single shard and thus will be directly committed on ready. This
// tx writes 5 cars so 2 BatchedModifications messages will be sent initially and cached in the leader shard
final YangInstanceIdentifier carPath = CarsModel.newCarPath("car" + carIndex);
readWriteTx.write(carPath, cars.getLast());
- // There is a difference here between implementations: tell-based protocol will postpone write operations until
- // either a read is made or the transaction is submitted. Here we flush out the last transaction, so we see
- // three transactions, not just the ones we have started committing
- assertTrue(readWriteTx.exists(carPath).get(2, TimeUnit.SECONDS));
+ // There is a difference here between implementations: the tell-based protocol enforces batching at the
+ // per-transaction level, whereas the ask-based protocol has a global per-shard limit -- and hence flushes
+ // out the last two transactions eagerly.
final int earlyTxCount = DistributedDataStore.class.isAssignableFrom(testParameter) ? 5 : 3;
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", earlyTxCount, stats.getReadWriteTransactionCount()));
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 0);
// Disable elections on the leader so it switches to follower.
// At this point everything is committed and the follower datastore should see 5 transactions, but leader should
// only see the initial transactions
- IntegrationTestKit.verifyShardStats(leaderDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", earlyTxCount, stats.getReadWriteTransactionCount()));
- IntegrationTestKit.verifyShardStats(followerDistributedDataStore, "cars",
- stats -> assertEquals("getReadWriteTransactionCount", 5, stats.getReadWriteTransactionCount()));
+ verifyCarsReadWriteTransactions(leaderDistributedDataStore, earlyTxCount);
+ verifyCarsReadWriteTransactions(followerDistributedDataStore, 5);
DOMStoreReadTransaction readTx = leaderDistributedDataStore.newReadOnlyTransaction();
verifyCars(readTx, cars.toArray(new MapEntryNode[cars.size()]));
verifyNode(readTx, PeopleModel.PERSON_LIST_PATH, people);
}
+ // Asserts that the "cars" shard of the given datastore reports the expected
+ // read-write transaction count in its shard statistics.
+ private static void verifyCarsReadWriteTransactions(final AbstractDataStore datastore, final int expected)
+ throws Exception {
+ IntegrationTestKit.verifyShardStats(datastore, "cars",
+ stats -> assertEquals("getReadWriteTransactionCount", expected, stats.getReadWriteTransactionCount()));
+ }
+
@Test
public void testLeadershipTransferOnShutdown() throws Exception {
leaderDatastoreContextBuilder.shardBatchedModificationCount(1);
// behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option
TestKit.shutdownActorSystem(follower2System, true);
- ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+ ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
.executeOperation(cars, GetOnDemandRaftState.INSTANCE);
await().atMost(10, TimeUnit.SECONDS)
.until(() -> containsUnreachable(followerCluster, follower2Member));
- ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").get();
+ ActorRef followerCars = followerDistributedDataStore.getActorUtils().findLocalShard("cars").orElseThrow();
// to simulate a follower not being able to receive messages, but still being able to send messages and becoming
// candidate, we can just send a couple of RequestVotes to both leader and follower.
assertEquals("Snapshot state type", ShardSnapshotState.class, actual.getState().getClass());
MetadataShardDataTreeSnapshot shardSnapshot =
(MetadataShardDataTreeSnapshot) ((ShardSnapshotState)actual.getState()).getSnapshot();
- assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().get());
+ assertEquals("Snapshot root node", expRoot, shardSnapshot.getRootNode().orElseThrow());
}
private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {