2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.Props;
12 import akka.actor.Status;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Collection;
18 import java.util.HashMap;
20 import java.util.Objects;
21 import java.util.concurrent.Executor;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.eclipse.jdt.annotation.Nullable;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
26 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
27 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
28 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
29 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
30 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
35 * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
36 * decapsulating DataTreeChanged messages and dispatching their context to the user.
38 final class DataTreeCohortActor extends AbstractUntypedActor {
39 private final Idle idleState = new Idle();
40 private final DOMDataTreeCommitCohort cohort;
41 private final YangInstanceIdentifier registeredPath;
42 private final Map<TransactionIdentifier, CohortBehaviour<?, ?>> currentStateMap = new HashMap<>();
44 private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
45 this.cohort = Objects.requireNonNull(cohort);
46 this.registeredPath = Objects.requireNonNull(registeredPath);
50 protected void handleReceive(final Object message) {
51 if (!(message instanceof CommitProtocolCommand)) {
52 unknownMessage(message);
56 CommitProtocolCommand<?> command = (CommitProtocolCommand<?>)message;
57 CohortBehaviour<?, ?> currentState = currentStateMap.computeIfAbsent(command.getTxId(), key -> idleState);
59 LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(),
60 currentState, message);
62 currentState.handle(command);
66 * Abstract message base for messages handled by {@link DataTreeCohortActor}.
68 * @param <R> Reply message type
70 abstract static class CommitProtocolCommand<R extends CommitReply> {
72 private final TransactionIdentifier txId;
74 final TransactionIdentifier getTxId() {
78 protected CommitProtocolCommand(final TransactionIdentifier txId) {
79 this.txId = Objects.requireNonNull(txId);
83 public String toString() {
84 return getClass().getSimpleName() + " [txId=" + txId + "]";
88 static final class CanCommit extends CommitProtocolCommand<Success> {
90 private final Collection<DOMDataTreeCandidate> candidates;
91 private final ActorRef cohort;
92 private final EffectiveModelContext schema;
94 CanCommit(final TransactionIdentifier txId, final Collection<DOMDataTreeCandidate> candidates,
95 final EffectiveModelContext schema, final ActorRef cohort) {
97 this.cohort = Objects.requireNonNull(cohort);
98 this.candidates = Objects.requireNonNull(candidates);
99 this.schema = Objects.requireNonNull(schema);
102 Collection<DOMDataTreeCandidate> getCandidates() {
106 EffectiveModelContext getSchema() {
110 ActorRef getCohort() {
115 public String toString() {
116 return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort + "]";
120 abstract static class CommitReply {
122 private final ActorRef cohortRef;
123 private final TransactionIdentifier txId;
125 protected CommitReply(final ActorRef cohortRef, final TransactionIdentifier txId) {
126 this.cohortRef = Objects.requireNonNull(cohortRef);
127 this.txId = Objects.requireNonNull(txId);
130 ActorRef getCohort() {
134 final TransactionIdentifier getTxId() {
139 public String toString() {
140 return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]";
144 static final class Success extends CommitReply {
146 Success(final ActorRef cohortRef, final TransactionIdentifier txId) {
147 super(cohortRef, txId);
151 static final class PreCommit extends CommitProtocolCommand<Success> {
153 PreCommit(final TransactionIdentifier txId) {
158 static final class Abort extends CommitProtocolCommand<Success> {
160 Abort(final TransactionIdentifier txId) {
165 static final class Commit extends CommitProtocolCommand<Success> {
167 Commit(final TransactionIdentifier txId) {
172 private abstract class CohortBehaviour<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep> {
173 private final Class<M> handledMessageType;
175 CohortBehaviour(final Class<M> handledMessageType) {
176 this.handledMessageType = Objects.requireNonNull(handledMessageType);
179 void handle(final CommitProtocolCommand<?> command) {
180 if (handledMessageType.isInstance(command)) {
182 } else if (command instanceof Abort) {
183 onAbort(((Abort)command).getTxId());
185 getSender().tell(new Status.Failure(new IllegalArgumentException(String.format(
186 "Unexpected message %s in cohort behavior %s", command.getClass(),
187 getClass().getSimpleName()))), getSelf());
191 private void onMessage(final CommitProtocolCommand<?> message) {
192 final ActorRef sender = getSender();
193 TransactionIdentifier txId = message.getTxId();
194 ListenableFuture<S> future = process(handledMessageType.cast(message));
195 Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor()
196 : DataTreeCohortActor.this::executeInSelf;
197 Futures.addCallback(future, new FutureCallback<S>() {
199 public void onSuccess(final S nextStep) {
200 success(txId, sender, nextStep);
204 public void onFailure(final Throwable failure) {
205 failed(txId, sender, failure);
207 }, callbackExecutor);
210 private void failed(final TransactionIdentifier txId, final ActorRef sender, final Throwable failure) {
211 currentStateMap.remove(txId);
212 sender.tell(new Status.Failure(failure), getSelf());
215 private void success(final TransactionIdentifier txId, final ActorRef sender, final S nextStep) {
216 currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
217 sender.tell(new Success(getSelf(), txId), getSelf());
220 private void onAbort(final TransactionIdentifier txId) {
221 currentStateMap.remove(txId);
222 final ActorRef sender = getSender();
223 Futures.addCallback(abort(), new FutureCallback<Object>() {
225 public void onSuccess(final Object noop) {
226 sender.tell(new Success(getSelf(), txId), getSelf());
230 public void onFailure(final Throwable failure) {
231 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
232 sender.tell(new Status.Failure(failure), getSelf());
234 }, MoreExecutors.directExecutor());
237 abstract @Nullable CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, S nextStep);
239 abstract @NonNull ListenableFuture<S> process(M command);
241 abstract ListenableFuture<?> abort();
244 public String toString() {
245 return getClass().getSimpleName();
249 private class Idle extends CohortBehaviour<CanCommit, PostCanCommitStep> {
251 super(CanCommit.class);
255 ListenableFuture<PostCanCommitStep> process(final CanCommit message) {
256 return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates());
260 CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final PostCanCommitStep nextStep) {
261 return new PostCanCommit(txId, nextStep);
265 ListenableFuture<?> abort() {
266 return ThreePhaseCommitStep.NOOP_ABORT_FUTURE;
270 private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep,
271 N extends ThreePhaseCommitStep> extends CohortBehaviour<M, N> {
272 private final S step;
273 private final TransactionIdentifier txId;
275 CohortStateWithStep(final Class<M> handledMessageType, final TransactionIdentifier txId, final S step) {
276 super(handledMessageType);
277 this.txId = Objects.requireNonNull(txId);
278 this.step = Objects.requireNonNull(step);
286 ListenableFuture<?> abort() {
287 return getStep().abort();
291 public String toString() {
292 return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]";
296 private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep, PostPreCommitStep> {
298 PostCanCommit(final TransactionIdentifier txId, final PostCanCommitStep nextStep) {
299 super(PreCommit.class, txId, nextStep);
302 @SuppressWarnings("unchecked")
304 ListenableFuture<PostPreCommitStep> process(final PreCommit message) {
305 return (ListenableFuture<PostPreCommitStep>) getStep().preCommit();
309 CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final PostPreCommitStep nextStep) {
310 return new PostPreCommit(txId, nextStep);
315 private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep, NoopThreePhaseCommitStep> {
317 PostPreCommit(final TransactionIdentifier txId, final PostPreCommitStep step) {
318 super(Commit.class, txId, step);
321 @SuppressWarnings("unchecked")
323 ListenableFuture<NoopThreePhaseCommitStep> process(final Commit message) {
324 return (ListenableFuture<NoopThreePhaseCommitStep>) getStep().commit();
328 CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final NoopThreePhaseCommitStep nextStep) {
333 private interface NoopThreePhaseCommitStep extends ThreePhaseCommitStep {
336 static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
337 return Props.create(DataTreeCohortActor.class, cohort, registeredPath);