import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Range;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.SystemMapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.builder.CollectionNodeBuilder;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
leaderDistributedDataStore.close();
}
- TestKit.shutdownActorSystem(leaderSystem);
- TestKit.shutdownActorSystem(followerSystem);
- TestKit.shutdownActorSystem(follower2System);
+ TestKit.shutdownActorSystem(leaderSystem, true);
+ TestKit.shutdownActorSystem(followerSystem, true);
+ TestKit.shutdownActorSystem(follower2System,true);
InMemoryJournal.clear();
InMemorySnapshotStore.clear();
}
private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards,
- DatastoreContext.Builder leaderBuilder, DatastoreContext.Builder followerBuilder) throws Exception {
+ final DatastoreContext.Builder leaderBuilder, final DatastoreContext.Builder followerBuilder)
+ throws Exception {
leaderTestKit = new IntegrationTestKit(leaderSystem, leaderBuilder, commitTimeout);
leaderDistributedDataStore = leaderTestKit.setupAbstractDataStore(
private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
throws Exception {
- final Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ final Optional<NormalizedNode> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
assertTrue("isPresent", optional.isPresent());
- final CollectionNodeBuilder<MapEntryNode, MapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
+ final CollectionNodeBuilder<MapEntryNode, SystemMapNode> listBuilder = ImmutableNodes.mapNodeBuilder(
CarsModel.CAR_QNAME);
- for (final NormalizedNode<?, ?> entry: entries) {
+ for (final NormalizedNode entry: entries) {
listBuilder.withChild((MapEntryNode) entry);
}
}
private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
- final NormalizedNode<?, ?> expNode) throws Exception {
- final Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
+ final NormalizedNode expNode) throws Exception {
+ final Optional<NormalizedNode> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
assertTrue("isPresent", optional.isPresent());
assertEquals("Data node", expNode, optional.get());
}
}
assertEquals(0, metadata.getClosedTransactions().size());
- assertEquals(Range.closedOpen(UnsignedLong.valueOf(0), UnsignedLong.valueOf(11)),
- metadata.getPurgedTransactions().asRanges().iterator().next());
+
+ final var purgedRanges = metadata.getPurgedTransactions().ranges();
+ assertEquals(1, purgedRanges.size());
+ final var purgedRange = purgedRanges.first();
+ assertEquals(UnsignedLong.ZERO, purgedRange.lower());
+ assertEquals(UnsignedLong.valueOf(10), purgedRange.upper());
} else {
// ask based should track no metadata
assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
}
});
- final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+ final Optional<NormalizedNode> optional = txChain.newReadOnlyTransaction()
.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
assertTrue("isPresent", optional.isPresent());
- assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+ assertEquals("# cars", numCars, ((Collection<?>) optional.get().body()).size());
}
@Test
metadata = iterator.next();
}
- Set<Range<UnsignedLong>> ranges = metadata.getPurgedTransactions().asRanges();
-
assertEquals(0, metadata.getClosedTransactions().size());
- assertEquals(1, ranges.size());
+ assertEquals(1, metadata.getPurgedTransactions().size());
} else {
// ask based should track no metadata
assertTrue(frontendMetadata.getClients().get(0).getCurrentHistories().isEmpty());
}
});
- final Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction()
+ final Optional<NormalizedNode> optional = txChain.newReadOnlyTransaction()
.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
assertTrue("isPresent", optional.isPresent());
- assertEquals("# cars", numCars, ((Collection<?>) optional.get().getValue()).size());
+ assertEquals("# cars", numCars, ((Collection<?>) optional.get().body()).size());
}
@Test
assertNotNull("newWriteOnlyTransaction returned null", writeTx);
final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
- final NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
+ final NormalizedNode carsNode = CarsModel.emptyContainer();
writeTx.write(carsPath, carsNode);
final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
- final NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
+ final NormalizedNode peopleNode = PeopleModel.emptyContainer();
writeTx.write(peoplePath, peopleNode);
followerTestKit.doCommit(writeTx.ready());
assertNotNull("newReadWriteTransaction returned null", rwTx);
final YangInstanceIdentifier carsPath = CarsModel.BASE_PATH;
- final NormalizedNode<?, ?> carsNode = CarsModel.emptyContainer();
+ final NormalizedNode carsNode = CarsModel.emptyContainer();
rwTx.write(carsPath, carsNode);
final YangInstanceIdentifier peoplePath = PeopleModel.BASE_PATH;
- final NormalizedNode<?, ?> peopleNode = PeopleModel.emptyContainer();
+ final NormalizedNode peopleNode = PeopleModel.emptyContainer();
rwTx.write(peoplePath, peopleNode);
followerTestKit.doCommit(rwTx.ready());
final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
readWriteTx.merge(personPath, person);
- Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
+ Optional<NormalizedNode> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
assertTrue("isPresent", optional.isPresent());
assertEquals("Data node", car, optional.get());
cars.add(CarsModel.newCarEntry("car" + carIndex, Uint64.valueOf(carIndex)));
writeTx2.write(CarsModel.newCarPath("car" + carIndex), cars.getLast());
carIndex++;
- NormalizedNode<?, ?> people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
+ NormalizedNode people = ImmutableNodes.mapNodeBuilder(PeopleModel.PERSON_QNAME)
.withChild(PeopleModel.newPersonEntry("Dude")).build();
writeTx2.write(PeopleModel.PERSON_LIST_PATH, people);
final DOMStoreThreePhaseCommitCohort writeTx2Cohort = writeTx2.ready();
followerTestKit.waitForMembersUp("member-1", "member-3");
follower2TestKit.waitForMembersUp("member-1", "member-2");
- TestKit.shutdownActorSystem(follower2System);
+ // behavior is controlled by akka.coordinated-shutdown.run-by-actor-system-terminate configuration option
+ TestKit.shutdownActorSystem(follower2System, true);
ActorRef cars = leaderDistributedDataStore.getActorUtils().findLocalShard("cars").get();
- OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+ final OnDemandRaftState initialState = (OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
.executeOperation(cars, GetOnDemandRaftState.INSTANCE);
Cluster leaderCluster = Cluster.get(leaderSystem);
CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", Uint64.valueOf(20000))));
AbstractShardTest.writeToStore(tree, CarsModel.BASE_PATH, carsNode);
- final NormalizedNode<?, ?> snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty());
+ final NormalizedNode snapshotRoot = AbstractShardTest.readStore(tree, YangInstanceIdentifier.empty());
final Snapshot initialSnapshot = Snapshot.create(
new ShardSnapshotState(new MetadataShardDataTreeSnapshot(snapshotRoot)),
Collections.emptyList(), 5, 1, 5, 1, 1, null, null);
initDatastoresWithCars(testName);
- final Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
+ final Optional<NormalizedNode> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
assertTrue("isPresent", readOptional.isPresent());
assertEquals("Node", carsNode, readOptional.get());
final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
- final NormalizedNode<?, ?> carsNode = CarsModel.create();
+ final NormalizedNode carsNode = CarsModel.create();
rwTx.write(CarsModel.BASE_PATH, carsNode);
verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
}
@Test
- @Ignore("Writes to root node are not split into shards")
public void testSnapshotOnRootOverwrite() throws Exception {
if (!DistributedDataStore.class.isAssignableFrom(testParameter)) {
// FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
}
final String testName = "testSnapshotOnRootOverwrite";
- String[] shards = {"cars", "default"};
- initDatastores(testName, "module-shards-default-cars-member1.conf", shards,
+ final String[] shards = {"cars", "default"};
+ initDatastores(testName, "module-shards-default-cars-member1-and-2.conf", shards,
leaderDatastoreContextBuilder.snapshotOnRootOverwrite(true),
followerDatastoreContextBuilder.snapshotOnRootOverwrite(true));
leaderTestKit.waitForMembersUp("member-2");
- ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
+ final ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
.withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
- .withChild((ContainerNode) CarsModel.create())
+ .withChild(CarsModel.create())
.build();
leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode);
IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
- state -> assertEquals(0, state.getSnapshotIndex()));
+ state -> assertEquals(1, state.getSnapshotIndex()));
IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
- state -> assertEquals(0, state.getSnapshotIndex()));
+ state -> assertEquals(1, state.getSnapshotIndex()));
- verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0);
- verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0);
+ verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1);
+ verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1);
for (int i = 0; i < 10; i++) {
leaderTestKit.testWriteTransaction(leaderDistributedDataStore, CarsModel.newCarPath("car " + i),
// fake snapshot causes the snapshotIndex to move
IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
- state -> assertEquals(9, state.getSnapshotIndex()));
+ state -> assertEquals(10, state.getSnapshotIndex()));
IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
- state -> assertEquals(9, state.getSnapshotIndex()));
+ state -> assertEquals(10, state.getSnapshotIndex()));
- // however the real snapshot still has not changed and was taken at index 0
- verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 0);
- verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 0);
+ // however the real snapshot still has not changed and was taken at index 1
+ verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 1);
+ verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 1);
// root overwrite so expect a snapshot
leaderTestKit.testWriteTransaction(leaderDistributedDataStore, YangInstanceIdentifier.empty(), rootNode);
- // this was a real snapshot so everything should be in it(1 + 10 + 1)
+ // this was a real snapshot so everything should be in it(1(DisableTrackingPayload) + 1 + 10 + 1)
IntegrationTestKit.verifyShardState(leaderDistributedDataStore, "cars",
- state -> assertEquals(11, state.getSnapshotIndex()));
+ state -> assertEquals(12, state.getSnapshotIndex()));
IntegrationTestKit.verifyShardState(followerDistributedDataStore, "cars",
- state -> assertEquals(11, state.getSnapshotIndex()));
+ state -> assertEquals(12, state.getSnapshotIndex()));
- verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 11);
- verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 11);
+ verifySnapshot("member-1-shard-cars-testSnapshotOnRootOverwrite", 12);
+ verifySnapshot("member-2-shard-cars-testSnapshotOnRootOverwrite", 12);
}
- private void verifySnapshot(String persistenceId, long lastAppliedIndex) {
+ private void verifySnapshot(final String persistenceId, final long lastAppliedIndex) {
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
List<Snapshot> snap = InMemorySnapshotStore.getSnapshots(persistenceId, Snapshot.class);
assertEquals(1, snap.size());
}
private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
- final NormalizedNode<?, ?> expRoot) {
+ final NormalizedNode expRoot) {
assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
assertEquals("Snapshot getLastAppliedIndex", expected.getLastAppliedIndex(), actual.getLastAppliedIndex());
assertEquals("Snapshot getLastTerm", expected.getLastTerm(), actual.getLastTerm());