import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.Uint64;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
+import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public abstract class AbstractDistributedDataStoreIntegrationTest {
// Verify the data in the store
final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
- Optional<NormalizedNode> optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", car, optional.get());
-
- optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", person, optional.get());
+ assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+ assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
}
}
final Boolean exists = readWriteTx.exists(nodePath).get(5, TimeUnit.SECONDS);
assertEquals("exists", Boolean.TRUE, exists);
- Optional<NormalizedNode> optional = readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", nodeToWrite, optional.get());
+ assertEquals(Optional.of(nodeToWrite), readWriteTx.read(nodePath).get(5, TimeUnit.SECONDS));
// 4. Ready the Tx for commit
final DOMStoreThreePhaseCommitCohort cohort = readWriteTx.ready();
// 6. Verify the data in the store
final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
- optional = readTx.read(nodePath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", nodeToWrite, optional.get());
+ assertEquals(Optional.of(nodeToWrite), readTx.read(nodePath).get(5, TimeUnit.SECONDS));
}
}
final Boolean exists = readWriteTx.exists(carPath).get(5, TimeUnit.SECONDS);
assertEquals("exists", Boolean.TRUE, exists);
- Optional<NormalizedNode> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", car, optional.get());
+ assertEquals("Data node", Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
testKit.doCommit(readWriteTx.ready());
// Verify the data in the store
DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
- optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", car, optional.get());
-
- optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", person, optional.get());
+ assertEquals(Optional.of(car), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+ assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
}
}
// leader was elected in time, the Tx
// should have timed out and throw an appropriate
// exception cause.
- try {
- txCohort.get().canCommit().get(10, TimeUnit.SECONDS);
- fail("Expected NoShardLeaderException");
- } catch (final ExecutionException e) {
- final String msg = "Unexpected exception: "
- + Throwables.getStackTraceAsString(e.getCause());
- if (DistributedDataStore.class.isAssignableFrom(testParameter)) {
- assertTrue(Throwables.getRootCause(e) instanceof NoShardLeaderException);
- } else {
- assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
- }
- }
+ final var ex = assertThrows(ExecutionException.class,
+ () -> txCohort.get().canCommit().get(10, TimeUnit.SECONDS));
+ assertTrue("Unexpected exception: " + Throwables.getStackTraceAsString(ex.getCause()),
+ Throwables.getRootCause(ex) instanceof RequestTimeoutException);
} finally {
try {
if (writeTxToClose != null) {
// the data from the first
// Tx is visible after being readied.
DOMStoreReadTransaction readTx = txChain.newReadOnlyTransaction();
- Optional<NormalizedNode> optional = readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", testNode, optional.get());
+ assertEquals(Optional.of(testNode), readTx.read(TestModel.TEST_PATH).get(5, TimeUnit.SECONDS));
// 6. Create a new RW Tx from the chain, write more data,
// and ready it
// from the last RW Tx to
// verify it is visible.
readTx = txChain.newReadWriteTransaction();
- optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", outerNode, optional.get());
+ assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS));
// 8. Wait for the 2 commits to complete and close the
// chain.
// 9. Create a new read Tx from the data store and verify
// committed data.
readTx = dataStore.newReadOnlyTransaction();
- optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", outerNode, optional.get());
+ assertEquals(Optional.of(outerNode), readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS));
}
}
final YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
readWriteTx.merge(personPath, person);
- Optional<NormalizedNode> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", car, optional.get());
-
- optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", person, optional.get());
+ assertEquals(Optional.of(car), readWriteTx.read(carPath).get(5, TimeUnit.SECONDS));
+ assertEquals(Optional.of(person), readWriteTx.read(personPath).get(5, TimeUnit.SECONDS));
final DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
- optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
- assertFalse("isPresent", optional.isPresent());
-
- optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", person, optional.get());
+ assertEquals(Optional.empty(), readTx.read(carPath).get(5, TimeUnit.SECONDS));
+ assertEquals(Optional.of(person), readTx.read(personPath).get(5, TimeUnit.SECONDS));
}
}
final Optional<NormalizedNode> optional = txChain.newReadOnlyTransaction()
.read(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
assertTrue("isPresent", optional.isPresent());
- assertEquals("# cars", numCars, ((Collection<?>) optional.get().body()).size());
+ assertEquals("# cars", numCars, ((Collection<?>) optional.orElseThrow().body()).size());
txChain.close();
@Test
public void testChainedTransactionFailureWithSingleShard() throws Exception {
- final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testChainedTransactionFailureWithSingleShard", "cars-1")) {
+ final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+ try (var dataStore = testKit.setupAbstractDataStore(testParameter,
+ "testChainedTransactionFailureWithSingleShard", "cars-1")) {
- final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+ final var broker = new ConcurrentDOMDataBroker(
ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
.put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
MoreExecutors.directExecutor());
- final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
- final DOMTransactionChain txChain = broker.createTransactionChain(listener);
+ final var listener = Mockito.mock(DOMTransactionChainListener.class);
+ final var txChain = broker.createTransactionChain(listener);
- final DOMDataTreeReadWriteTransaction writeTx = txChain.newReadWriteTransaction();
+ final var writeTx = txChain.newReadWriteTransaction();
writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
PeopleModel.emptyContainer());
- final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+ final var invalidData = Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+ .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+ .build();
writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
- try {
- writeTx.commit().get(5, TimeUnit.SECONDS);
- fail("Expected TransactionCommitFailedException");
- } catch (final ExecutionException e) {
- // Expected
- }
+ assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
any(Throwable.class));
writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH,
PeopleModel.emptyContainer());
- final ContainerNode invalidData = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
- .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk")).build();
+ final ContainerNode invalidData = Builders.containerBuilder()
+ .withNodeIdentifier(new NodeIdentifier(CarsModel.BASE_QNAME))
+ .withChild(ImmutableNodes.leafNode(TestModel.JUNK_QNAME, "junk"))
+ .build();
writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
// Note that merge will validate the data and fail but put
// succeeds b/c deep validation is not
// done for put for performance reasons.
- try {
- writeTx.commit().get(5, TimeUnit.SECONDS);
- fail("Expected TransactionCommitFailedException");
- } catch (final ExecutionException e) {
- // Expected
- }
+ assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
any(Throwable.class));
DataTree dataTree = new InMemoryDataTreeFactory().create(
DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
AbstractShardTest.writeToStore(dataTree, CarsModel.BASE_PATH, carsNode);
- NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
+ NormalizedNode root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.of());
final Snapshot carsSnapshot = Snapshot.create(
new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
final NormalizedNode peopleNode = PeopleModel.create();
AbstractShardTest.writeToStore(dataTree, PeopleModel.BASE_PATH, peopleNode);
- root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.empty());
+ root = AbstractShardTest.readStore(dataTree, YangInstanceIdentifier.of());
final Snapshot peopleSnapshot = Snapshot.create(
new ShardSnapshotState(new MetadataShardDataTreeSnapshot(root)),
final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// two reads
- Optional<NormalizedNode> optional = readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", carsNode, optional.get());
-
- optional = readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS);
- assertTrue("isPresent", optional.isPresent());
- assertEquals("Data node", peopleNode, optional.get());
+ assertEquals(Optional.of(carsNode), readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS));
+ assertEquals(Optional.of(peopleNode), readTx.read(PeopleModel.BASE_PATH).get(5, TimeUnit.SECONDS));
}
}
@Test
+ @Ignore("ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate")
+ // FIXME: re-enable this test
public void testSnapshotOnRootOverwrite() throws Exception {
- if (!DistributedDataStore.class.isAssignableFrom(testParameter)) {
- // FIXME: ClientBackedDatastore does not have stable indexes/term, the snapshot index seems to fluctuate
- return;
- }
-
- final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(),
- datastoreContextBuilder.snapshotOnRootOverwrite(true));
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
+ final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder.snapshotOnRootOverwrite(true));
+ try (var dataStore = testKit.setupAbstractDataStore(
testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf",
true, "cars", "default")) {
- ContainerNode rootNode = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(SchemaContext.NAME))
- .withChild(CarsModel.create())
- .build();
+ final var rootNode = Builders.containerBuilder()
+ .withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))
+ .withChild(CarsModel.create())
+ .build();
- testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
+ testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.of(), rootNode);
IntegrationTestKit.verifyShardState(dataStore, "cars",
state -> assertEquals(1, state.getSnapshotIndex()));
verifySnapshot("member-1-shard-cars-testRootOverwrite", 1, 1);
// root overwrite so expect a snapshot
- testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.empty(), rootNode);
+ testKit.testWriteTransaction(dataStore, YangInstanceIdentifier.of(), rootNode);
// this was a real snapshot so everything should be in it(1 + 10 + 1)
IntegrationTestKit.verifyShardState(dataStore, "cars",