import akka.cluster.Cluster;
import akka.dispatch.Futures;
import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import com.google.common.base.Optional;
+import akka.testkit.javadsl.TestKit;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.cluster.access.client.RequestTimeoutException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker;
import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.common.api.TransactionChainListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
@Parameters(name = "{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
- { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 120 }
+ { DistributedDataStore.class, 7}, { ClientBackedDataStore.class, 12 }
});
}
private static final String MODULE_SHARDS_CARS_ONLY_1_2 = "module-shards-cars-member-1-and-2.conf";
private static final String MODULE_SHARDS_CARS_PEOPLE_1_2 = "module-shards-member1-and-2.conf";
private static final String MODULE_SHARDS_CARS_PEOPLE_1_2_3 = "module-shards-member1-and-2-and-3.conf";
+ private static final String MODULE_SHARDS_CARS_1_2_3 = "module-shards-cars-member-1-and-2-and-3.conf";
private ActorSystem leaderSystem;
private ActorSystem followerSystem;
leaderDistributedDataStore.close();
}
- JavaTestKit.shutdownActorSystem(leaderSystem);
- JavaTestKit.shutdownActorSystem(followerSystem);
- JavaTestKit.shutdownActorSystem(follower2System);
+ TestKit.shutdownActorSystem(leaderSystem);
+ TestKit.shutdownActorSystem(followerSystem);
+ TestKit.shutdownActorSystem(follower2System);
InMemoryJournal.clear();
InMemorySnapshotStore.clear();
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
}
- JavaTestKit.shutdownActorSystem(leaderSystem, null, Boolean.TRUE);
- JavaTestKit.shutdownActorSystem(followerSystem, null, Boolean.TRUE);
+ TestKit.shutdownActorSystem(leaderSystem, Boolean.TRUE);
+ TestKit.shutdownActorSystem(followerSystem, Boolean.TRUE);
final ActorSystem newSystem = newActorSystem("reinstated-member2", "Member2");
final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
final DOMTransactionChain txChain = broker.createTransactionChain(listener);
- final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
final ContainerNode invalidData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
new YangInstanceIdentifier.NodeIdentifier(CarsModel.BASE_QNAME))
writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
try {
- writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+ writeTx.commit().get(5, TimeUnit.SECONDS);
fail("Expected TransactionCommitFailedException");
- } catch (final TransactionCommitFailedException e) {
+ } catch (final ExecutionException e) {
// Expected
}
final TransactionChainListener listener = Mockito.mock(TransactionChainListener.class);
final DOMTransactionChain txChain = broker.createTransactionChain(listener);
- final DOMDataWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+ final DOMDataTreeWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
writeTx.put(LogicalDatastoreType.CONFIGURATION, PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
writeTx.merge(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH, invalidData);
try {
- writeTx.submit().checkedGet(5, TimeUnit.SECONDS);
+ writeTx.commit().get(5, TimeUnit.SECONDS);
fail("Expected TransactionCommitFailedException");
- } catch (final TransactionCommitFailedException e) {
+ } catch (final ExecutionException e) {
// Expected
}
@Test
public void testSingleShardTransactionsWithLeaderChanges() throws Exception {
+ followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
final String testName = "testSingleShardTransactionsWithLeaderChanges";
initDatastoresWithCars(testName);
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
.shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
- JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+ TestKit.shutdownActorSystem(leaderSystem, true);
Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
followerTestKit.waitUntilNoLeader(followerDistributedDataStore.getActorContext(), CARS);
initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
- final Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext()
- .findLocalShard("cars");
+ final com.google.common.base.Optional<ActorRef> carsFollowerShard =
+ followerDistributedDataStore.getActorContext().findLocalShard("cars");
assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
- final TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
- dataTree.setSchemaContext(SchemaContextHelper.full());
+ final DataTree dataTree = new InMemoryDataTreeFactory().create(
+ DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
// Send a tx with immediate commit.
new WriteModification(CarsModel.newCarPath("optima"), car1).apply(modification);
modification.ready();
- ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true);
+ ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true,
+ java.util.Optional.empty());
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
new WriteModification(CarsModel.newCarPath("sportage"), car2).apply(modification);
modification.ready();
- readyLocal = new ReadyLocalTransaction(tx2 , modification, false);
+ readyLocal = new ReadyLocalTransaction(tx2 , modification, false, java.util.Optional.empty());
carsFollowerShard.get().tell(readyLocal, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), "cars");
- final Optional<ActorRef> carsFollowerShard = followerDistributedDataStore.getActorContext()
- .findLocalShard("cars");
+ final com.google.common.base.Optional<ActorRef> carsFollowerShard =
+ followerDistributedDataStore.getActorContext().findLocalShard("cars");
assertEquals("Cars follower shard found", true, carsFollowerShard.isPresent());
carsFollowerShard.get().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true);
+ Mockito.mock(ShardDataTreeTransactionParent.class), tx1, modification), true,
+ java.util.Optional.empty());
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
Object resp = followerTestKit.expectMsgClass(Object.class);
forwardedReady = new ForwardedReadyTransaction(tx2,
DataStoreVersions.CURRENT_VERSION, new ReadWriteShardDataTreeTransaction(
- Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false);
+ Mockito.mock(ShardDataTreeTransactionParent.class), tx2, modification), false,
+ java.util.Optional.empty());
carsFollowerShard.get().tell(forwardedReady, followerTestKit.getRef());
resp = followerTestKit.expectMsgClass(Object.class);
// Wait for the commit to be replicated to the follower.
MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
- raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
+ raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
MemberNode.verifyRaftState(followerDistributedDataStore, "people",
- raftState -> assertEquals("getLastApplied", 0, raftState.getLastApplied()));
+ raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
// Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
// the leader shard.
@Test
public void testTransactionWithShardLeaderNotResponding() throws Exception {
+ followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
followerDatastoreContextBuilder.shardElectionTimeoutFactor(50);
initDatastoresWithCars("testTransactionWithShardLeaderNotResponding");
// Do an initial read to get the primary shard info cached.
final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
- readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+ readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
// Shutdown the leader and try to create a new tx.
- JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+ TestKit.shutdownActorSystem(leaderSystem, true);
followerDatastoreContextBuilder.operationTimeoutInMillis(50).shardElectionTimeoutFactor(1);
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder);
fail("Exception expected");
} catch (final ExecutionException e) {
final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
- assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
- || e.getCause() instanceof ShardLeaderNotRespondingException);
+ if (DistributedDataStore.class.equals(testParameter)) {
+ assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException
+ || e.getCause() instanceof ShardLeaderNotRespondingException);
+ } else {
+ assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
+ }
}
}
@Test
public void testTransactionWithCreateTxFailureDueToNoLeader() throws Exception {
+ followerDatastoreContextBuilder.frontendRequestTimeoutInSeconds(2);
initDatastoresWithCars("testTransactionWithCreateTxFailureDueToNoLeader");
// Do an initial read to get the primary shard info cached.
final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
- readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+ readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
// Shutdown the leader and try to create a new tx.
- JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+ TestKit.shutdownActorSystem(leaderSystem, true);
Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
followerTestKit.doCommit(rwTx.ready());
fail("Exception expected");
} catch (final ExecutionException e) {
- final String msg = "Expected instance of NoShardLeaderException, actual: \n"
- + Throwables.getStackTraceAsString(e.getCause());
- assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
+ final String msg = "Unexpected exception: " + Throwables.getStackTraceAsString(e.getCause());
+ if (DistributedDataStore.class.equals(testParameter)) {
+ assertTrue(msg, Throwables.getRootCause(e) instanceof NoShardLeaderException);
+ } else {
+ assertTrue(msg, Throwables.getRootCause(e) instanceof RequestTimeoutException);
+ }
}
}
@Test
public void testTransactionRetryWithInitialAskTimeoutExOnCreateTx() throws Exception {
+ followerDatastoreContextBuilder.backendAlivenessTimerIntervalInSeconds(2);
String testName = "testTransactionRetryWithInitialAskTimeoutExOnCreateTx";
- initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS);
+ initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
final DatastoreContext.Builder follower2DatastoreContextBuilder = DatastoreContext.newBuilder()
- .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+ .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10);
final IntegrationTestKit follower2TestKit = new IntegrationTestKit(
follower2System, follower2DatastoreContextBuilder, commitTimeout);
try (AbstractDataStore ds =
follower2TestKit.setupAbstractDataStore(
- testParameter, testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {
+ testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false, CARS)) {
followerTestKit.waitForMembersUp("member-1", "member-3");
follower2TestKit.waitForMembersUp("member-1", "member-2");
// Do an initial read to get the primary shard info cached.
final DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
- readTx.read(CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+ readTx.read(CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
// Shutdown the leader and try to create a new tx.
- JavaTestKit.shutdownActorSystem(leaderSystem, null, true);
+ TestKit.shutdownActorSystem(leaderSystem, true);
Cluster.get(followerSystem).leave(MEMBER_1_ADDRESS);
sendDatastoreContextUpdate(followerDistributedDataStore, followerDatastoreContextBuilder
- .operationTimeoutInMillis(500).shardElectionTimeoutFactor(1).customRaftPolicyImplementation(null));
+ .operationTimeoutInMillis(500).shardElectionTimeoutFactor(5).customRaftPolicyImplementation(null));
final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
// Setup a saved snapshot on the leader. The follower will startup with no data and the leader should
// install a snapshot to sync the follower.
- TipProducingDataTree tree = InMemoryDataTreeFactory.getInstance().create(TreeType.CONFIGURATION);
- tree.setSchemaContext(SchemaContextHelper.full());
+ DataTree tree = new InMemoryDataTreeFactory().create(DataTreeConfiguration.DEFAULT_CONFIGURATION,
+ SchemaContextHelper.full());
final ContainerNode carsNode = CarsModel.newCarsNode(
CarsModel.newCarsMapNode(CarsModel.newCarEntry("optima", BigInteger.valueOf(20000))));
initDatastoresWithCars(testName);
final Optional<NormalizedNode<?, ?>> readOptional = leaderDistributedDataStore.newReadOnlyTransaction().read(
- CarsModel.BASE_PATH).checkedGet(5, TimeUnit.SECONDS);
+ CarsModel.BASE_PATH).get(5, TimeUnit.SECONDS);
assertEquals("isPresent", true, readOptional.isPresent());
assertEquals("Node", carsNode, readOptional.get());
initialSnapshot, snapshotRoot);
}
+ @Test
+ public void testReadWriteMessageSlicing() throws Exception {
+ // The slicing is only implemented for tell-based protocol
+ Assume.assumeTrue(testParameter.equals(ClientBackedDataStore.class));
+
+ leaderDatastoreContextBuilder.maximumMessageSliceSize(100);
+ followerDatastoreContextBuilder.maximumMessageSliceSize(100);
+ initDatastoresWithCars("testLargeReadReplySlicing");
+
+ final DOMStoreReadWriteTransaction rwTx = followerDistributedDataStore.newReadWriteTransaction();
+
+ final NormalizedNode<?, ?> carsNode = CarsModel.create();
+ rwTx.write(CarsModel.BASE_PATH, carsNode);
+
+ verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
+ }
+
private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
final NormalizedNode<?, ?> expRoot) {
assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());