06d3ec9d67381e3709617bc4535f40ac37378fc9
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CohortEntry.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Stopwatch;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.TimeoutException;
17 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
18 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
19 import org.opendaylight.controller.cluster.datastore.modification.Modification;
20 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
21 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
22 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
23 import scala.concurrent.duration.Duration;
24
25 final class CohortEntry {
26     enum State {
27         PENDING,
28         CAN_COMMITTED,
29         PRE_COMMITTED,
30         COMMITTED,
31         ABORTED
32     }
33
34     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
35
36     private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
37     private final ReadWriteShardDataTreeTransaction transaction;
38     private final TransactionIdentifier transactionID;
39     private final CompositeDataTreeCohort userCohorts;
40     private final short clientVersion;
41
42     private State state = State.PENDING;
43     private RuntimeException lastBatchedModificationsException;
44     private int totalBatchedModificationsReceived;
45     private ShardDataTreeCohort cohort;
46     private boolean doImmediateCommit;
47     private ActorRef replySender;
48     private Shard shard;
49
50     private CohortEntry(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
51             DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
52         this.transaction = Preconditions.checkNotNull(transaction);
53         this.transactionID = Preconditions.checkNotNull(transactionID);
54         this.clientVersion = clientVersion;
55         this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
56     }
57
58     private CohortEntry(TransactionIdentifier transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
59             SchemaContext schema, short clientVersion) {
60         this.transactionID = Preconditions.checkNotNull(transactionID);
61         this.cohort = cohort;
62         this.transaction = null;
63         this.clientVersion = clientVersion;
64         this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
65     }
66
67     static CohortEntry createOpen(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
68             DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
69         return new CohortEntry(transactionID, transaction, cohortRegistry, schema, clientVersion);
70     }
71
72     static CohortEntry createReady(TransactionIdentifier transactionID, ShardDataTreeCohort cohort,
73             DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
74         return new CohortEntry(transactionID, cohort, cohortRegistry, schema, clientVersion);
75     }
76
77     void updateLastAccessTime() {
78         lastAccessTimer.reset();
79         lastAccessTimer.start();
80     }
81
82     TransactionIdentifier getTransactionID() {
83         return transactionID;
84     }
85
86     short getClientVersion() {
87         return clientVersion;
88     }
89
90     State getState() {
91         return state;
92     }
93
94     DataTreeCandidate getCandidate() {
95         return cohort.getCandidate();
96     }
97
98     DataTreeModification getDataTreeModification() {
99         return cohort.getDataTreeModification();
100     }
101
102     ReadWriteShardDataTreeTransaction getTransaction() {
103         return transaction;
104     }
105
106     int getTotalBatchedModificationsReceived() {
107         return totalBatchedModificationsReceived;
108     }
109
110     RuntimeException getLastBatchedModificationsException() {
111         return lastBatchedModificationsException;
112     }
113
114     void applyModifications(Iterable<Modification> modifications) {
115         totalBatchedModificationsReceived++;
116         if(lastBatchedModificationsException == null) {
117             for (Modification modification : modifications) {
118                     try {
119                         modification.apply(transaction.getSnapshot());
120                     } catch (RuntimeException e) {
121                         lastBatchedModificationsException = e;
122                         throw e;
123                     }
124             }
125         }
126     }
127
128     boolean canCommit() throws InterruptedException, ExecutionException {
129         state = State.CAN_COMMITTED;
130
131         // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
132         // about possibly accessing our state on a different thread outside of our dispatcher.
133         // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
134         // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
135         // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
136         return cohort.canCommit().get();
137     }
138
139
140
141     void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
142         state = State.PRE_COMMITTED;
143         cohort.preCommit().get();
144         userCohorts.canCommit(cohort.getCandidate());
145         userCohorts.preCommit();
146     }
147
148     void commit() throws InterruptedException, ExecutionException, TimeoutException {
149         state = State.COMMITTED;
150         cohort.commit().get();
151         userCohorts.commit();
152     }
153
154     void abort() throws InterruptedException, ExecutionException, TimeoutException {
155         state = State.ABORTED;
156         cohort.abort().get();
157         userCohorts.abort();
158     }
159
160     void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
161         Preconditions.checkState(cohort == null, "cohort was already set");
162
163         setDoImmediateCommit(doImmediateCommit);
164
165         cohort = transaction.ready();
166
167         if(cohortDecorator != null) {
168             // Call the hook for unit tests.
169             cohort = cohortDecorator.decorate(transactionID, cohort);
170         }
171     }
172
173     boolean isReadyToCommit() {
174         return replySender != null;
175     }
176
177     boolean isExpired(long expireTimeInMillis) {
178         return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
179     }
180
181     boolean isDoImmediateCommit() {
182         return doImmediateCommit;
183     }
184
185     void setDoImmediateCommit(boolean doImmediateCommit) {
186         this.doImmediateCommit = doImmediateCommit;
187     }
188
189     ActorRef getReplySender() {
190         return replySender;
191     }
192
193     void setReplySender(ActorRef replySender) {
194         this.replySender = replySender;
195     }
196
197     Shard getShard() {
198         return shard;
199     }
200
201     void setShard(Shard shard) {
202         this.shard = shard;
203     }
204
205
206     boolean isAborted() {
207         return state == State.ABORTED;
208     }
209
210     @Override
211     public String toString() {
212         final StringBuilder builder = new StringBuilder();
213         builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
214                 .append(doImmediateCommit).append("]");
215         return builder.toString();
216     }
217 }