2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.Status;
13 import akka.actor.Status.Failure;
14 import akka.dispatch.ExecutionContexts;
15 import akka.dispatch.Futures;
16 import akka.dispatch.Recover;
17 import akka.pattern.Patterns;
18 import akka.util.Timeout;
19 import com.google.common.base.Preconditions;
20 import com.google.common.base.Throwables;
21 import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.Lists;
23 import java.util.AbstractMap.SimpleImmutableEntry;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map.Entry;
29 import java.util.Optional;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.TimeoutException;
32 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
33 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
34 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
35 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
/**
 * Composite cohort, which coordinates multiple user-provided cohorts as if they were a single cohort.
 *
 * It tracks the current operation and the list of cohorts which successfully finished the previous phase,
 * so that, if an abort is necessary, it is invoked only on the cohorts which are still active.
 */
49 class CompositeDataTreeCohort {
50 private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
54 * Cohorts are idle, no messages were sent.
58 * CanCommit message was sent to all participating cohorts.
62 * Successful canCommit responses were received from every participating cohort.
64 CAN_COMMIT_SUCCESSFUL,
66 * PreCommit message was sent to all participating cohorts.
70 * Successful preCommit responses were received from every participating cohort.
72 PRE_COMMIT_SUCCESSFUL,
 * Commit message was sent to all participating cohorts.
78 * Successful commit responses were received from all participating cohorts.
82 * Some of cohorts responded back with unsuccessful message.
 * Abort message was sent to all cohorts which responded with success previously.
91 static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
93 public Failure recover(final Throwable error) {
94 return new Failure(error);
/** Registry which creates the canCommit messages for the participating cohort actors. */
private final DataTreeCohortActorRegistry registry;
/** Identifier of the coordinated transaction; carried in every message and used as the log prefix. */
private final TransactionIdentifier txId;
/** Schema context handed to the registry when canCommit messages are created. */
private final SchemaContext schema;
/** Timeout applied to each ask and to waiting for the collected responses. */
private final Timeout timeout;

/**
 * Success responses collected in the most recent phase. It is {@code null} before the first phase
 * and after waiting for responses has thrown.
 */
private List<Success> successfulFromPrevious;
/** Current phase of the commit protocol, initially {@code IDLE}. */
private State state = State.IDLE;

/**
 * Creates a composite cohort for a single transaction.
 *
 * @param registry registry used to create canCommit messages, must not be null
 * @param transactionID identifier of the coordinated transaction, must not be null
 * @param schema schema context passed to the registry, must not be null
 * @param timeout timeout for cohort requests and for awaiting their responses, must not be null
 * @throws NullPointerException if any argument is null; the message names the offending argument
 */
CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
        final SchemaContext schema, final Timeout timeout) {
    // Name the argument in each check so that a failure is diagnosable from the exception alone.
    this.registry = Preconditions.checkNotNull(registry, "registry must not be null");
    this.txId = Preconditions.checkNotNull(transactionID, "transactionID must not be null");
    this.schema = Preconditions.checkNotNull(schema, "schema must not be null");
    this.timeout = Preconditions.checkNotNull(timeout, "timeout must not be null");
}
116 case CAN_COMMIT_SENT:
117 case CAN_COMMIT_SUCCESSFUL:
118 case PRE_COMMIT_SENT:
119 case PRE_COMMIT_SUCCESSFUL:
129 throw new IllegalStateException("Unhandled state " + state);
132 successfulFromPrevious = null;
136 void canCommit(final DataTreeCandidate tip) throws ExecutionException, TimeoutException {
137 if (LOG.isTraceEnabled()) {
138 LOG.trace("{}: canCommit - candidate: {}", txId, tip);
140 LOG.debug("{}: canCommit - candidate rootPath: {}", txId, tip.getRootPath());
143 final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
144 LOG.debug("{}: canCommit - messages: {}", txId, messages);
145 if (messages.isEmpty()) {
146 successfulFromPrevious = ImmutableList.of();
147 changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
151 final List<Entry<ActorRef, Future<Object>>> futures = new ArrayList<>(messages.size());
152 for (CanCommit message : messages) {
153 final ActorRef actor = message.getCohort();
154 final Future<Object> future = Patterns.ask(actor, message, timeout).recover(EXCEPTION_TO_MESSAGE,
155 ExecutionContexts.global());
156 LOG.trace("{}: requesting canCommit from {}", txId, actor);
157 futures.add(new SimpleImmutableEntry<>(actor, future));
160 changeStateFrom(State.IDLE, State.CAN_COMMIT_SENT);
161 processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL);
164 void preCommit() throws ExecutionException, TimeoutException {
165 LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
167 Preconditions.checkState(successfulFromPrevious != null);
168 if (successfulFromPrevious.isEmpty()) {
169 changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SUCCESSFUL);
173 final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
174 new DataTreeCohortActor.PreCommit(txId));
175 changeStateFrom(State.CAN_COMMIT_SUCCESSFUL, State.PRE_COMMIT_SENT);
176 processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL);
179 void commit() throws ExecutionException, TimeoutException {
180 LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
181 if (successfulFromPrevious.isEmpty()) {
182 changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
186 Preconditions.checkState(successfulFromPrevious != null);
187 final List<Entry<ActorRef, Future<Object>>> futures = sendMessageToSuccessful(
188 new DataTreeCohortActor.Commit(txId));
189 changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMIT_SENT);
190 processResponses(futures, State.COMMIT_SENT, State.COMMITED);
193 Optional<List<Future<Object>>> abort() {
194 LOG.debug("{}: abort - successfulFromPrevious: {}", txId, successfulFromPrevious);
196 state = State.ABORTED;
197 if (successfulFromPrevious == null || successfulFromPrevious.isEmpty()) {
198 return Optional.empty();
201 final DataTreeCohortActor.Abort message = new DataTreeCohortActor.Abort(txId);
202 final List<Future<Object>> futures = new ArrayList<>(successfulFromPrevious.size());
203 for (Success s : successfulFromPrevious) {
204 futures.add(Patterns.ask(s.getCohort(), message, timeout));
206 return Optional.of(futures);
209 private List<Entry<ActorRef, Future<Object>>> sendMessageToSuccessful(final Object message) {
210 LOG.debug("{}: sendMesageToSuccessful: {}", txId, message);
212 final List<Entry<ActorRef, Future<Object>>> ret = new ArrayList<>(successfulFromPrevious.size());
213 for (Success s : successfulFromPrevious) {
214 final ActorRef actor = s.getCohort();
215 ret.add(new SimpleImmutableEntry<>(actor, Patterns.ask(actor, message, timeout)));
220 @SuppressWarnings("checkstyle:IllegalCatch")
221 private void processResponses(final List<Entry<ActorRef, Future<Object>>> futures, final State currentState,
222 final State afterState) throws TimeoutException, ExecutionException {
223 LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
225 final Iterable<Object> results;
227 results = Await.result(Futures.sequence(Lists.transform(futures, e -> e.getValue()),
228 ExecutionContexts.global()), timeout.duration());
229 } catch (TimeoutException e) {
230 successfulFromPrevious = null;
231 LOG.debug("{}: processResponses - error from Future", txId, e);
233 for (Entry<ActorRef, Future<Object>> f : futures) {
234 if (!f.getValue().isCompleted()) {
235 LOG.info("{}: actor {} failed to respond", txId, f.getKey());
239 } catch (ExecutionException e) {
240 successfulFromPrevious = null;
241 LOG.debug("{}: processResponses - error from Future", txId, e);
243 } catch (Exception e) {
244 successfulFromPrevious = null;
245 LOG.debug("{}: processResponses - error from Future", txId, e);
246 throw new ExecutionException(e);
249 final Collection<Failure> failed = new ArrayList<>(1);
250 final List<Success> successful = new ArrayList<>(futures.size());
251 for (Object result : results) {
252 if (result instanceof DataTreeCohortActor.Success) {
253 successful.add((Success) result);
254 } else if (result instanceof Status.Failure) {
255 failed.add((Failure) result);
257 LOG.warn("{}: unrecognized response {}, ignoring it", result);
261 LOG.debug("{}: processResponses - successful: {}, failed: {}", txId, successful, failed);
263 successfulFromPrevious = successful;
264 if (!failed.isEmpty()) {
265 changeStateFrom(currentState, State.FAILED);
266 final Iterator<Failure> it = failed.iterator();
267 final Throwable firstEx = it.next().cause();
268 while (it.hasNext()) {
269 firstEx.addSuppressed(it.next().cause());
271 Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class);
272 Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class);
273 throw new ExecutionException(firstEx);
275 changeStateFrom(currentState, afterState);
278 void changeStateFrom(final State expected, final State followup) {
279 Preconditions.checkState(state == expected);