Cache MapJoiner
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
16 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
17 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
18 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
19 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
22 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
24
25 /**
26  * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
27  * decapsulating DataTreeChanged messages and dispatching their context to the user.
28  */
29 final class DataTreeCohortActor extends AbstractUntypedActor {
30     private final CohortBehaviour<?> idleState = new Idle();
31     private final DOMDataTreeCommitCohort cohort;
32     private final YangInstanceIdentifier registeredPath;
33     private CohortBehaviour<?> currentState = idleState;
34
35     private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
36         this.cohort = Preconditions.checkNotNull(cohort);
37         this.registeredPath = Preconditions.checkNotNull(registeredPath);
38     }
39
40     @Override
41     protected void handleReceive(final Object message) {
42         currentState = currentState.handle(message);
43     }
44
45
46     /**
47      * Abstract message base for messages handled by {@link DataTreeCohortActor}.
48      *
49      * @param <R> Reply message type
50      */
51     abstract static class CommitProtocolCommand<R extends CommitReply> {
52
53         private final TransactionIdentifier txId;
54
55         final TransactionIdentifier getTxId() {
56             return txId;
57         }
58
59         protected CommitProtocolCommand(TransactionIdentifier txId) {
60             this.txId = Preconditions.checkNotNull(txId);
61         }
62     }
63
64     static final class CanCommit extends CommitProtocolCommand<Success> {
65
66         private final DOMDataTreeCandidate candidate;
67         private final ActorRef cohort;
68         private final SchemaContext schema;
69
70         CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
71             super(txId);
72             this.cohort = Preconditions.checkNotNull(cohort);
73             this.candidate = Preconditions.checkNotNull(candidate);
74             this.schema = Preconditions.checkNotNull(schema);
75         }
76
77         DOMDataTreeCandidate getCandidate() {
78             return candidate;
79         }
80
81         SchemaContext getSchema() {
82             return schema;
83         }
84
85         ActorRef getCohort() {
86             return cohort;
87         }
88
89     }
90
91     abstract static class CommitReply {
92
93         private final ActorRef cohortRef;
94         private final TransactionIdentifier txId;
95
96         protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
97             this.cohortRef = Preconditions.checkNotNull(cohortRef);
98             this.txId = Preconditions.checkNotNull(txId);
99         }
100
101         ActorRef getCohort() {
102             return cohortRef;
103         }
104
105         final TransactionIdentifier getTxId() {
106             return txId;
107         }
108     }
109
110     static final class Success extends CommitReply {
111
112         Success(ActorRef cohortRef, TransactionIdentifier txId) {
113             super(cohortRef, txId);
114         }
115
116     }
117
118     static final class PreCommit extends CommitProtocolCommand<Success> {
119
120         PreCommit(TransactionIdentifier txId) {
121             super(txId);
122         }
123     }
124
125     static final class Abort extends CommitProtocolCommand<Success> {
126
127         Abort(TransactionIdentifier txId) {
128             super(txId);
129         }
130     }
131
132     static final class Commit extends CommitProtocolCommand<Success> {
133
134         Commit(TransactionIdentifier txId) {
135             super(txId);
136         }
137     }
138
139     private abstract static class CohortBehaviour<E> {
140
141         abstract Class<E> getHandledMessageType();
142
143         CohortBehaviour<?> handle(Object message) {
144             if (getHandledMessageType().isInstance(message)) {
145                 return process(getHandledMessageType().cast(message));
146             } else if (message instanceof Abort) {
147                 return abort();
148             }
149             throw new UnsupportedOperationException(String.format("Unexpected message %s in cohort behavior %s",
150                     message.getClass(), getClass().getSimpleName()));
151         }
152
153         abstract CohortBehaviour<?> abort();
154
155         abstract CohortBehaviour<?> process(E message);
156
157     }
158
159     private class Idle extends CohortBehaviour<CanCommit> {
160
161         @Override
162         Class<CanCommit> getHandledMessageType() {
163             return CanCommit.class;
164         }
165
166         @Override
167         @SuppressWarnings("checkstyle:IllegalCatch")
168         CohortBehaviour<?> process(CanCommit message) {
169             final PostCanCommitStep nextStep;
170             try {
171                 nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
172             } catch (final Exception e) {
173                 getSender().tell(new Status.Failure(e), getSelf());
174                 return this;
175             }
176             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
177             return new PostCanCommit(message.getTxId(), nextStep);
178         }
179
180         @Override
181         CohortBehaviour<?> abort() {
182             return this;
183         }
184
185     }
186
187
188     private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
189             extends CohortBehaviour<M> {
190
191         private final S step;
192         private final TransactionIdentifier txId;
193
194         CohortStateWithStep(TransactionIdentifier txId, S step) {
195             this.txId = Preconditions.checkNotNull(txId);
196             this.step = Preconditions.checkNotNull(step);
197         }
198
199         final S getStep() {
200             return step;
201         }
202
203         final TransactionIdentifier getTxId() {
204             return txId;
205         }
206
207         @Override
208         @SuppressWarnings("checkstyle:IllegalCatch")
209         final CohortBehaviour<?> abort() {
210             try {
211                 getStep().abort().get();
212             } catch (final Exception e) {
213                 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
214                 getSender().tell(new Status.Failure(e), getSelf());
215                 return idleState;
216             }
217             getSender().tell(new Success(getSelf(), txId), getSelf());
218             return idleState;
219         }
220
221     }
222
223     private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
224
225         PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
226             super(txId, nextStep);
227         }
228
229         @Override
230         Class<PreCommit> getHandledMessageType() {
231             return PreCommit.class;
232         }
233
234         @Override
235         @SuppressWarnings("checkstyle:IllegalCatch")
236         CohortBehaviour<?> process(PreCommit message) {
237             final PostPreCommitStep nextStep;
238             try {
239                 nextStep = getStep().preCommit().get();
240             } catch (final Exception e) {
241                 getSender().tell(new Status.Failure(e), getSelf());
242                 return idleState;
243             }
244             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
245             return new PostPreCommit(getTxId(), nextStep);
246         }
247
248     }
249
250     private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
251
252         PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
253             super(txId, step);
254         }
255
256         @Override
257         @SuppressWarnings("checkstyle:IllegalCatch")
258         CohortBehaviour<?> process(Commit message) {
259             try {
260                 getStep().commit().get();
261             } catch (final Exception e) {
262                 getSender().tell(new Status.Failure(e), getSender());
263                 return idleState;
264             }
265             getSender().tell(new Success(getSelf(), getTxId()), getSelf());
266             return idleState;
267         }
268
269         @Override
270         Class<Commit> getHandledMessageType() {
271             return Commit.class;
272         }
273
274     }
275
276     static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) {
277         return Props.create(DataTreeCohortActor.class, cohort, registeredPath);
278     }
279 }