X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FCompositeDataTreeCohort.java;h=a84fa4610d5763c9ab84ae73687640f1236024f5;hp=4044a4cb3a0ec991ca429a1d5c3dac63db426768;hb=e78622411319748472b5d9edab14eb6dc92cf6b1;hpb=806ca77eb8f5c6a7de07f5f6a0f2b3d234752050 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java index 4044a4cb3a..a84fa4610d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CompositeDataTreeCohort.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorRef; import akka.actor.Status; import akka.actor.Status.Failure; import akka.dispatch.ExecutionContexts; @@ -17,9 +18,14 @@ import akka.pattern.Patterns; import akka.util.Timeout; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -82,19 +88,19 @@ class CompositeDataTreeCohort { ABORTED } - protected static final Recover EXCEPTION_TO_MESSAGE = new Recover() { + static final Recover EXCEPTION_TO_MESSAGE = new Recover() { @Override - public Failure recover(final Throwable error) throws Throwable { + public Failure recover(final Throwable error) { return new Failure(error); } }; - private final DataTreeCohortActorRegistry registry; private final TransactionIdentifier txId; private final SchemaContext schema; private final Timeout timeout; - private Iterable successfulFromPrevious; + + private List successfulFromPrevious; private State state = State.IDLE; CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID, @@ -114,8 +120,13 @@ class CompositeDataTreeCohort { case COMMIT_SENT: abort(); break; - default : + case ABORTED: + case COMMITED: + case FAILED: + case IDLE: break; + default: + throw new IllegalStateException("Unhandled state " + state); } successfulFromPrevious = null; @@ -123,86 +134,139 @@ class CompositeDataTreeCohort { } void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException { - LOG.debug("{}: canCommit - candidate: {}", txId, tip); - - Collection messages = registry.createCanCommitMessages(txId, tip, schema); + LOG.debug("{}: canCommit - candidate: {}", txId, tip); + final List messages = registry.createCanCommitMessages(txId, tip, schema); LOG.debug("{}: canCommit - messages: {}", txId, messages); + if (messages.isEmpty()) { + successfulFromPrevious = ImmutableList.of(); + changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL); + return; + } + + final List>> futures = new ArrayList<>(messages.size()); + for (CanCommit message : messages) { + final ActorRef actor = message.getCohort(); + final Future future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE, + ExecutionContexts.global()); + LOG.trace("{}: requesting canCommit from {}", txId, actor); + futures.add(new SimpleImmutableEntry<>(actor, future)); + } - // FIXME: Optimize empty collection list with pre-created futures, containing success. - Future> canCommitsFuture = Futures.traverse(messages, - input -> Patterns.ask(input.getCohort(), input, timeout).recover(EXCEPTION_TO_MESSAGE, - ExecutionContexts.global()), ExecutionContexts.global()); changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT); - processResponses(canCommitsFuture, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL); + processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL); } void preCommit() throws ExecutionException, TimeoutException { LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious); Preconditions.checkState(successfulFromPrevious != null); - Future> preCommitFutures = sendMesageToSuccessful(new DataTreeCohortActor.PreCommit(txId)); + if (successfulFromPrevious.isEmpty()) { + changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL); + return; + } + + final List>> futures = sendMessageToSuccessful( + new DataTreeCohortActor.PreCommit(txId)); changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT); - processResponses(preCommitFutures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL); + processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL); } void commit() throws ExecutionException, TimeoutException { LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious); + if (successfulFromPrevious.isEmpty()) { + changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED); + return; + } Preconditions.checkState(successfulFromPrevious != null); - Future> commitsFuture = sendMesageToSuccessful(new DataTreeCohortActor.Commit(txId)); + final List>> futures = sendMessageToSuccessful( + new DataTreeCohortActor.Commit(txId)); changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT); - processResponses(commitsFuture, State.COMMIT_SENT, State.COMMITED); + processResponses(futures, State.COMMIT_SENT, State.COMMITED); } - Optional>> abort() { + Optional>> abort() { LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious); state = State.ABORTED; - if (successfulFromPrevious != null && !Iterables.isEmpty(successfulFromPrevious)) { - return Optional.of(sendMesageToSuccessful(new DataTreeCohortActor.Abort(txId))); + if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) { + return Optional.empty(); } - return Optional.empty(); + final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId); + final List> futures = new ArrayList<>(successfulFromPrevious.size()); + for (Success s : successfulFromPrevious) { + futures.add(Patterns.ask(s.getCohort(), message, timeout)); + } + return Optional.of(futures); } - private Future> sendMesageToSuccessful(final Object message) { + private List>> sendMessageToSuccessful(final Object message) { LOG.debug("{}: sendMesageToSuccessful: {}", txId, message); - return Futures.traverse(successfulFromPrevious, cohortResponse -> Patterns.ask( - cohortResponse.getCohort(), message, timeout), ExecutionContexts.global()); + final List>> ret = new ArrayList<>(successfulFromPrevious.size()); + for (Success s : successfulFromPrevious) { + final ActorRef actor = s.getCohort(); + ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout))); + } + return ret; } @SuppressWarnings("checkstyle:IllegalCatch") - private void processResponses(final Future> resultsFuture, final State currentState, + private void processResponses(final List>> futures, final State currentState, final State afterState) throws TimeoutException, ExecutionException { LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState); final Iterable results; try { - results = Await.result(resultsFuture, timeout.duration()); + results = Await.result(Futures.sequence(Lists.transform(futures, e -> e.getValue()), + ExecutionContexts.global()), timeout.duration()); + } catch (TimeoutException e) { + successfulFromPrevious = null; + LOG.debug("{}: processResponses - error from Future", txId, e); + + for (Entry> f : futures) { + if (!f.getValue().isCompleted()) { + LOG.info("{}: actor {} failed to respond", txId, f.getKey()); + } + } + throw e; + } catch (ExecutionException e) { + successfulFromPrevious = null; + LOG.debug("{}: processResponses - error from Future", txId, e); + throw e; } catch (Exception e) { successfulFromPrevious = null; LOG.debug("{}: processResponses - error from Future", txId, e); - Throwables.propagateIfInstanceOf(e, TimeoutException.class); - throw Throwables.propagate(e); + throw new ExecutionException(e); + } + + final Collection failed = new ArrayList<>(1); + final List successful = new ArrayList<>(futures.size()); + for (Object result : results) { + if (result instanceof DataTreeCohortActor.Success) { + successful.add((Success) result); + } else if (result instanceof Status.Failure) { + failed.add((Failure) result); + } else { + LOG.warn("{}: unrecognized response {}, ignoring it", result); + } } - Iterable failed = Iterables.filter(results, Status.Failure.class); - Iterable successful = Iterables.filter(results, DataTreeCohortActor.Success.class); LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed); successfulFromPrevious = successful; - if (!Iterables.isEmpty(failed)) { + if (!failed.isEmpty()) { changeStateFrom(currentState, State.FAILED); - Iterator it = failed.iterator(); - Throwable firstEx = it.next().cause(); + final Iterator it = failed.iterator(); + final Throwable firstEx = it.next().cause(); while (it.hasNext()) { firstEx.addSuppressed(it.next().cause()); } - Throwables.propagateIfPossible(firstEx, ExecutionException.class); - Throwables.propagateIfPossible(firstEx, TimeoutException.class); - throw Throwables.propagate(firstEx); + Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class); + Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class); + throw new ExecutionException(firstEx); } changeStateFrom(currentState, afterState); }