import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
*
*/
class CompositeDataTreeCohort {
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
private enum State {
/**
}
void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
+ LOG.debug("{}: canCommit - candidate: {}", txId, tip);
+
Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+
+ LOG.debug("{}: canCommit - messages: {}", txId, messages);
+
// FIXME: Optimize empty collection list with pre-created futures, containing success.
Future<Iterable<Object>> canCommitsFuture = Futures.traverse(messages,
input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
}
void preCommit() throws ExecutionException, TimeoutException {
+ LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
Preconditions.checkState(successfulFromPrevious != null);
Future<Iterable<Object>> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId));
changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
}
void commit() throws ExecutionException, TimeoutException {
+ LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
Preconditions.checkState(successfulFromPrevious != null);
Future<Iterable<Object>> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId));
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
}
Optional<Future<Iterable<Object>>> abort() {
+ LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
+
state = State.ABORTED;
if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) {
return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId)));
}
private Future<Iterable<Object>> sendMesageToSuccessful(final Object message) {
+ LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
+
return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask(
cohortResponse.getCohort(), message, timeout), ExecutionContexts.global());
}
@SuppressWarnings("checkstyle:IllegalCatch")
private void processResponses(final Future<Iterable<Object>> resultsFuture, final State currentState,
final State afterState) throws TimeoutException, ExecutionException {
+ LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
+
final Iterable<Object> results;
try {
results = Await.result(resultsFuture, timeout.duration());
} catch (Exception e) {
successfulFromPrevious = null;
+ LOG.debug("{}: processResponses - error from Future", txId, e);
Throwables.propagateIfInstanceOf(e, TimeoutException.class);
throw Throwables.propagate(e);
}
Iterable<Failure> failed = Iterables.filter(results, Status.Failure.class);
Iterable<Success> successful = Iterables.filter(results, DataTreeCohortActor.Success.class);
+
+ LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
+
successfulFromPrevious = successful;
if (!Iterables.isEmpty(failed)) {
changeStateFrom(currentState, State.FAILED);