10ffe1f7b7dd40793820caa078b58be7855a65e1
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DataTreeCohortActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Props;
13 import akka.actor.Status;
14 import com.google.common.base.Preconditions;
15 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
16 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
17 import org.opendaylight.mdsal.common.api.PostCanCommitStep;
18 import org.opendaylight.mdsal.common.api.PostPreCommitStep;
19 import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep;
20 import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
22 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 /**
27  * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for
28  * decapsulating DataTreeChanged messages and dispatching their context to the user.
29  */
30 final class DataTreeCohortActor extends AbstractUntypedActor {
31     private static final Logger LOG = LoggerFactory.getLogger(DataTreeCohortActor.class);
32     private final CohortBehaviour<?> idleState = new Idle();
33     private final DOMDataTreeCommitCohort cohort;
34     private CohortBehaviour<?> currentState = idleState;
35
36     private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort) {
37         this.cohort = Preconditions.checkNotNull(cohort);
38     }
39
40     @Override
41     protected void handleReceive(final Object message) {
42         currentState = currentState.handle(message);
43     }
44
45
46     /**
47      * Abstract message base for messages handled by {@link DataTreeCohortActor}.
48      *
49      * @param <R> Reply message type
50      */
51     static abstract class CommitProtocolCommand<R extends CommitReply> {
52
53         private final TransactionIdentifier txId;
54
55         final TransactionIdentifier getTxId() {
56             return txId;
57         }
58
59         protected CommitProtocolCommand(TransactionIdentifier txId) {
60             this.txId = Preconditions.checkNotNull(txId);
61         }
62     }
63
64     static final class CanCommit extends CommitProtocolCommand<Success> {
65
66         private final DOMDataTreeCandidate candidate;
67         private final ActorRef cohort;
68         private final SchemaContext schema;
69
70         CanCommit(TransactionIdentifier txId, DOMDataTreeCandidate candidate, SchemaContext schema, ActorRef cohort) {
71             super(txId);
72             this.cohort = Preconditions.checkNotNull(cohort);
73             this.candidate = Preconditions.checkNotNull(candidate);
74             this.schema = Preconditions.checkNotNull(schema);
75         }
76
77         DOMDataTreeCandidate getCandidate() {
78             return candidate;
79         }
80
81         SchemaContext getSchema() {
82             return schema;
83         }
84
85         ActorRef getCohort() {
86             return cohort;
87         }
88
89     }
90
91     static abstract class CommitReply {
92
93         private final ActorRef cohortRef;
94         private final TransactionIdentifier txId;
95
96         protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
97             this.cohortRef = Preconditions.checkNotNull(cohortRef);
98             this.txId = Preconditions.checkNotNull(txId);
99         }
100
101         ActorRef getCohort() {
102             return cohortRef;
103         }
104
105         final TransactionIdentifier getTxId() {
106             return txId;
107         }
108     }
109
110     static final class Success extends CommitReply {
111
112         public Success(ActorRef cohortRef, TransactionIdentifier txId) {
113             super(cohortRef, txId);
114         }
115
116     }
117
118     static final class PreCommit extends CommitProtocolCommand<Success> {
119
120         public PreCommit(TransactionIdentifier txId) {
121             super(txId);
122         }
123     }
124
125     static final class Abort extends CommitProtocolCommand<Success> {
126
127         public Abort(TransactionIdentifier txId) {
128             super(txId);
129         }
130     }
131
132     static final class Commit extends CommitProtocolCommand<Success> {
133
134         public Commit(TransactionIdentifier txId) {
135             super(txId);
136         }
137     }
138
139     private static abstract class CohortBehaviour<E> {
140
141         abstract Class<E> getHandledMessageType();
142
143         CohortBehaviour<?> handle(Object message) {
144             if (getHandledMessageType().isInstance(message)) {
145                 return process(getHandledMessageType().cast(message));
146             } else if (message instanceof Abort) {
147                 return abort();
148             }
149             throw new UnsupportedOperationException();
150         }
151
152         abstract CohortBehaviour<?> abort();
153
154         abstract CohortBehaviour<?> process(E message);
155
156     }
157
158     private class Idle extends CohortBehaviour<CanCommit> {
159
160         @Override
161         Class<CanCommit> getHandledMessageType() {
162             return CanCommit.class;
163         }
164
165         @Override
166         CohortBehaviour<?> process(CanCommit message) {
167             final PostCanCommitStep nextStep;
168             try {
169                 nextStep = cohort.canCommit(message.getTxId(), message.getCandidate(), message.getSchema()).get();
170             } catch (final Exception e) {
171                 getSender().tell(new Status.Failure(e), getSelf());
172                 return this;
173             }
174             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
175             return new PostCanCommit(message.getTxId(), nextStep);
176         }
177
178         @Override
179         CohortBehaviour<?> abort() {
180             return this;
181         }
182
183     }
184
185
186     private abstract class CohortStateWithStep<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep>
187             extends CohortBehaviour<M> {
188
189         private final S step;
190         private final TransactionIdentifier txId;
191
192         CohortStateWithStep(TransactionIdentifier txId, S step) {
193             this.txId = Preconditions.checkNotNull(txId);
194             this.step = Preconditions.checkNotNull(step);
195         }
196
197         final S getStep() {
198             return step;
199         }
200
201         final TransactionIdentifier getTxId() {
202             return txId;
203         }
204
205         @Override
206         final CohortBehaviour<?> abort() {
207             try {
208                 getStep().abort().get();
209             } catch (final Exception e) {
210                 LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, e);
211                 getSender().tell(new Status.Failure(e), getSelf());
212                 return idleState;
213             }
214             getSender().tell(new Success(getSelf(), txId), getSelf());
215             return idleState;
216         }
217
218     }
219
220     private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep> {
221
222         PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
223             super(txId, nextStep);
224         }
225
226         @Override
227         Class<PreCommit> getHandledMessageType() {
228             return PreCommit.class;
229         }
230
231         @Override
232         CohortBehaviour<?> process(PreCommit message) {
233             final PostPreCommitStep nextStep;
234             try {
235                 nextStep = getStep().preCommit().get();
236             } catch (final Exception e) {
237                 getSender().tell(new Status.Failure(e), getSelf());
238                 return idleState;
239             }
240             getSender().tell(new Success(getSelf(), message.getTxId()), getSelf());
241             return new PostPreCommit(getTxId(), nextStep);
242         }
243
244     }
245
246     private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep> {
247
248         PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
249             super(txId, step);
250         }
251
252         @Override
253         CohortBehaviour<?> process(Commit message) {
254             try {
255                 getStep().commit().get();
256             } catch (final Exception e) {
257                 getSender().tell(new Status.Failure(e), getSender());
258                 return idleState;
259             }
260             getSender().tell(new Success(getSelf(), getTxId()), getSelf());
261             return idleState;
262         }
263
264         @Override
265         Class<Commit> getHandledMessageType() {
266             return Commit.class;
267         }
268
269     }
270
271     static Props props(final DOMDataTreeCommitCohort cohort) {
272         return Props.create(DataTreeCohortActor.class, cohort);
273     }
274 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.