import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Iterator;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
@Override
- public Failure recover(Throwable error) throws Throwable {
+ public Failure recover(final Throwable error) throws Throwable {
return new Failure(error);
}
};
private Iterable<Success> successfulFromPrevious;
private State state = State.IDLE;
- CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, TransactionIdentifier transactionID,
- SchemaContext schema, Timeout timeout) {
+ CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
+ final SchemaContext schema, final Timeout timeout) {
this.registry = Preconditions.checkNotNull(registry);
this.txId = Preconditions.checkNotNull(transactionID);
this.schema = Preconditions.checkNotNull(schema);
this.timeout = Preconditions.checkNotNull(timeout);
}
- void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException {
+ void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
// FIXME: Optimize empty collection list with pre-created futures, containing success.
Future<Iterable<Object>> canCommitsFuture =
Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
@Override
- public Future<Object> apply(CanCommit input) {
+ public Future<Object> apply(final CanCommit input) {
return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
ExecutionContexts.global());
}
processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
}
- void abort() throws TimeoutException {
+ Optional<Future<Iterable<Object>>> abort() {
if (successfulFromPrevious != null) {
- sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId));
+ return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
}
+
+ return Optional.empty();
}
private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
@Override
- public Future<Object> apply(DataTreeCohortActor.Success cohortResponse) throws Exception {
+ public Future<Object> apply(final DataTreeCohortActor.Success cohortResponse) throws Exception {
return Patterns.ask(cohortResponse.getCohort(), message, timeout);
}
}, ExecutionContexts.global());
}
- private void processResponses(Future<Iterable<Object>> resultsFuture, State currentState, State afterState)
+ private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState, final State afterState)
throws TimeoutException, ExecutionException {
final Iterable<Object> results;
try {
changeStateFrom(currentState, afterState);
}
- void changeStateFrom(State expected, State followup) {
+ void changeStateFrom(final State expected, final State followup) {
Preconditions.checkState(state == expected);
state = followup;
}