import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
import akka.dispatch.Recover;
-import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Iterator;
}
};
-
private final DataTreeCohortActorRegistry registry;
private final TransactionIdentifier txId;
private final SchemaContext schema;
void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
- // FIXME: Optimize empty collection list with pre-created futures, containing success.
- Future<Iterable<Object>> canCommitsFuture =
- Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
- @Override
- public Future<Object> apply(final CanCommit input) {
- return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
- ExecutionContexts.global());
- }
- }, ExecutionContexts.global());
+ if (messages.isEmpty()) {
+ successfulFromPrevious = ImmutableList.of();
+ changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
+ return;
+ }
+
+ Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
+ input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
+ ExecutionContexts.global()), ExecutionContexts.global());
changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
}
void preCommit() throws ExecutionException, TimeoutException {
Preconditions.checkState(successfulFromPrevious != null);
+ if (isEmpty()) {
+ changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
+ return;
+ }
+
Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
void commit() throws ExecutionException, TimeoutException {
Preconditions.checkState(successfulFromPrevious != null);
+ if (isEmpty()) {
+ changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
+ return;
+ }
+
Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
return Optional.empty();
}
- private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
- return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
-
- @Override
- public Future<Object> apply(final DataTreeCohortActor.Success cohortResponse) throws Exception {
- return Patterns.ask(cohortResponse.getCohort(), message, timeout);
- }
+ private boolean isEmpty() {
+ return successfulFromPrevious instanceof Collection && ((Collection<?>) successfulFromPrevious).isEmpty();
+ }
- }, ExecutionContexts.global());
+ private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
+ return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(cohortResponse.getCohort(),
+ message, timeout), ExecutionContexts.global());
}
- private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState, final State afterState)
- throws TimeoutException, ExecutionException {
+ private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
+ final State afterState) throws TimeoutException, ExecutionException {
final Iterable<Object> results;
try {
results = Await.result(resultsFuture, timeout.duration());
changeStateFrom(currentState, afterState);
}
- void changeStateFrom(final State expected, final State followup) {
+ private void changeStateFrom(final State expected, final State followup) {
Preconditions.checkState(state == expected);
state = followup;
}