import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
}};
}
+ @Test
+ public void testSingleTransactionsWritesInQuickSuccession() throws Exception{
+ new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
+ DistributedDataStore dataStore = setupDistributedDataStore(
+ "testSingleTransactionsWritesInQuickSuccession", "cars-1");
+
+ DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+ DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ doCommit(writeTx.ready());
+
+ writeTx = txChain.newWriteOnlyTransaction();
+
+ int nCars = 5;
+ for(int i = 0; i < nCars; i++) {
+ writeTx.write(CarsModel.newCarPath("car" + i),
+ CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+ }
+
+ doCommit(writeTx.ready());
+
+ Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
+ CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
+
+ cleanup(dataStore);
+ }};
+ }
+
private void testTransactionWritesWithShardNotInitiallyReady(final String testName,
final boolean writeOnly) throws Exception {
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
@Test
public void testTransactionAbort() throws Exception{
- System.setProperty("shard.persistent", "true");
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
DistributedDataStore dataStore =
setupDistributedDataStore("transactionAbortIntegrationTest", "test-1");
public void testCreateChainedTransactionsInQuickSuccession() throws Exception{
new IntegrationTestKit(getSystem(), datastoreContextBuilder) {{
DistributedDataStore dataStore = setupDistributedDataStore(
- "testCreateChainedTransactionsInQuickSuccession", "test-1");
+ "testCreateChainedTransactionsInQuickSuccession", "cars-1");
- DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+ ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
+ ImmutableMap.<LogicalDatastoreType, DOMStore>builder().put(
+ LogicalDatastoreType.CONFIGURATION, dataStore).build(), MoreExecutors.directExecutor());
- NormalizedNode<?, ?> testNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
+ DOMTransactionChain txChain = broker.createTransactionChain(listener);
+
+ List<CheckedFuture<Void, TransactionCommitFailedException>> futures = new ArrayList<>();
- int nTxs = 20;
- List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(nTxs);
- for(int i = 0; i < nTxs; i++) {
- DOMStoreReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
+ DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ futures.add(writeTx.submit());
- rwTx.merge(TestModel.TEST_PATH, testNode);
+ int nCars = 100;
+ for(int i = 0; i < nCars; i++) {
+ DOMDataReadWriteTransaction rwTx = txChain.newReadWriteTransaction();
- cohorts.add(rwTx.ready());
+ rwTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.newCarPath("car" + i),
+ CarsModel.newCarEntry("car" + i, BigInteger.valueOf(20000)));
+ futures.add(rwTx.submit());
}
- for(DOMStoreThreePhaseCommitCohort cohort: cohorts) {
- doCommit(cohort);
+ for(CheckedFuture<Void, TransactionCommitFailedException> f: futures) {
+ f.checkedGet();
}
+ Optional<NormalizedNode<?, ?>> optional = txChain.newReadOnlyTransaction().read(
+ LogicalDatastoreType.CONFIGURATION, CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
+ assertEquals("isPresent", true, optional.isPresent());
+ assertEquals("# cars", nCars, ((Collection<?>)optional.get().getValue()).size());
+
txChain.close();
+ broker.close();
+
cleanup(dataStore);
}};
}