import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
-import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
+import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.ShardSnapshotState;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
import org.opendaylight.mdsal.dom.api.DOMTransactionChainClosedException;
-import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.Uint64;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public abstract class AbstractDistributedDataStoreIntegrationTest {
-
@Parameter
- public Class<? extends AbstractDataStore> testParameter;
+ public Class<? extends ClientBackedDataStore> testParameter;
protected ActorSystem system;
@Test
public void testWriteTransactionWithSingleShard() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "transactionIntegrationTest", "test-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "transactionIntegrationTest", "test-1")) {
testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@Test
public void testWriteTransactionWithMultipleShards() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testWriteTransactionWithMultipleShards",
+ "cars-1", "people-1")) {
DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@Test
public void testReadWriteTransactionWithSingleShard() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testReadWriteTransactionWithSingleShard", "test-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testReadWriteTransactionWithSingleShard",
+ "test-1")) {
// 1. Create a read-write Tx
final DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
@Test
public void testReadWriteTransactionWithMultipleShards() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testReadWriteTransactionWithMultipleShards",
+ "cars-1", "people-1")) {
DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
assertNotNull("newReadWriteTransaction returned null", readWriteTx);
@Test
public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
- final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
+ final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
+ try (var dataStore = testKit.setupDataStore(testParameter, "testSingleTransactionsWritesInQuickSuccession",
+ "cars-1")) {
- final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+ final var txChain = dataStore.createTransactionChain();
- DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ var writeTx = txChain.newWriteOnlyTransaction();
writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
testKit.doCommit(writeTx.ready());
.untilAsserted(() -> {
// verify frontend metadata has no holes in purged transactions causing overtime memory leak
final var localShard = dataStore.getActorUtils().findLocalShard("cars-1") .orElseThrow();
- FrontendShardDataTreeSnapshotMetadata frontendMetadata =
- (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
+ final var frontendMetadata = (FrontendShardDataTreeSnapshotMetadata) dataStore.getActorUtils()
.executeOperation(localShard, new RequestFrontendMetadata());
final var clientMeta = frontendMetadata.getClients().get(0);
- if (dataStore.getActorUtils().getDatastoreContext().isUseTellBasedProtocol()) {
- assertTellMetadata(clientMeta);
- } else {
- assertAskMetadata(clientMeta);
+ final var iterator = clientMeta.getCurrentHistories().iterator();
+ var metadata = iterator.next();
+ while (iterator.hasNext() && metadata.getHistoryId() != 1) {
+ metadata = iterator.next();
}
+ assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
});
final var body = txChain.newReadOnlyTransaction().read(CarsModel.CAR_LIST_PATH)
}
}
- private static void assertAskMetadata(final FrontendClientMetadata clientMeta) {
- // ask based should track no metadata
- assertEquals(List.of(), clientMeta.getCurrentHistories());
- }
-
- private static void assertTellMetadata(final FrontendClientMetadata clientMeta) {
- final var iterator = clientMeta.getCurrentHistories().iterator();
- var metadata = iterator.next();
- while (iterator.hasNext() && metadata.getHistoryId() != 1) {
- metadata = iterator.next();
- }
- assertEquals("[[0..10]]", metadata.getPurgedTransactions().ranges().toString());
- }
-
@SuppressWarnings("checkstyle:IllegalCatch")
private void testTransactionCommitFailureWithNoShardLeader(final boolean writeOnly, final String testName)
throws Exception {
datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
.shardInitializationTimeout(200, TimeUnit.MILLISECONDS).frontendRequestTimeoutInSeconds(2);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(testParameter, testName, false, shardName)) {
-
+ try (var dataStore = testKit.setupDataStore(testParameter, testName, false, shardName)) {
final Object result = dataStore.getActorUtils().executeOperation(
dataStore.getActorUtils().getShardManager(), new FindLocalShard(shardName, true));
assertTrue("Expected LocalShardFound. Actual: " + result, result instanceof LocalShardFound);
@Test
public void testTransactionAbort() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "transactionAbortIntegrationTest", "test-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "transactionAbortIntegrationTest", "test-1")) {
final DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
assertNotNull("newWriteOnlyTransaction returned null", writeTx);
@SuppressWarnings("checkstyle:IllegalCatch")
public void testTransactionChainWithSingleShard() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testTransactionChainWithSingleShard", "test-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testTransactionChainWithSingleShard", "test-1")) {
// 1. Create a Tx chain and write-only Tx
final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@Test
public void testTransactionChainWithMultipleShards() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testTransactionChainWithMultipleShards",
+ "cars-1", "people-1")) {
final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@Test
public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionsInQuickSuccession",
+ "cars-1")) {
final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
.put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
MoreExecutors.directExecutor());
- final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
- DOMTransactionChain txChain = broker.createTransactionChain(listener);
+ DOMTransactionChain txChain = broker.createTransactionChain();
final List<ListenableFuture<?>> futures = new ArrayList<>();
@Test
public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionAfterEmptyTxReadied",
+ "test-1")) {
final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@Test
public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionWhenPreviousNotReady",
+ "test-1")) {
final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@Test
public void testCreateChainedTransactionAfterClose() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testCreateChainedTransactionAfterClose", "test-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testCreateChainedTransactionAfterClose",
+ "test-1")) {
final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
txChain.close();
@Test
public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testChainWithReadOnlyTxAfterPreviousReady",
+ "test-1")) {
final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@Test
public void testChainedTransactionFailureWithSingleShard() throws Exception {
final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (var dataStore = testKit.setupAbstractDataStore(testParameter,
- "testChainedTransactionFailureWithSingleShard", "cars-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testChainedTransactionFailureWithSingleShard",
+ "cars-1")) {
final var broker = new ConcurrentDOMDataBroker(
ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
.put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
MoreExecutors.directExecutor());
- final var listener = Mockito.mock(DOMTransactionChainListener.class);
- final var txChain = broker.createTransactionChain(listener);
+ final var listener = mock(FutureCallback.class);
+ final var txChain = broker.createTransactionChain();
+ txChain.addCallback(listener);
final var writeTx = txChain.newReadWriteTransaction();
assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
- verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
- any(Throwable.class));
+ verify(listener, timeout(5000)).onFailure(any());
txChain.close();
broker.close();
@Test
public void testChainedTransactionFailureWithMultipleShards() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testChainedTransactionFailureWithMultipleShards",
+ "cars-1", "people-1")) {
final ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
.put(LogicalDatastoreType.CONFIGURATION, dataStore).build(),
MoreExecutors.directExecutor());
- final DOMTransactionChainListener listener = Mockito.mock(DOMTransactionChainListener.class);
- final DOMTransactionChain txChain = broker.createTransactionChain(listener);
+ final var listener = mock(FutureCallback.class);
+ final DOMTransactionChain txChain = broker.createTransactionChain();
+ txChain.addCallback(listener);
final DOMDataTreeWriteTransaction writeTx = txChain.newReadWriteTransaction();
// done for put for performance reasons.
assertThrows(ExecutionException.class, () -> writeTx.commit().get(5, TimeUnit.SECONDS));
- verify(listener, timeout(5000)).onTransactionChainFailed(eq(txChain), eq(writeTx),
- any(Throwable.class));
+ verify(listener, timeout(5000)).onFailure(any());
txChain.close();
broker.close();
@Test
public void testDataTreeChangeListenerRegistration() throws Exception {
final IntegrationTestKit testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder);
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, "testDataTreeChangeListenerRegistration", "test-1")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testDataTreeChangeListenerRegistration",
+ "test-1")) {
testKit.testWriteTransaction(dataStore, TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME));
final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
- ListenerRegistration<MockDataTreeChangeListener> listenerReg = dataStore
- .registerTreeChangeListener(TestModel.TEST_PATH, listener);
+ final var listenerReg = dataStore.registerTreeChangeListener(TestModel.TEST_PATH, listener);
assertNotNull("registerTreeChangeListener returned null", listenerReg);
new DatastoreSnapshot.ShardSnapshot("cars", carsSnapshot),
new DatastoreSnapshot.ShardSnapshot("people", peopleSnapshot)));
- try (AbstractDataStore dataStore = testKit.setupAbstractDataStore(
- testParameter, name, "module-shards-member1.conf", true, "cars", "people")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, name, "module-shards-member1.conf", true,
+ "cars", "people")) {
final DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
// FIXME: re-enable this test
public void testSnapshotOnRootOverwrite() throws Exception {
final var testKit = new IntegrationTestKit(getSystem(), datastoreContextBuilder.snapshotOnRootOverwrite(true));
- try (var dataStore = testKit.setupAbstractDataStore(
- testParameter, "testRootOverwrite", "module-shards-default-cars-member1.conf",
- true, "cars", "default")) {
+ try (var dataStore = testKit.setupDataStore(testParameter, "testRootOverwrite",
+ "module-shards-default-cars-member1.conf", true, "cars", "default")) {
final var rootNode = Builders.containerBuilder()
.withNodeIdentifier(NodeIdentifier.create(SchemaContext.NAME))