2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import java.util.Collection;
16 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
17 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
18 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
19 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
20 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
23 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
24 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
27 * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
28 * decapsulating commit protocol messages (CanCommit, PreCommit, Commit, Abort) and dispatching their contents to the user.
30 final class DataTreeCohortActor extends AbstractUntypedActor {
// Behaviour instance used as the initial value of currentState below.
31 private final CohortBehaviour<?> idleState = new Idle();
// Commit cohort this actor delegates to; null-checked in the constructor.
32 private final DOMDataTreeCommitCohort cohort;
// Path supplied at construction; null-checked in the constructor.
// NOTE(review): appears to be only stored, never read, within this class - confirm.
33 private final YangInstanceIdentifier registeredPath;
// Behaviour that handles the next message; handleReceive replaces it with the value returned by handle().
34 private CohortBehaviour<?> currentState = idleState;
/**
 * Stores the cohort and the registered path. The constructor is private; props() passes this class
 * and the same two arguments to Props.create.
 *
 * @param cohort commit cohort to delegate to, must not be null
 * @param registeredPath path associated with the cohort, must not be null
 */
36 private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
37 this.cohort = Preconditions.checkNotNull(cohort);
38 this.registeredPath = Preconditions.checkNotNull(registeredPath);
/**
 * Logs the incoming message at debug level and hands it to the current behaviour. Whatever behaviour
 * handle() returns becomes the current behaviour for the next message.
 *
 * @param message message delivered to this actor
 */
42 protected void handleReceive(final Object message) {
43 LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(),
44 currentState, message);
// State transition: an exception thrown by handle() leaves currentState unchanged.
46 currentState = currentState.handle(message);
51 * Abstract message base for messages handled by {@link DataTreeCohortActor}.
53 * @param <R> Reply message type
55 abstract static class CommitProtocolCommand<R extends CommitReply> {
// Identifier of the transaction this command refers to; null-checked in the constructor.
57 private final TransactionIdentifier txId;
// Accessor for the transaction identifier (presumably returns the txId field - confirm).
59 final TransactionIdentifier getTxId() {
/**
 * @param txId transaction identifier, must not be null
 */
63 protected CommitProtocolCommand(TransactionIdentifier txId) {
64 this.txId = Preconditions.checkNotNull(txId);
// String form is "<simple class name> [txId=<txId>]", so subclasses get a usable default.
68 public String toString() {
69 return getClass().getSimpleName() + " [txId=" + txId + "]";
/**
 * Command processed by the Idle behaviour, which passes the transaction identifier, candidates and
 * schema carried here to the user cohort's canCommit().
 */
73 static final class CanCommit extends CommitProtocolCommand<Success> {
// All three fields are null-checked in the constructor.
75 private final Collection<DOMDataTreeCandidate> candidates;
// Actor reference exposed through getCohort(); not read by the behaviours in this file.
76 private final ActorRef cohort;
77 private final SchemaContext schema;
/**
 * @param txId transaction identifier
 * @param candidates data tree candidates, must not be null
 * @param schema schema context, must not be null
 */
// NOTE(review): the parameter list continues after this line; the body expects a non-null "cohort" argument.
79 CanCommit(TransactionIdentifier txId, Collection<DOMDataTreeCandidate> candidates, SchemaContext schema,
82 this.cohort = Preconditions.checkNotNull(cohort);
83 this.candidates = Preconditions.checkNotNull(candidates);
84 this.schema = Preconditions.checkNotNull(schema);
// Accessors for the three fields above (presumably plain getters - confirm).
87 Collection<DOMDataTreeCandidate> getCandidates() {
91 SchemaContext getSchema() {
95 ActorRef getCohort() {
// String form lists txId, candidates and cohort; the schema is not included.
100 public String toString() {
101 return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort + "]";
/**
 * Base class for replies to commit protocol commands. Holds the replying cohort's actor reference and
 * the transaction identifier, both non-null.
 */
105 abstract static class CommitReply {
107 private final ActorRef cohortRef;
108 private final TransactionIdentifier txId;
/**
 * @param cohortRef actor reference of the replying cohort, must not be null
 * @param txId transaction identifier the reply refers to, must not be null
 */
110 protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
111 this.cohortRef = Preconditions.checkNotNull(cohortRef);
112 this.txId = Preconditions.checkNotNull(txId);
// Accessors for the two fields above (presumably plain getters - confirm).
115 ActorRef getCohort() {
119 final TransactionIdentifier getTxId() {
// String form is "<simple class name> [txId=<txId>, cohortRef=<cohortRef>]".
124 public String toString() {
125 return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]";
/**
 * Positive reply. The behaviours in this file send it after a successful canCommit, preCommit, commit
 * or abort, always with getSelf() as the cohort reference.
 */
129 static final class Success extends CommitReply {
/**
 * @param cohortRef actor reference of the replying cohort, must not be null
 * @param txId transaction identifier the reply refers to, must not be null
 */
131 Success(ActorRef cohortRef, TransactionIdentifier txId) {
132 super(cohortRef, txId);
/**
 * Command processed by the PostCanCommit behaviour, which runs preCommit() on the step it holds.
 */
136 static final class PreCommit extends CommitProtocolCommand<Success> {
// @param txId transaction identifier (presumably forwarded to the superclass constructor - confirm).
138 PreCommit(TransactionIdentifier txId) {
/**
 * Command recognised by every behaviour: CohortBehaviour.handle() checks for it explicitly when the
 * message is not of the behaviour's own handled type.
 */
143 static final class Abort extends CommitProtocolCommand<Success> {
// @param txId transaction identifier (presumably forwarded to the superclass constructor - confirm).
145 Abort(TransactionIdentifier txId) {
/**
 * Command processed by the PostPreCommit behaviour, which runs commit() on the step it holds.
 */
150 static final class Commit extends CommitProtocolCommand<Success> {
// @param txId transaction identifier (presumably forwarded to the superclass constructor - confirm).
152 Commit(TransactionIdentifier txId) {
/**
 * Base class of the behaviours (states) this actor moves between. Each behaviour processes one message
 * type, additionally recognises Abort, and rejects everything else.
 *
 * @param <E> message type processed by this behaviour
 */
157 private abstract static class CohortBehaviour<E> {
// Class object used by handle() to test and cast incoming messages.
159 abstract Class<E> getHandledMessageType();
/**
 * Dispatches a message and returns the behaviour that should handle the next one.
 *
 * @param message incoming message
 * @return next behaviour
 * @throws UnsupportedOperationException if the message is neither of the handled type nor an Abort
 */
161 CohortBehaviour<?> handle(Object message) {
// The behaviour's own message type takes precedence over the Abort check.
162 if (getHandledMessageType().isInstance(message)) {
163 return process(getHandledMessageType().cast(message));
// NOTE(review): this branch presumably delegates to abort() - confirm.
164 } else if (message instanceof Abort) {
// Reached only for messages that matched neither test above.
167 throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
168 message.getClass(), getClass().getSimpleName()));
// Handles an abort request in this state and returns the next behaviour.
171 abstract CohortBehaviour<?> abort();
// Handles a message of the behaviour's own type and returns the next behaviour.
173 abstract CohortBehaviour<?> process(E message);
// Behaviours print as their simple class name; used by the debug log in handleReceive.
176 public String toString() {
177 return getClass().getSimpleName();
/**
 * Behaviour that processes CanCommit. An instance of it is the actor's initial behaviour (idleState).
 */
181 private class Idle extends CohortBehaviour<CanCommit> {
184 Class<CanCommit> getHandledMessageType() {
185 return CanCommit.class;
/**
 * Invokes the user cohort's canCommit() with the message's transaction identifier, candidates and
 * schema, and waits for the result via get(). On success it replies with Success and moves to
 * PostCanCommit with the step obtained. On any exception it replies to the sender with Status.Failure.
 * NOTE(review): the behaviour returned after a failure presumably stays idle - confirm.
 *
 * @param message the CanCommit command
 * @return next behaviour
 */
189 @SuppressWarnings("checkstyle:IllegalCatch")
190 CohortBehaviour<?> process(CanCommit message) {
191 final PostCanCommitStep nextStep;
193 nextStep = cohort.canCommit(message.getTxId(), message.getCandidates(), message.getSchema()).get();
// Broad catch: any failure, whether thrown by the cohort or by get(), is reported back to the sender.
194 } catch (final Exception e) {
195 getSender().tell(new Status.Failure(e), getSelf());
198 getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
199 return new PostCanCommit(message.getTxId(), nextStep);
// Abort handling while idle; no step is held by this behaviour.
// NOTE(review): presumably stays idle without replying - confirm.
203 CohortBehaviour<?> abort() {
/**
 * Behaviour that holds a transaction identifier together with the step produced by the preceding phase,
 * and implements abort() on top of that step.
 *
 * @param <M> command type processed by the concrete state
 * @param <S> type of the step held by the state
 */
209 private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
210 extends CohortBehaviour<M> {
// Both fields are null-checked in the constructor.
212 private final S step;
213 private final TransactionIdentifier txId;
/**
 * @param txId transaction identifier, must not be null
 * @param step step produced by the preceding phase, must not be null
 */
215 CohortStateWithStep(TransactionIdentifier txId, S step) {
216 this.txId = Preconditions.checkNotNull(txId);
217 this.step = Preconditions.checkNotNull(step);
// Accessor for the transaction identifier (presumably returns the txId field - confirm).
224 final TransactionIdentifier getTxId() {
/**
 * Aborts the held step and waits for completion via get(). On success it replies with Success; on any
 * exception it logs a warning and replies to the sender with Status.Failure.
 * NOTE(review): the behaviour returned in either case presumably is the idle state - confirm.
 *
 * @return next behaviour
 */
229 @SuppressWarnings("checkstyle:IllegalCatch")
230 final CohortBehaviour<?> abort() {
// getStep() is presumably the accessor for the step field - confirm.
232 getStep().abort().get();
233 } catch (final Exception e) {
// The exception is the trailing log argument, so its stack trace is logged.
234 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
235 getSender().tell(new Status.Failure(e), getSelf());
238 getSender().tell(new Success(getSelf(), txId), getSelf());
// String form is "<simple class name> [txId=<txId>, step=<step>]".
243 public String toString() {
244 return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]";
/**
 * Behaviour entered after a successful canCommit (created by Idle.process). Processes PreCommit.
 */
248 private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
/**
 * @param txId transaction identifier
 * @param nextStep step returned by the cohort's canCommit
 */
250 PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
251 super(txId, nextStep);
255 Class<PreCommit> getHandledMessageType() {
256 return PreCommit.class;
/**
 * Runs preCommit() on the held step and waits for the result via get(). On success it replies with
 * Success and moves to PostPreCommit with the step obtained. On any exception it replies to the
 * sender with Status.Failure.
 * NOTE(review): the behaviour returned after a failure is not evident here - confirm.
 *
 * @param message the PreCommit command
 * @return next behaviour
 */
260 @SuppressWarnings("checkstyle:IllegalCatch")
261 CohortBehaviour<?> process(PreCommit message) {
262 final PostPreCommitStep nextStep;
264 nextStep = getStep().preCommit().get();
265 } catch (final Exception e) {
266 getSender().tell(new Status.Failure(e), getSelf());
// NOTE(review): the reply uses the message's txId while the next state uses this state's txId;
// presumably they are equal - confirm.
269 getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
270 return new PostPreCommit(getTxId(), nextStep);
/**
 * Behaviour entered after a successful preCommit (created by PostCanCommit.process). Processes Commit.
 */
275 private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
/**
 * @param txId transaction identifier
 * @param step step returned by the preceding preCommit
 */
277 PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
/**
 * Runs commit() on the held step and waits for completion via get(). On success it replies with
 * Success; on any exception it replies to the sender with Status.Failure.
 * NOTE(review): the behaviour returned in either case presumably is the idle state - confirm.
 *
 * @param message the Commit command
 * @return next behaviour
 */
282 @SuppressWarnings("checkstyle:IllegalCatch")
283 CohortBehaviour<?> process(Commit message) {
285 getStep().commit().get();
286 } catch (final Exception e) {
// The second argument of tell() is the sender seen by the recipient. It must be this actor, as in
// every other reply in this class; passing getSender() made the requester appear to be the origin
// of its own failure reply.
287 getSender().tell(new Status.Failure(e), getSelf());
290 getSender().tell(new Success(getSelf(), getTxId()), getSelf());
295 Class<Commit> getHandledMessageType() {
/**
 * Builds the Props for a DataTreeCohortActor. The arguments are passed on to Props.create and match
 * the parameters of the private constructor.
 *
 * @param cohort commit cohort the actor will delegate to
 * @param registeredPath path associated with the cohort
 * @return Props for creating the actor
 */
301 static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
302 return Props.create(DataTreeCohortActor.class, cohort, registeredPath);