import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.CLIENT_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.HISTORY_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
import akka.actor.ActorSystem;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
import akka.actor.ActorSystem;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
import org.opendaylight.controller.cluster.access.ABIVersion;
import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.common.api.CommitInfo;
+import org.opendaylight.yangtools.yang.common.Empty;
system = ActorSystem.apply();
final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
final ClientActorContext context =
AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
system = ActorSystem.apply();
final TestProbe clientContextProbe = new TestProbe(system, "clientContext");
final ClientActorContext context =
AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
- transactions = new ArrayList<>();
+ doReturn(1000).when(datastoreContext).getShardBatchedModificationCount();
+ doReturn(datastoreContext).when(actorUtils).getDatastoreContext();
+ doReturn(actorUtils).when(history).actorUtils();
+
for (int i = 0; i < TRANSACTIONS; i++) {
transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
}
for (int i = 0; i < TRANSACTIONS; i++) {
transactions.add(createTransactionTester(new TestProbe(system, "backend" + i), context, history));
}
}
@Test
public void testCanCommit() throws Exception {
testOpSuccess(ClientTransactionCommitCohort::canCommit, this::expectCanCommit,
}
@Test
public void testCanCommit() throws Exception {
testOpSuccess(ClientTransactionCommitCohort::canCommit, this::expectCanCommit,
@Test
public void testPreCommit() throws Exception {
testOpSuccess(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess,
@Test
public void testPreCommit() throws Exception {
testOpSuccess(ClientTransactionCommitCohort::preCommit, this::expectPreCommit, this::replyPreCommitSuccess,
- testOpSuccess(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess, null);
+ testOpSuccess(ClientTransactionCommitCohort::commit, this::expectCommit, this::replyCommitSuccess,
+ CommitInfo.empty());
- testOpSuccess(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess, null);
+ testOpSuccess(ClientTransactionCommitCohort::abort, this::expectAbort, this::replyAbortSuccess, Empty.value());
private void expectCanCommit(final TransactionTester<RemoteProxyTransaction> tester) {
final ModifyTransactionRequest request = tester.expectTransactionRequest(ModifyTransactionRequest.class);
private void expectCanCommit(final TransactionTester<RemoteProxyTransaction> tester) {
final ModifyTransactionRequest request = tester.expectTransactionRequest(ModifyTransactionRequest.class);
- Assert.assertTrue(request.getPersistenceProtocol().isPresent());
- Assert.assertEquals(PersistenceProtocol.THREE_PHASE, request.getPersistenceProtocol().get());
+ assertEquals(Optional.of(PersistenceProtocol.THREE_PHASE), request.getPersistenceProtocol());
private static TransactionTester<RemoteProxyTransaction> createTransactionTester(final TestProbe backendProbe,
final ClientActorContext context,
final AbstractClientHistory history) {
private static TransactionTester<RemoteProxyTransaction> createTransactionTester(final TestProbe backendProbe,
final ClientActorContext context,
final AbstractClientHistory history) {
- final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
+ final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.current(),
"default", UnsignedLong.ZERO, Optional.empty(), 3);
final AbstractClientConnection<ShardBackendInfo> connection =
AccessClientUtil.createConnectedConnection(context, 0L, backend);
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
"default", UnsignedLong.ZERO, Optional.empty(), 3);
final AbstractClientConnection<ShardBackendInfo> connection =
AccessClientUtil.createConnectedConnection(context, 0L, backend);
final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
final RemoteProxyTransaction transaction =
- new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
+ new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false, false);
final T expectedResult) throws Exception {
final ListenableFuture<T> result = operation.apply(cohort);
replySuccess(transactions, expectFunction, replyFunction);
final T expectedResult) throws Exception {
final ListenableFuture<T> result = operation.apply(cohort);
replySuccess(transactions, expectFunction, replyFunction);
//reply fail to last transaction
final TransactionTester<RemoteProxyTransaction> last = transactions.get(transactions.size() - 1);
expectFunction.accept(last);
//reply fail to last transaction
final TransactionTester<RemoteProxyTransaction> last = transactions.get(transactions.size() - 1);
expectFunction.accept(last);
- final RuntimeRequestException cause = new RuntimeRequestException("fail", new RuntimeException());
+ final RuntimeException e = new RuntimeException();
+ final RuntimeRequestException cause = new RuntimeRequestException("fail", e);
last.replyFailure(cause);
//check future fail
final ExecutionException exception =
assertOperationThrowsException(() -> getWithTimeout(canCommit), ExecutionException.class);
last.replyFailure(cause);
//check future fail
final ExecutionException exception =
assertOperationThrowsException(() -> getWithTimeout(canCommit), ExecutionException.class);