Fix warnings/javadocs in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
16 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
17 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
18 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
19 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
22 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
23
24 /**
25  * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
26  * decapsulating DataTreeChanged messages and dispatching their context to the user.
27  */
28 final class DataTreeCohortActor extends AbstractUntypedActor {
29     private final CohortBehaviour<?> idleState = new Idle();
30     private final DOMDataTreeCommitCohort cohort;
31     private CohortBehaviour<?> currentState = idleState;
32
33     private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
34         this.cohort = Preconditions.checkNotNull(cohort);
35     }
36
37     @Override
38     protected void handleReceive(final Object message) {
39         currentState = currentState.handle(message);
40     }
41
42
43     /**
44      * Abstract message base for messages handled by {@link DataTreeCohortActor}.
45      *
46      * @param <R> Reply message type
47      */
48     abstract static class CommitProtocolCommand<R extends CommitReply> {
49
50         private final TransactionIdentifier txId;
51
52         final TransactionIdentifier getTxId() {
53             return txId;
54         }
55
56         protected CommitProtocolCommand(TransactionIdentifier txId) {
57             this.txId = Preconditions.checkNotNull(txId);
58         }
59     }
60
61     static final class CanCommit extends CommitProtocolCommand<Success> {
62
63         private final DOMDataTreeCandidate candidate;
64         private final ActorRef cohort;
65         private final SchemaContext schema;
66
67         CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
68             super(txId);
69             this.cohort = Preconditions.checkNotNull(cohort);
70             this.candidate = Preconditions.checkNotNull(candidate);
71             this.schema = Preconditions.checkNotNull(schema);
72         }
73
74         DOMDataTreeCandidate getCandidate() {
75             return candidate;
76         }
77
78         SchemaContext getSchema() {
79             return schema;
80         }
81
82         ActorRef getCohort() {
83             return cohort;
84         }
85
86     }
87
88     abstract static class CommitReply {
89
90         private final ActorRef cohortRef;
91         private final TransactionIdentifier txId;
92
93         protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
94             this.cohortRef = Preconditions.checkNotNull(cohortRef);
95             this.txId = Preconditions.checkNotNull(txId);
96         }
97
98         ActorRef getCohort() {
99             return cohortRef;
100         }
101
102         final TransactionIdentifier getTxId() {
103             return txId;
104         }
105     }
106
107     static final class Success extends CommitReply {
108
109         Success(ActorRef cohortRef, TransactionIdentifier txId) {
110             super(cohortRef, txId);
111         }
112
113     }
114
115     static final class PreCommit extends CommitProtocolCommand<Success> {
116
117         PreCommit(TransactionIdentifier txId) {
118             super(txId);
119         }
120     }
121
122     static final class Abort extends CommitProtocolCommand<Success> {
123
124         Abort(TransactionIdentifier txId) {
125             super(txId);
126         }
127     }
128
129     static final class Commit extends CommitProtocolCommand<Success> {
130
131         Commit(TransactionIdentifier txId) {
132             super(txId);
133         }
134     }
135
136     private abstract static class CohortBehaviour<E> {
137
138         abstract Class<E> getHandledMessageType();
139
140         CohortBehaviour<?> handle(Object message) {
141             if (getHandledMessageType().isInstance(message)) {
142                 return process(getHandledMessageType().cast(message));
143             } else if (message instanceof Abort) {
144                 return abort();
145             }
146             throw new UnsupportedOperationException();
147         }
148
149         abstract CohortBehaviour<?> abort();
150
151         abstract CohortBehaviour<?> process(E message);
152
153     }
154
155     private class Idle extends CohortBehaviour<CanCommit> {
156
157         @Override
158         Class<CanCommit> getHandledMessageType() {
159             return CanCommit.class;
160         }
161
162         @Override
163         @SuppressWarnings("checkstyle:IllegalCatch")
164         CohortBehaviour<?> process(CanCommit message) {
165             final PostCanCommitStep nextStep;
166             try {
167                 nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
168             } catch (final Exception e) {
169                 getSender().tell(new Status.Failure(e), getSelf());
170                 return this;
171             }
172             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
173             return new PostCanCommit(message.getTxId(), nextStep);
174         }
175
176         @Override
177         CohortBehaviour<?> abort() {
178             return this;
179         }
180
181     }
182
183
184     private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
185             extends CohortBehaviour<M> {
186
187         private final S step;
188         private final TransactionIdentifier txId;
189
190         CohortStateWithStep(TransactionIdentifier txId, S step) {
191             this.txId = Preconditions.checkNotNull(txId);
192             this.step = Preconditions.checkNotNull(step);
193         }
194
195         final S getStep() {
196             return step;
197         }
198
199         final TransactionIdentifier getTxId() {
200             return txId;
201         }
202
203         @Override
204         @SuppressWarnings("checkstyle:IllegalCatch")
205         final CohortBehaviour<?> abort() {
206             try {
207                 getStep().abort().get();
208             } catch (final Exception e) {
209                 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
210                 getSender().tell(new Status.Failure(e), getSelf());
211                 return idleState;
212             }
213             getSender().tell(new Success(getSelf(), txId), getSelf());
214             return idleState;
215         }
216
217     }
218
219     private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
220
221         PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
222             super(txId, nextStep);
223         }
224
225         @Override
226         Class<PreCommit> getHandledMessageType() {
227             return PreCommit.class;
228         }
229
230         @Override
231         @SuppressWarnings("checkstyle:IllegalCatch")
232         CohortBehaviour<?> process(PreCommit message) {
233             final PostPreCommitStep nextStep;
234             try {
235                 nextStep = getStep().preCommit().get();
236             } catch (final Exception e) {
237                 getSender().tell(new Status.Failure(e), getSelf());
238                 return idleState;
239             }
240             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
241             return new PostPreCommit(getTxId(), nextStep);
242         }
243
244     }
245
246     private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
247
248         PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
249             super(txId, step);
250         }
251
252         @Override
253         @SuppressWarnings("checkstyle:IllegalCatch")
254         CohortBehaviour<?> process(Commit message) {
255             try {
256                 getStep().commit().get();
257             } catch (final Exception e) {
258                 getSender().tell(new Status.Failure(e), getSender());
259                 return idleState;
260             }
261             getSender().tell(new Success(getSelf(), getTxId()), getSelf());
262             return idleState;
263         }
264
265         @Override
266         Class<Commit> getHandledMessageType() {
267             return Commit.class;
268         }
269
270     }
271
272     static Props props(final DOMDataTreeCommitCohort cohort) {
273         return Props.create(DataTreeCohortActor.class, cohort);
274     }
275 }