import akka.dispatch.ExecutionContexts;
import akka.dispatch.Futures;
import akka.dispatch.Recover;
-import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.Iterator;
+import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
/**
- *
* Composite cohort, which coordinates multiple user-provided cohorts as if it was only one cohort.
- *
+ * <p/>
* It tracks current operation and list of cohorts which successfuly finished previous phase in
* case, if abort is necessary to invoke it only on cohort steps which are still active.
*
*/
class CompositeDataTreeCohort {
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
private enum State {
/**
*/
COMMITED,
/**
- * Some of cohorts responsed back with unsuccessful message.
- *
+ * Some of cohorts responded back with unsuccessful message.
*/
FAILED,
/**
- *
* Abort message was send to all cohorts which responded with success previously.
- *
*/
ABORTED
}
protected static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
@Override
- public Failure recover(Throwable error) throws Throwable {
+ public Failure recover(final Throwable error) throws Throwable {
return new Failure(error);
}
};
private final DataTreeCohortActorRegistry registry;
- private final String txId;
+ private final TransactionIdentifier txId;
private final SchemaContext schema;
private final Timeout timeout;
private Iterable<Success> successfulFromPrevious;
private State state = State.IDLE;
- CompositeDataTreeCohort(DataTreeCohortActorRegistry registry, String txId, SchemaContext schema, Timeout timeout) {
+ CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
+ final SchemaContext schema, final Timeout timeout) {
this.registry = Preconditions.checkNotNull(registry);
- this.txId = Preconditions.checkNotNull(txId);
+ this.txId = Preconditions.checkNotNull(transactionID);
this.schema = Preconditions.checkNotNull(schema);
this.timeout = Preconditions.checkNotNull(timeout);
}
- void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException {
+ void reset() {
+ switch (state) {
+ case CAN_COMMIT_SENT:
+ case CAN_COMMIT_SUCCESSFUL:
+ case PRE_COMMIT_SENT:
+ case PRE_COMMIT_SUCCESSFUL:
+ case COMMIT_SENT:
+ abort();
+ break;
+ default :
+ break;
+ }
+
+ successfulFromPrevious = null;
+ state = State.IDLE;
+ }
+
+ void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
+ LOG.debug("{}: canCommit - candidate: {}", txId, tip);
+
Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+
+ LOG.debug("{}: canCommit - messages: {}", txId, messages);
+
// FIXME: Optimize empty collection list with pre-created futures, containing success.
- Future<Iterable<Object>> canCommitsFuture =
- Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
- @Override
- public Future<Object> apply(CanCommit input) {
- return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
- ExecutionContexts.global());
- }
- }, ExecutionContexts.global());
+ Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
+ input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
+ ExecutionContexts.global()), ExecutionContexts.global());
changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
}
void preCommit() throws ExecutionException, TimeoutException {
+ LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
Preconditions.checkState(successfulFromPrevious != null);
Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
}
void commit() throws ExecutionException, TimeoutException {
+ LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
Preconditions.checkState(successfulFromPrevious != null);
Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED);
}
- void abort() throws TimeoutException {
- if (successfulFromPrevious != null) {
- sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId));
+ Optional<Future<Iterable<Object>>> abort() {
+ LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
+ state = State.ABORTED;
+ if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) {
+ return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
}
+
+ return Optional.empty();
}
private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
- return Futures.traverse(successfulFromPrevious, new Function<DataTreeCohortActor.Success, Future<Object>>() {
+ LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
- @Override
- public Future<Object> apply(DataTreeCohortActor.Success cohortResponse) throws Exception {
- return Patterns.ask(cohortResponse.getCohort(), message, timeout);
- }
-
- }, ExecutionContexts.global());
+ return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
+ cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
}
- private void processResponses(Future<Iterable<Object>> resultsFuture, State currentState, State afterState)
- throws TimeoutException, ExecutionException {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
+ final State afterState) throws TimeoutException, ExecutionException {
+ LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+
final Iterable<Object> results;
try {
results = Await.result(resultsFuture, timeout.duration());
} catch (Exception e) {
successfulFromPrevious = null;
+ LOG.debug("{}: processResponses - error from Future", txId, e);
Throwables.propagateIfInstanceOf(e, TimeoutException.class);
throw Throwables.propagate(e);
}
Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
+
+ LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
+
successfulFromPrevious = successful;
if (!Iterables.isEmpty(failed)) {
changeStateFrom(currentState, State.FAILED);
changeStateFrom(currentState, afterState);
}
- void changeStateFrom(State expected, State followup) {
+ void changeStateFrom(final State expected, final State followup) {
Preconditions.checkState(state == expected);
state = followup;
}