da1117764f8b8146ef7f8f49e22bcc7f2ef64d8b
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import java.util.Collection;
16 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
17 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
18 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
19 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
20 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
23 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
24 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
25
26 /**
27  * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
28  * decapsulating DataTreeChanged messages and dispatching their context to the user.
29  */
30 final class DataTreeCohortActor extends AbstractUntypedActor {
31     private final CohortBehaviour<?> idleState = new Idle();
32     private final DOMDataTreeCommitCohort cohort;
33     private final YangInstanceIdentifier registeredPath;
34     private CohortBehaviour<?> currentState = idleState;
35
36     private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
37         this.cohort = Preconditions.checkNotNull(cohort);
38         this.registeredPath = Preconditions.checkNotNull(registeredPath);
39     }
40
41     @Override
42     protected void handleReceive(final Object message) {
43         LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(),
44                 currentState, message);
45
46         currentState = currentState.handle(message);
47     }
48
49
50     /**
51      * Abstract message base for messages handled by {@link DataTreeCohortActor}.
52      *
53      * @param <R> Reply message type
54      */
55     abstract static class CommitProtocolCommand<R extends CommitReply> {
56
57         private final TransactionIdentifier txId;
58
59         final TransactionIdentifier getTxId() {
60             return txId;
61         }
62
63         protected CommitProtocolCommand(TransactionIdentifier txId) {
64             this.txId = Preconditions.checkNotNull(txId);
65         }
66
67         @Override
68         public String toString() {
69             return getClass().getSimpleName() + " [txId=" + txId + "]";
70         }
71     }
72
73     static final class CanCommit extends CommitProtocolCommand<Success> {
74
75         private final Collection<DOMDataTreeCandidate> candidates;
76         private final ActorRef cohort;
77         private final SchemaContext schema;
78
79         CanCommit(TransactionIdentifier txId, Collection<DOMDataTreeCandidate> candidates, SchemaContext schema,
80                 ActorRef cohort) {
81             super(txId);
82             this.cohort = Preconditions.checkNotNull(cohort);
83             this.candidates = Preconditions.checkNotNull(candidates);
84             this.schema = Preconditions.checkNotNull(schema);
85         }
86
87         Collection<DOMDataTreeCandidate> getCandidates() {
88             return candidates;
89         }
90
91         SchemaContext getSchema() {
92             return schema;
93         }
94
95         ActorRef getCohort() {
96             return cohort;
97         }
98
99         @Override
100         public String toString() {
101             return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort  + "]";
102         }
103     }
104
105     abstract static class CommitReply {
106
107         private final ActorRef cohortRef;
108         private final TransactionIdentifier txId;
109
110         protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
111             this.cohortRef = Preconditions.checkNotNull(cohortRef);
112             this.txId = Preconditions.checkNotNull(txId);
113         }
114
115         ActorRef getCohort() {
116             return cohortRef;
117         }
118
119         final TransactionIdentifier getTxId() {
120             return txId;
121         }
122
123         @Override
124         public String toString() {
125             return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]";
126         }
127     }
128
129     static final class Success extends CommitReply {
130
131         Success(ActorRef cohortRef, TransactionIdentifier txId) {
132             super(cohortRef, txId);
133         }
134     }
135
136     static final class PreCommit extends CommitProtocolCommand<Success> {
137
138         PreCommit(TransactionIdentifier txId) {
139             super(txId);
140         }
141     }
142
143     static final class Abort extends CommitProtocolCommand<Success> {
144
145         Abort(TransactionIdentifier txId) {
146             super(txId);
147         }
148     }
149
150     static final class Commit extends CommitProtocolCommand<Success> {
151
152         Commit(TransactionIdentifier txId) {
153             super(txId);
154         }
155     }
156
157     private abstract static class CohortBehaviour<E> {
158
159         abstract Class<E> getHandledMessageType();
160
161         CohortBehaviour<?> handle(Object message) {
162             if (getHandledMessageType().isInstance(message)) {
163                 return process(getHandledMessageType().cast(message));
164             } else if (message instanceof Abort) {
165                 return abort();
166             }
167             throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
168                     message.getClass(), getClass().getSimpleName()));
169         }
170
171         abstract CohortBehaviour<?> abort();
172
173         abstract CohortBehaviour<?> process(E message);
174
175         @Override
176         public String toString() {
177             return getClass().getSimpleName();
178         }
179     }
180
181     private class Idle extends CohortBehaviour<CanCommit> {
182
183         @Override
184         Class<CanCommit> getHandledMessageType() {
185             return CanCommit.class;
186         }
187
188         @Override
189         @SuppressWarnings("checkstyle:IllegalCatch")
190         CohortBehaviour<?> process(CanCommit message) {
191             final PostCanCommitStep nextStep;
192             try {
193                 nextStep = cohort.canCommit(message.getTxId(), message.getCandidates(), message.getSchema()).get();
194             } catch (final Exception e) {
195                 getSender().tell(new Status.Failure(e), getSelf());
196                 return this;
197             }
198             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
199             return new PostCanCommit(message.getTxId(), nextStep);
200         }
201
202         @Override
203         CohortBehaviour<?> abort() {
204             return this;
205         }
206     }
207
208
209     private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
210             extends CohortBehaviour<M> {
211
212         private final S step;
213         private final TransactionIdentifier txId;
214
215         CohortStateWithStep(TransactionIdentifier txId, S step) {
216             this.txId = Preconditions.checkNotNull(txId);
217             this.step = Preconditions.checkNotNull(step);
218         }
219
220         final S getStep() {
221             return step;
222         }
223
224         final TransactionIdentifier getTxId() {
225             return txId;
226         }
227
228         @Override
229         @SuppressWarnings("checkstyle:IllegalCatch")
230         final CohortBehaviour<?> abort() {
231             try {
232                 getStep().abort().get();
233             } catch (final Exception e) {
234                 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
235                 getSender().tell(new Status.Failure(e), getSelf());
236                 return idleState;
237             }
238             getSender().tell(new Success(getSelf(), txId), getSelf());
239             return idleState;
240         }
241
242         @Override
243         public String toString() {
244             return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]";
245         }
246     }
247
248     private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
249
250         PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
251             super(txId, nextStep);
252         }
253
254         @Override
255         Class<PreCommit> getHandledMessageType() {
256             return PreCommit.class;
257         }
258
259         @Override
260         @SuppressWarnings("checkstyle:IllegalCatch")
261         CohortBehaviour<?> process(PreCommit message) {
262             final PostPreCommitStep nextStep;
263             try {
264                 nextStep = getStep().preCommit().get();
265             } catch (final Exception e) {
266                 getSender().tell(new Status.Failure(e), getSelf());
267                 return idleState;
268             }
269             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
270             return new PostPreCommit(getTxId(), nextStep);
271         }
272
273     }
274
275     private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
276
277         PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
278             super(txId, step);
279         }
280
281         @Override
282         @SuppressWarnings("checkstyle:IllegalCatch")
283         CohortBehaviour<?> process(Commit message) {
284             try {
285                 getStep().commit().get();
286             } catch (final Exception e) {
287                 getSender().tell(new Status.Failure(e), getSender());
288                 return idleState;
289             }
290             getSender().tell(new Success(getSelf(), getTxId()), getSelf());
291             return idleState;
292         }
293
294         @Override
295         Class<Commit> getHandledMessageType() {
296             return Commit.class;
297         }
298
299     }
300
301     static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
302         return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
303     }
304 }