2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.Collection;
19 import java.util.HashMap;
21 import java.util.Objects;
22 import java.util.concurrent.Executor;
23 import javax.annotation.Nonnull;
24 import javax.annotation.Nullable;
25 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
27 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
28 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
29 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
36 * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
37 * decapsulating DataTreeChanged messages and dispatching their context to the user.
39 final class DataTreeCohortActor extends AbstractUntypedActor {
40 private final Idle idleState = new Idle();
41 private final DOMDataTreeCommitCohort cohort;
42 private final YangInstanceIdentifier registeredPath;
43 private final Map<TransactionIdentifier, CohortBehaviour<?, ?>> currentStateMap = new HashMap<>();
45 private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
46 this.cohort = Objects.requireNonNull(cohort);
47 this.registeredPath = Objects.requireNonNull(registeredPath);
51 protected void handleReceive(final Object message) {
52 if (!(message instanceof CommitProtocolCommand)) {
53 unknownMessage(message);
57 CommitProtocolCommand<?> command = (CommitProtocolCommand<?>)message;
58 CohortBehaviour<?, ?> currentState = currentStateMap.computeIfAbsent(command.getTxId(), key -> idleState);
60 LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(),
61 currentState, message);
63 currentState.handle(command);
67 * Abstract message base for messages handled by {@link DataTreeCohortActor}.
69 * @param <R> Reply message type
71 abstract static class CommitProtocolCommand<R extends CommitReply> {
73 private final TransactionIdentifier txId;
75 final TransactionIdentifier getTxId() {
79 protected CommitProtocolCommand(TransactionIdentifier txId) {
80 this.txId = Objects.requireNonNull(txId);
84 public String toString() {
85 return getClass().getSimpleName() + " [txId=" + txId + "]";
89 static final class CanCommit extends CommitProtocolCommand<Success> {
91 private final Collection<DOMDataTreeCandidate> candidates;
92 private final ActorRef cohort;
93 private final SchemaContext schema;
95 CanCommit(TransactionIdentifier txId, Collection<DOMDataTreeCandidate> candidates, SchemaContext schema,
98 this.cohort = Objects.requireNonNull(cohort);
99 this.candidates = Objects.requireNonNull(candidates);
100 this.schema = Objects.requireNonNull(schema);
103 Collection<DOMDataTreeCandidate> getCandidates() {
107 SchemaContext getSchema() {
111 ActorRef getCohort() {
116 public String toString() {
117 return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort + "]";
121 abstract static class CommitReply {
123 private final ActorRef cohortRef;
124 private final TransactionIdentifier txId;
126 protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
127 this.cohortRef = Objects.requireNonNull(cohortRef);
128 this.txId = Objects.requireNonNull(txId);
131 ActorRef getCohort() {
135 final TransactionIdentifier getTxId() {
140 public String toString() {
141 return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]";
145 static final class Success extends CommitReply {
147 Success(ActorRef cohortRef, TransactionIdentifier txId) {
148 super(cohortRef, txId);
152 static final class PreCommit extends CommitProtocolCommand<Success> {
154 PreCommit(TransactionIdentifier txId) {
159 static final class Abort extends CommitProtocolCommand<Success> {
161 Abort(TransactionIdentifier txId) {
166 static final class Commit extends CommitProtocolCommand<Success> {
168 Commit(TransactionIdentifier txId) {
173 private abstract class CohortBehaviour<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep> {
174 private final Class<M> handledMessageType;
176 CohortBehaviour(Class<M> handledMessageType) {
177 this.handledMessageType = Objects.requireNonNull(handledMessageType);
180 void handle(CommitProtocolCommand<?> command) {
181 if (handledMessageType.isInstance(command)) {
183 } else if (command instanceof Abort) {
184 onAbort(((Abort)command).getTxId());
186 getSender().tell(new Status.Failure(new IllegalArgumentException(String.format(
187 "Unexpected message %s in cohort behavior %s", command.getClass(),
188 getClass().getSimpleName()))), getSelf());
192 private void onMessage(CommitProtocolCommand<?> message) {
193 final ActorRef sender = getSender();
194 TransactionIdentifier txId = message.getTxId();
195 ListenableFuture<S> future = process(handledMessageType.cast(message));
196 Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor()
197 : DataTreeCohortActor.this::executeInSelf;
198 Futures.addCallback(future, new FutureCallback<S>() {
200 public void onSuccess(S nextStep) {
201 success(txId, sender, nextStep);
205 public void onFailure(Throwable failure) {
206 failed(txId, sender, failure);
208 }, callbackExecutor);
211 private void failed(TransactionIdentifier txId, ActorRef sender, Throwable failure) {
212 currentStateMap.remove(txId);
213 sender.tell(new Status.Failure(failure), getSelf());
216 private void success(TransactionIdentifier txId, ActorRef sender, S nextStep) {
217 currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
218 sender.tell(new Success(getSelf(), txId), getSelf());
221 private void onAbort(TransactionIdentifier txId) {
222 currentStateMap.remove(txId);
223 final ActorRef sender = getSender();
224 Futures.addCallback(abort(), new FutureCallback<Object>() {
226 public void onSuccess(Object noop) {
227 sender.tell(new Success(getSelf(), txId), getSelf());
231 public void onFailure(Throwable failure) {
232 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
233 sender.tell(new Status.Failure(failure), getSelf());
235 }, MoreExecutors.directExecutor());
239 abstract CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, S nextStep);
242 abstract ListenableFuture<S> process(M command);
244 abstract ListenableFuture<?> abort();
247 public String toString() {
248 return getClass().getSimpleName();
252 private class Idle extends CohortBehaviour<CanCommit, PostCanCommitStep> {
254 super(CanCommit.class);
258 ListenableFuture<PostCanCommitStep> process(CanCommit message) {
259 return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates());
263 CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, PostCanCommitStep nextStep) {
264 return new PostCanCommit(txId, nextStep);
268 ListenableFuture<?> abort() {
269 return ThreePhaseCommitStep.NOOP_ABORT_FUTURE;
273 private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep,
274 N extends ThreePhaseCommitStep> extends CohortBehaviour<M, N> {
275 private final S step;
276 private final TransactionIdentifier txId;
278 CohortStateWithStep(Class<M> handledMessageType, TransactionIdentifier txId, S step) {
279 super(handledMessageType);
280 this.txId = Objects.requireNonNull(txId);
281 this.step = Objects.requireNonNull(step);
289 ListenableFuture<?> abort() {
290 return getStep().abort();
294 public String toString() {
295 return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]";
299 private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep, PostPreCommitStep> {
301 PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
302 super(PreCommit.class, txId, nextStep);
305 @SuppressWarnings("unchecked")
307 ListenableFuture<PostPreCommitStep> process(PreCommit message) {
308 return (ListenableFuture<PostPreCommitStep>) getStep().preCommit();
312 CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, PostPreCommitStep nextStep) {
313 return new PostPreCommit(txId, nextStep);
318 private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep, NoopThreePhaseCommitStep> {
320 PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
321 super(Commit.class, txId, step);
324 @SuppressWarnings("unchecked")
326 ListenableFuture<NoopThreePhaseCommitStep> process(Commit message) {
327 return (ListenableFuture<NoopThreePhaseCommitStep>) getStep().commit();
331 CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, NoopThreePhaseCommitStep nextStep) {
336 private interface NoopThreePhaseCommitStep extends ThreePhaseCommitStep {
339 static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
340 return Props.create(DataTreeCohortActor.class, cohort, registeredPath);