- void canCommit(DataTreeCandidateTip tip) throws ExecutionException, TimeoutException {
- Collection<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
- // FIXME: Optimize empty collection list with pre-created futures, containing success.
- Future<Iterable<Object>> canCommitsFuture =
- Futures.traverse(messages, new Function<CanCommit, Future<Object>>() {
- @Override
- public Future<Object> apply(CanCommit input) {
- return Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE,
- ExecutionContexts.global());
- }
- }, ExecutionContexts.global());
+ Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{}: canCommit - candidate: {}", txId, tip);
+ } else {
+ LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
+ }
+
+ final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
+ LOG.debug("{}: canCommit - messages: {}", txId, messages);
+ if (messages.isEmpty()) {
+ successfulFromPrevious = Collections.emptyList();
+ changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
+ return Optional.empty();
+ }
+
+ final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
+ for (CanCommit message : messages) {
+ final ActorRef actor = message.getCohort();
+ final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
+ ExecutionContexts.global());
+ LOG.trace("{}: requesting canCommit from {}", txId, actor);
+ futures.add(new SimpleImmutableEntry<>(actor, future));
+ }
+