2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
16 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
17 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
18 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
19 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
22 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
25 * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
26 * decapsulating DataTreeChanged messages and dispatching their context to the user.
28 final class DataTreeCohortActor extends AbstractUntypedActor {
29 private final CohortBehaviour<?> idleState = new Idle();
30 private final DOMDataTreeCommitCohort cohort;
31 private CohortBehaviour<?> currentState = idleState;
33 private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
34 this.cohort = Preconditions.checkNotNull(cohort);
38 protected void handleReceive(final Object message) {
39 currentState = currentState.handle(message);
44 * Abstract message base for messages handled by {@link DataTreeCohortActor}.
46 * @param <R> Reply message type
48 abstract static class CommitProtocolCommand<R extends CommitReply> {
50 private final TransactionIdentifier txId;
52 final TransactionIdentifier getTxId() {
56 protected CommitProtocolCommand(TransactionIdentifier txId) {
57 this.txId = Preconditions.checkNotNull(txId);
61 static final class CanCommit extends CommitProtocolCommand<Success> {
63 private final DOMDataTreeCandidate candidate;
64 private final ActorRef cohort;
65 private final SchemaContext schema;
67 CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
69 this.cohort = Preconditions.checkNotNull(cohort);
70 this.candidate = Preconditions.checkNotNull(candidate);
71 this.schema = Preconditions.checkNotNull(schema);
74 DOMDataTreeCandidate getCandidate() {
78 SchemaContext getSchema() {
82 ActorRef getCohort() {
88 abstract static class CommitReply {
90 private final ActorRef cohortRef;
91 private final TransactionIdentifier txId;
93 protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
94 this.cohortRef = Preconditions.checkNotNull(cohortRef);
95 this.txId = Preconditions.checkNotNull(txId);
98 ActorRef getCohort() {
102 final TransactionIdentifier getTxId() {
107 static final class Success extends CommitReply {
109 Success(ActorRef cohortRef, TransactionIdentifier txId) {
110 super(cohortRef, txId);
115 static final class PreCommit extends CommitProtocolCommand<Success> {
117 PreCommit(TransactionIdentifier txId) {
122 static final class Abort extends CommitProtocolCommand<Success> {
124 Abort(TransactionIdentifier txId) {
129 static final class Commit extends CommitProtocolCommand<Success> {
131 Commit(TransactionIdentifier txId) {
136 private abstract static class CohortBehaviour<E> {
138 abstract Class<E> getHandledMessageType();
140 CohortBehaviour<?> handle(Object message) {
141 if (getHandledMessageType().isInstance(message)) {
142 return process(getHandledMessageType().cast(message));
143 } else if (message instanceof Abort) {
146 throw new UnsupportedOperationException();
149 abstract CohortBehaviour<?> abort();
151 abstract CohortBehaviour<?> process(E message);
155 private class Idle extends CohortBehaviour<CanCommit> {
158 Class<CanCommit> getHandledMessageType() {
159 return CanCommit.class;
163 @SuppressWarnings("checkstyle:IllegalCatch")
164 CohortBehaviour<?> process(CanCommit message) {
165 final PostCanCommitStep nextStep;
167 nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
168 } catch (final Exception e) {
169 getSender().tell(new Status.Failure(e), getSelf());
172 getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
173 return new PostCanCommit(message.getTxId(), nextStep);
177 CohortBehaviour<?> abort() {
184 private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
185 extends CohortBehaviour<M> {
187 private final S step;
188 private final TransactionIdentifier txId;
190 CohortStateWithStep(TransactionIdentifier txId, S step) {
191 this.txId = Preconditions.checkNotNull(txId);
192 this.step = Preconditions.checkNotNull(step);
199 final TransactionIdentifier getTxId() {
204 @SuppressWarnings("checkstyle:IllegalCatch")
205 final CohortBehaviour<?> abort() {
207 getStep().abort().get();
208 } catch (final Exception e) {
209 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
210 getSender().tell(new Status.Failure(e), getSelf());
213 getSender().tell(new Success(getSelf(), txId), getSelf());
219 private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
221 PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
222 super(txId, nextStep);
226 Class<PreCommit> getHandledMessageType() {
227 return PreCommit.class;
231 @SuppressWarnings("checkstyle:IllegalCatch")
232 CohortBehaviour<?> process(PreCommit message) {
233 final PostPreCommitStep nextStep;
235 nextStep = getStep().preCommit().get();
236 } catch (final Exception e) {
237 getSender().tell(new Status.Failure(e), getSelf());
240 getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
241 return new PostPreCommit(getTxId(), nextStep);
246 private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
248 PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
253 @SuppressWarnings("checkstyle:IllegalCatch")
254 CohortBehaviour<?> process(Commit message) {
256 getStep().commit().get();
257 } catch (final Exception e) {
258 getSender().tell(new Status.Failure(e), getSender());
261 getSender().tell(new Success(getSelf(), getTxId()), getSelf());
266 Class<Commit> getHandledMessageType() {
272 static Props props(final DOMDataTreeCommitCohort cohort) {
273 return Props.create(DataTreeCohortActor.class, cohort);