2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
16 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
17 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
18 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
19 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
21 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
26 * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
27 * decapsulating DataTreeChanged messages and dispatching their context to the user.
29 final class DataTreeCohortActor extends AbstractUntypedActor {
30 private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
31 private final CohortBehaviour<?> idleState = new Idle();
32 private final DOMDataTreeCommitCohort cohort;
33 private CohortBehaviour<?> currentState = idleState;
35 private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
36 this.cohort = Preconditions.checkNotNull(cohort);
40 protected void handleReceive(final Object message) {
41 currentState = currentState.handle(message);
46 * Abstract message base for messages handled by {@link DataTreeCohortActor}.
48 * @param <R> Reply message type
50 static abstract class CommitProtocolCommand<R extends CommitReply> {
52 private final String txId;
54 final String getTxId() {
58 protected CommitProtocolCommand(String txId) {
59 this.txId = Preconditions.checkNotNull(txId);
63 static final class CanCommit extends CommitProtocolCommand<Success> {
65 private final DOMDataTreeCandidate candidate;
66 private final ActorRef cohort;
67 private final SchemaContext schema;
69 CanCommit(String txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
71 this.cohort = Preconditions.checkNotNull(cohort);
72 this.candidate = Preconditions.checkNotNull(candidate);
73 this.schema = Preconditions.checkNotNull(schema);
76 DOMDataTreeCandidate getCandidate() {
80 SchemaContext getSchema() {
84 ActorRef getCohort() {
90 static abstract class CommitReply {
92 private final ActorRef cohortRef;
93 private final String txId;
95 protected CommitReply(ActorRef cohortRef, String txId) {
96 this.cohortRef = Preconditions.checkNotNull(cohortRef);
97 this.txId = Preconditions.checkNotNull(txId);
100 ActorRef getCohort() {
104 final String getTxId() {
110 static final class Success extends CommitReply {
112 public Success(ActorRef cohortRef, String txId) {
113 super(cohortRef, txId);
118 static final class PreCommit extends CommitProtocolCommand<Success> {
120 public PreCommit(String txId) {
125 static final class Abort extends CommitProtocolCommand<Success> {
127 public Abort(String txId) {
132 static final class Commit extends CommitProtocolCommand<Success> {
134 public Commit(String txId) {
139 private static abstract class CohortBehaviour<E> {
141 abstract Class<E> getHandledMessageType();
143 CohortBehaviour<?> handle(Object message) {
144 if (getHandledMessageType().isInstance(message)) {
145 return process(getHandledMessageType().cast(message));
146 } else if (message instanceof Abort) {
149 throw new UnsupportedOperationException();
152 abstract CohortBehaviour<?> abort();
154 abstract CohortBehaviour<?> process(E message);
158 private class Idle extends CohortBehaviour<CanCommit> {
161 Class<CanCommit> getHandledMessageType() {
162 return CanCommit.class;
166 CohortBehaviour<?> process(CanCommit message) {
167 final PostCanCommitStep nextStep;
169 nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
170 } catch (final Exception e) {
171 getSender().tell(new Status.Failure(e), getSelf());
174 getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
175 return new PostCanCommit(message.getTxId(), nextStep);
179 CohortBehaviour<?> abort() {
186 private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
187 extends CohortBehaviour<M> {
189 private final S step;
190 private final String txId;
192 CohortStateWithStep(String txId, S step) {
193 this.txId = Preconditions.checkNotNull(txId);
194 this.step = Preconditions.checkNotNull(step);
201 final String getTxId() {
206 final CohortBehaviour<?> abort() {
208 getStep().abort().get();
209 } catch (final Exception e) {
210 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
211 getSender().tell(new Status.Failure(e), getSelf());
214 getSender().tell(new Success(getSelf(), txId), getSelf());
220 private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
222 PostCanCommit(String txId, PostCanCommitStep nextStep) {
223 super(txId, nextStep);
227 Class<PreCommit> getHandledMessageType() {
228 return PreCommit.class;
232 CohortBehaviour<?> process(PreCommit message) {
233 final PostPreCommitStep nextStep;
235 nextStep = getStep().preCommit().get();
236 } catch (final Exception e) {
237 getSender().tell(new Status.Failure(e), getSelf());
240 getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
241 return new PostPreCommit(getTxId(), nextStep);
246 private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
248 PostPreCommit(String txId, PostPreCommitStep step) {
253 CohortBehaviour<?> process(Commit message) {
255 getStep().commit().get();
256 } catch (final Exception e) {
257 getSender().tell(new Status.Failure(e), getSender());
260 getSender().tell(new Success(getSelf(), getTxId()), getSelf());
265 Class<Commit> getHandledMessageType() {
271 static Props props(final DOMDataTreeCommitCohort cohort) {
272 return Props.create(DataTreeCohortActor.class, cohort);