/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
16 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
17 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
18 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
19 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
 * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
 * decapsulating commit protocol messages (CanCommit, PreCommit, Commit, Abort) and dispatching
 * their content to the user-provided cohort.
 */
29 final class DataTreeCohortActor extends AbstractUntypedActor {
30 private final CohortBehaviour<?> idleState = new Idle();
31 private final DOMDataTreeCommitCohort cohort;
32 private final YangInstanceIdentifier registeredPath;
33 private CohortBehaviour<?> currentState = idleState;
/**
 * Creates the actor around a user-provided commit cohort.
 *
 * @param cohort commit cohort that the behaviours of this actor delegate to, must not be null
 * @param registeredPath instance identifier stored alongside the cohort, must not be null
 * @throws NullPointerException if either argument is null
 */
private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
    // Validate both arguments up front, then store them.
    Preconditions.checkNotNull(cohort);
    Preconditions.checkNotNull(registeredPath);
    this.cohort = cohort;
    this.registeredPath = registeredPath;
}
41 protected void handleReceive(final Object message) {
42 currentState = currentState.handle(message);
47 * Abstract message base for messages handled by {@link DataTreeCohortActor}.
49 * @param <R> Reply message type
51 abstract static class CommitProtocolCommand<R extends CommitReply> {
53 private final TransactionIdentifier txId;
55 final TransactionIdentifier getTxId() {
59 protected CommitProtocolCommand(TransactionIdentifier txId) {
60 this.txId = Preconditions.checkNotNull(txId);
64 static final class CanCommit extends CommitProtocolCommand<Success> {
66 private final DOMDataTreeCandidate candidate;
67 private final ActorRef cohort;
68 private final SchemaContext schema;
70 CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
72 this.cohort = Preconditions.checkNotNull(cohort);
73 this.candidate = Preconditions.checkNotNull(candidate);
74 this.schema = Preconditions.checkNotNull(schema);
77 DOMDataTreeCandidate getCandidate() {
81 SchemaContext getSchema() {
85 ActorRef getCohort() {
91 abstract static class CommitReply {
93 private final ActorRef cohortRef;
94 private final TransactionIdentifier txId;
96 protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
97 this.cohortRef = Preconditions.checkNotNull(cohortRef);
98 this.txId = Preconditions.checkNotNull(txId);
101 ActorRef getCohort() {
105 final TransactionIdentifier getTxId() {
110 static final class Success extends CommitReply {
112 Success(ActorRef cohortRef, TransactionIdentifier txId) {
113 super(cohortRef, txId);
118 static final class PreCommit extends CommitProtocolCommand<Success> {
120 PreCommit(TransactionIdentifier txId) {
125 static final class Abort extends CommitProtocolCommand<Success> {
127 Abort(TransactionIdentifier txId) {
132 static final class Commit extends CommitProtocolCommand<Success> {
134 Commit(TransactionIdentifier txId) {
139 private abstract static class CohortBehaviour<E> {
141 abstract Class<E> getHandledMessageType();
143 CohortBehaviour<?> handle(Object message) {
144 if (getHandledMessageType().isInstance(message)) {
145 return process(getHandledMessageType().cast(message));
146 } else if (message instanceof Abort) {
149 throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
150 message.getClass(), getClass().getSimpleName()));
153 abstract CohortBehaviour<?> abort();
155 abstract CohortBehaviour<?> process(E message);
159 private class Idle extends CohortBehaviour<CanCommit> {
162 Class<CanCommit> getHandledMessageType() {
163 return CanCommit.class;
167 @SuppressWarnings("checkstyle:IllegalCatch")
168 CohortBehaviour<?> process(CanCommit message) {
169 final PostCanCommitStep nextStep;
171 nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
172 } catch (final Exception e) {
173 getSender().tell(new Status.Failure(e), getSelf());
176 getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
177 return new PostCanCommit(message.getTxId(), nextStep);
181 CohortBehaviour<?> abort() {
188 private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
189 extends CohortBehaviour<M> {
191 private final S step;
192 private final TransactionIdentifier txId;
194 CohortStateWithStep(TransactionIdentifier txId, S step) {
195 this.txId = Preconditions.checkNotNull(txId);
196 this.step = Preconditions.checkNotNull(step);
203 final TransactionIdentifier getTxId() {
208 @SuppressWarnings("checkstyle:IllegalCatch")
209 final CohortBehaviour<?> abort() {
211 getStep().abort().get();
212 } catch (final Exception e) {
213 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
214 getSender().tell(new Status.Failure(e), getSelf());
217 getSender().tell(new Success(getSelf(), txId), getSelf());
223 private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
225 PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
226 super(txId, nextStep);
230 Class<PreCommit> getHandledMessageType() {
231 return PreCommit.class;
235 @SuppressWarnings("checkstyle:IllegalCatch")
236 CohortBehaviour<?> process(PreCommit message) {
237 final PostPreCommitStep nextStep;
239 nextStep = getStep().preCommit().get();
240 } catch (final Exception e) {
241 getSender().tell(new Status.Failure(e), getSelf());
244 getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
245 return new PostPreCommit(getTxId(), nextStep);
250 private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
252 PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
257 @SuppressWarnings("checkstyle:IllegalCatch")
258 CohortBehaviour<?> process(Commit message) {
260 getStep().commit().get();
261 } catch (final Exception e) {
262 getSender().tell(new Status.Failure(e), getSender());
265 getSender().tell(new Success(getSelf(), getTxId()), getSelf());
270 Class<Commit> getHandledMessageType() {
276 static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
277 return Props.create(DataTreeCohortActor.class, cohort, registeredPath);