/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
import org.opendaylight.mdsal.common.api.PostCanCommitStep;
import org.opendaylight.mdsal.common.api.PostPreCommitStep;
import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
36 * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
37 * decapsulating DataTreeChanged messages and dispatching their context to the user.
39 final class DataTreeCohortActor extends AbstractUntypedActor {
40 private final Idle idleState = new Idle();
41 private final DOMDataTreeCommitCohort cohort;
42 private final YangInstanceIdentifier registeredPath;
43 private final Map<TransactionIdentifier, CohortBehaviour<?, ?>> currentStateMap = new HashMap<>();
45 private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
46 this.cohort = Objects.requireNonNull(cohort);
47 this.registeredPath = Objects.requireNonNull(registeredPath);
51 protected void handleReceive(final Object message) {
52 if (!(message instanceof CommitProtocolCommand)) {
53 unknownMessage(message);
57 CommitProtocolCommand<?> command = (CommitProtocolCommand<?>)message;
58 CohortBehaviour<?, ?> currentState = currentStateMap.computeIfAbsent(command.getTxId(), key -> idleState);
60 LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(),
61 currentState, message);
63 currentState.handle(command);
67 * Abstract message base for messages handled by {@link DataTreeCohortActor}.
69 * @param <R> Reply message type
71 abstract static class CommitProtocolCommand<R extends CommitReply> {
73 private final TransactionIdentifier txId;
75 final TransactionIdentifier getTxId() {
79 protected CommitProtocolCommand(final TransactionIdentifier txId) {
80 this.txId = Objects.requireNonNull(txId);
84 public String toString() {
85 return getClass().getSimpleName() + " [txId=" + txId + "]";
89 static final class CanCommit extends CommitProtocolCommand<Success> {
91 private final Collection<DOMDataTreeCandidate> candidates;
92 private final ActorRef cohort;
93 private final SchemaContext schema;
95 CanCommit(final TransactionIdentifier txId, final Collection<DOMDataTreeCandidate> candidates,
96 final SchemaContext schema, final ActorRef cohort) {
98 this.cohort = Objects.requireNonNull(cohort);
99 this.candidates = Objects.requireNonNull(candidates);
100 this.schema = Objects.requireNonNull(schema);
103 Collection<DOMDataTreeCandidate> getCandidates() {
107 SchemaContext getSchema() {
111 ActorRef getCohort() {
116 public String toString() {
117 return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort + "]";
121 abstract static class CommitReply {
123 private final ActorRef cohortRef;
124 private final TransactionIdentifier txId;
126 protected CommitReply(final ActorRef cohortRef, final TransactionIdentifier txId) {
127 this.cohortRef = Objects.requireNonNull(cohortRef);
128 this.txId = Objects.requireNonNull(txId);
131 ActorRef getCohort() {
135 final TransactionIdentifier getTxId() {
140 public String toString() {
141 return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]";
145 static final class Success extends CommitReply {
147 Success(final ActorRef cohortRef, final TransactionIdentifier txId) {
148 super(cohortRef, txId);
152 static final class PreCommit extends CommitProtocolCommand<Success> {
154 PreCommit(final TransactionIdentifier txId) {
159 static final class Abort extends CommitProtocolCommand<Success> {
161 Abort(final TransactionIdentifier txId) {
166 static final class Commit extends CommitProtocolCommand<Success> {
168 Commit(final TransactionIdentifier txId) {
173 private abstract class CohortBehaviour<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep> {
174 private final Class<M> handledMessageType;
176 CohortBehaviour(final Class<M> handledMessageType) {
177 this.handledMessageType = Objects.requireNonNull(handledMessageType);
180 void handle(final CommitProtocolCommand<?> command) {
181 if (handledMessageType.isInstance(command)) {
183 } else if (command instanceof Abort) {
184 onAbort(((Abort)command).getTxId());
186 getSender().tell(new Status.Failure(new IllegalArgumentException(String.format(
187 "Unexpected message %s in cohort behavior %s", command.getClass(),
188 getClass().getSimpleName()))), getSelf());
192 private void onMessage(final CommitProtocolCommand<?> message) {
193 final ActorRef sender = getSender();
194 TransactionIdentifier txId = message.getTxId();
195 ListenableFuture<S> future = process(handledMessageType.cast(message));
196 Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor()
197 : DataTreeCohortActor.this::executeInSelf;
198 Futures.addCallback(future, new FutureCallback<S>() {
200 public void onSuccess(final S nextStep) {
201 success(txId, sender, nextStep);
205 public void onFailure(final Throwable failure) {
206 failed(txId, sender, failure);
208 }, callbackExecutor);
211 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
212 justification = "https://github.com/spotbugs/spotbugs/issues/811")
213 private void failed(final TransactionIdentifier txId, final ActorRef sender, final Throwable failure) {
214 currentStateMap.remove(txId);
215 sender.tell(new Status.Failure(failure), getSelf());
218 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
219 justification = "https://github.com/spotbugs/spotbugs/issues/811")
220 private void success(final TransactionIdentifier txId, final ActorRef sender, final S nextStep) {
221 currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
222 sender.tell(new Success(getSelf(), txId), getSelf());
225 private void onAbort(final TransactionIdentifier txId) {
226 currentStateMap.remove(txId);
227 final ActorRef sender = getSender();
228 Futures.addCallback(abort(), new FutureCallback<Object>() {
230 public void onSuccess(final Object noop) {
231 sender.tell(new Success(getSelf(), txId), getSelf());
235 public void onFailure(final Throwable failure) {
236 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
237 sender.tell(new Status.Failure(failure), getSelf());
239 }, MoreExecutors.directExecutor());
242 abstract @Nullable CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, S nextStep);
244 abstract @NonNull ListenableFuture<S> process(M command);
246 abstract ListenableFuture<?> abort();
249 public String toString() {
250 return getClass().getSimpleName();
254 private class Idle extends CohortBehaviour<CanCommit, PostCanCommitStep> {
256 super(CanCommit.class);
260 ListenableFuture<PostCanCommitStep> process(final CanCommit message) {
261 return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates());
265 CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final PostCanCommitStep nextStep) {
266 return new PostCanCommit(txId, nextStep);
270 ListenableFuture<?> abort() {
271 return ThreePhaseCommitStep.NOOP_ABORT_FUTURE;
275 private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep,
276 N extends ThreePhaseCommitStep> extends CohortBehaviour<M, N> {
277 private final S step;
278 private final TransactionIdentifier txId;
280 CohortStateWithStep(final Class<M> handledMessageType, final TransactionIdentifier txId, final S step) {
281 super(handledMessageType);
282 this.txId = Objects.requireNonNull(txId);
283 this.step = Objects.requireNonNull(step);
291 ListenableFuture<?> abort() {
292 return getStep().abort();
296 public String toString() {
297 return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]";
301 private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep, PostPreCommitStep> {
303 PostCanCommit(final TransactionIdentifier txId, final PostCanCommitStep nextStep) {
304 super(PreCommit.class, txId, nextStep);
307 @SuppressWarnings("unchecked")
309 ListenableFuture<PostPreCommitStep> process(final PreCommit message) {
310 return (ListenableFuture<PostPreCommitStep>) getStep().preCommit();
314 CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final PostPreCommitStep nextStep) {
315 return new PostPreCommit(txId, nextStep);
320 private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep, NoopThreePhaseCommitStep> {
322 PostPreCommit(final TransactionIdentifier txId, final PostPreCommitStep step) {
323 super(Commit.class, txId, step);
326 @SuppressWarnings("unchecked")
328 ListenableFuture<NoopThreePhaseCommitStep> process(final Commit message) {
329 return (ListenableFuture<NoopThreePhaseCommitStep>) getStep().commit();
333 CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final NoopThreePhaseCommitStep nextStep) {
338 private interface NoopThreePhaseCommitStep extends ThreePhaseCommitStep {
341 static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
342 return Props.create(DataTreeCohortActor.class, cohort, registeredPath);