/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
16 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
17 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
18 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
19 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
28 * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
29 * decapsulating DataTreeChanged messages and dispatching their context to the user.
31 final class DataTreeCohortActor extends AbstractUntypedActor {
32 private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
33 private final CohortBehaviour<?> idleState = new Idle();
34 private final DOMDataTreeCommitCohort cohort;
35 private final YangInstanceIdentifier registeredPath;
36 private CohortBehaviour<?> currentState = idleState;
38 private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
39 this.cohort = Preconditions.checkNotNull(cohort);
40 this.registeredPath = Preconditions.checkNotNull(registeredPath);
44 protected void handleReceive(final Object message) {
45 currentState = currentState.handle(message);
50 * Abstract message base for messages handled by {@link DataTreeCohortActor}.
52 * @param <R> Reply message type
54 static abstract class CommitProtocolCommand<R extends CommitReply> {
56 private final TransactionIdentifier txId;
58 final TransactionIdentifier getTxId() {
62 protected CommitProtocolCommand(TransactionIdentifier txId) {
63 this.txId = Preconditions.checkNotNull(txId);
67 static final class CanCommit extends CommitProtocolCommand<Success> {
69 private final DOMDataTreeCandidate candidate;
70 private final ActorRef cohort;
71 private final SchemaContext schema;
73 CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
75 this.cohort = Preconditions.checkNotNull(cohort);
76 this.candidate = Preconditions.checkNotNull(candidate);
77 this.schema = Preconditions.checkNotNull(schema);
80 DOMDataTreeCandidate getCandidate() {
84 SchemaContext getSchema() {
88 ActorRef getCohort() {
94 static abstract class CommitReply {
96 private final ActorRef cohortRef;
97 private final TransactionIdentifier txId;
99 protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
100 this.cohortRef = Preconditions.checkNotNull(cohortRef);
101 this.txId = Preconditions.checkNotNull(txId);
104 ActorRef getCohort() {
108 final TransactionIdentifier getTxId() {
113 static final class Success extends CommitReply {
115 public Success(ActorRef cohortRef, TransactionIdentifier txId) {
116 super(cohortRef, txId);
121 static final class PreCommit extends CommitProtocolCommand<Success> {
123 public PreCommit(TransactionIdentifier txId) {
128 static final class Abort extends CommitProtocolCommand<Success> {
130 public Abort(TransactionIdentifier txId) {
135 static final class Commit extends CommitProtocolCommand<Success> {
137 public Commit(TransactionIdentifier txId) {
142 private static abstract class CohortBehaviour<E> {
144 abstract Class<E> getHandledMessageType();
146 CohortBehaviour<?> handle(Object message) {
147 if (getHandledMessageType().isInstance(message)) {
148 return process(getHandledMessageType().cast(message));
149 } else if (message instanceof Abort) {
152 throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
153 message.getClass(), getClass().getSimpleName()));
156 abstract CohortBehaviour<?> abort();
158 abstract CohortBehaviour<?> process(E message);
162 private class Idle extends CohortBehaviour<CanCommit> {
165 Class<CanCommit> getHandledMessageType() {
166 return CanCommit.class;
170 CohortBehaviour<?> process(CanCommit message) {
171 final PostCanCommitStep nextStep;
173 nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
174 } catch (final Exception e) {
175 getSender().tell(new Status.Failure(e), getSelf());
178 getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
179 return new PostCanCommit(message.getTxId(), nextStep);
183 CohortBehaviour<?> abort() {
190 private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
191 extends CohortBehaviour<M> {
193 private final S step;
194 private final TransactionIdentifier txId;
196 CohortStateWithStep(TransactionIdentifier txId, S step) {
197 this.txId = Preconditions.checkNotNull(txId);
198 this.step = Preconditions.checkNotNull(step);
205 final TransactionIdentifier getTxId() {
210 final CohortBehaviour<?> abort() {
212 getStep().abort().get();
213 } catch (final Exception e) {
214 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
215 getSender().tell(new Status.Failure(e), getSelf());
218 getSender().tell(new Success(getSelf(), txId), getSelf());
224 private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
226 PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
227 super(txId, nextStep);
231 Class<PreCommit> getHandledMessageType() {
232 return PreCommit.class;
236 CohortBehaviour<?> process(PreCommit message) {
237 final PostPreCommitStep nextStep;
239 nextStep = getStep().preCommit().get();
240 } catch (final Exception e) {
241 getSender().tell(new Status.Failure(e), getSelf());
244 getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
245 return new PostPreCommit(getTxId(), nextStep);
250 private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
252 PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
257 CohortBehaviour<?> process(Commit message) {
259 getStep().commit().get();
260 } catch (final Exception e) {
261 getSender().tell(new Status.Failure(e), getSender());
264 getSender().tell(new Success(getSelf(), getTxId()), getSelf());
269 Class<Commit> getHandledMessageType() {
275 static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
276 return Props.create(DataTreeCohortActor.class, cohort, registeredPath);