The common case is when we do not have any user cohorts, in which
case these is nothing we need to do. Address the FIXME by adding
shortcuts which transition state directly without burdening the
global execution context.
Change-Id: I38e163a879949c3755322ed371db3bff5d28142f
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
import akka.dispatch.Recover;
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
import akka.dispatch.Recover;
-import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Iterator;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Iterator;
private final DataTreeCohortActorRegistry registry;
private final TransactionIdentifier txId;
private final SchemaContext schema;
private final DataTreeCohortActorRegistry registry;
private final TransactionIdentifier txId;
private final SchemaContext schema;
void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
- // FIXME: Optimize empty collection list with pre-created futures, containing success.
- Future<Iterable<Object>> canCommitsFuture =
- Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
- @Override
- public Future<Object> apply(final CanCommit input) {
- return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
- ExecutionContexts.global());
- }
- }, ExecutionContexts.global());
+ if (messages.isEmpty()) {
+ successfulFromPrevious = ImmutableList.of();
+ changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
+ return;
+ }
+
+ Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
+ input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
+ ExecutionContexts.global()), ExecutionContexts.global());
changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
}
void preCommit() throws ExecutionException, TimeoutException {
Preconditions.checkState(successfulFromPrevious != null);
changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
}
void preCommit() throws ExecutionException, TimeoutException {
Preconditions.checkState(successfulFromPrevious != null);
+ if (isEmpty()) {
+ changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
+ return;
+ }
+
Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
void commit() throws ExecutionException, TimeoutException {
Preconditions.checkState(successfulFromPrevious != null);
void commit() throws ExecutionException, TimeoutException {
Preconditions.checkState(successfulFromPrevious != null);
+ if (isEmpty()) {
+ changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
+ return;
+ }
+
Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
return Optional.empty();
}
return Optional.empty();
}
- private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
- return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
-
- @Override
- public Future<Object> apply(final DataTreeCohortActor.Success cohortResponse) throws Exception {
- return Patterns.ask(cohortResponse.getCohort(), message, timeout);
- }
+ private boolean isEmpty() {
+ return successfulFromPrevious instanceof Collection && ((Collection<?>) successfulFromPrevious).isEmpty();
+ }
- }, ExecutionContexts.global());
+ private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
+ return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(cohortResponse.getCohort(),
+ message, timeout), ExecutionContexts.global());
- private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState, final State afterState)
- throws TimeoutException, ExecutionException {
+ private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
+ final State afterState) throws TimeoutException, ExecutionException {
final Iterable<Object> results;
try {
results = Await.result(resultsFuture, timeout.duration());
final Iterable<Object> results;
try {
results = Await.result(resultsFuture, timeout.duration());
changeStateFrom(currentState, afterState);
}
changeStateFrom(currentState, afterState);
}
- void changeStateFrom(final State expected, final State followup) {
+ private void changeStateFrom(final State expected, final State followup) {
Preconditions.checkState(state == expected);
state = followup;
}
Preconditions.checkState(state == expected);
state = followup;
}