BUG-5280: switch transaction IDs from String to TransactionIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / CohortEntry.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Stopwatch;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.TimeoutException;
17 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
18 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
19 import org.opendaylight.controller.cluster.datastore.modification.Modification;
20 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
21 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
22 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
23 import scala.concurrent.duration.Duration;
24
25 final class CohortEntry {
26     enum State {
27         PENDING,
28         CAN_COMMITTED,
29         PRE_COMMITTED,
30         COMMITTED,
31         ABORTED
32     }
33
34     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
35
36     private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
37     private final ReadWriteShardDataTreeTransaction transaction;
38     private final TransactionIdentifier transactionID;
39     private final CompositeDataTreeCohort userCohorts;
40     private final short clientVersion;
41
42     private State state = State.PENDING;
43     private RuntimeException lastBatchedModificationsException;
44     private int totalBatchedModificationsReceived;
45     private ShardDataTreeCohort cohort;
46     private boolean doImmediateCommit;
47     private ActorRef replySender;
48     private Shard shard;
49
50     CohortEntry(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
51             DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
52         this.transaction = Preconditions.checkNotNull(transaction);
53         this.transactionID = Preconditions.checkNotNull(transactionID);
54         this.clientVersion = clientVersion;
55         this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
56     }
57
58     CohortEntry(TransactionIdentifier transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
59             SchemaContext schema, short clientVersion) {
60         this.transactionID = Preconditions.checkNotNull(transactionID);
61         this.cohort = cohort;
62         this.transaction = null;
63         this.clientVersion = clientVersion;
64         this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
65     }
66
67     void updateLastAccessTime() {
68         lastAccessTimer.reset();
69         lastAccessTimer.start();
70     }
71
72     TransactionIdentifier getTransactionID() {
73         return transactionID;
74     }
75
76     short getClientVersion() {
77         return clientVersion;
78     }
79
80     State getState() {
81         return state;
82     }
83
84     DataTreeCandidate getCandidate() {
85         return cohort.getCandidate();
86     }
87
88     DataTreeModification getDataTreeModification() {
89         return cohort.getDataTreeModification();
90     }
91
92     ReadWriteShardDataTreeTransaction getTransaction() {
93         return transaction;
94     }
95
96     int getTotalBatchedModificationsReceived() {
97         return totalBatchedModificationsReceived;
98     }
99
100     RuntimeException getLastBatchedModificationsException() {
101         return lastBatchedModificationsException;
102     }
103
104     void applyModifications(Iterable<Modification> modifications) {
105         totalBatchedModificationsReceived++;
106         if(lastBatchedModificationsException == null) {
107             for (Modification modification : modifications) {
108                     try {
109                         modification.apply(transaction.getSnapshot());
110                     } catch (RuntimeException e) {
111                         lastBatchedModificationsException = e;
112                         throw e;
113                     }
114             }
115         }
116     }
117
118     boolean canCommit() throws InterruptedException, ExecutionException {
119         state = State.CAN_COMMITTED;
120
121         // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
122         // about possibly accessing our state on a different thread outside of our dispatcher.
123         // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
124         // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
125         // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
126         return cohort.canCommit().get();
127     }
128
129
130
131     void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
132         state = State.PRE_COMMITTED;
133         cohort.preCommit().get();
134         userCohorts.canCommit(cohort.getCandidate());
135         userCohorts.preCommit();
136     }
137
138     void commit() throws InterruptedException, ExecutionException, TimeoutException {
139         state = State.COMMITTED;
140         cohort.commit().get();
141         userCohorts.commit();
142     }
143
144     void abort() throws InterruptedException, ExecutionException, TimeoutException {
145         state = State.ABORTED;
146         cohort.abort().get();
147         userCohorts.abort();
148     }
149
150     void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
151         Preconditions.checkState(cohort == null, "cohort was already set");
152
153         setDoImmediateCommit(doImmediateCommit);
154
155         cohort = transaction.ready();
156
157         if(cohortDecorator != null) {
158             // Call the hook for unit tests.
159             cohort = cohortDecorator.decorate(transactionID, cohort);
160         }
161     }
162
163     boolean isReadyToCommit() {
164         return replySender != null;
165     }
166
167     boolean isExpired(long expireTimeInMillis) {
168         return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
169     }
170
171     boolean isDoImmediateCommit() {
172         return doImmediateCommit;
173     }
174
175     void setDoImmediateCommit(boolean doImmediateCommit) {
176         this.doImmediateCommit = doImmediateCommit;
177     }
178
179     ActorRef getReplySender() {
180         return replySender;
181     }
182
183     void setReplySender(ActorRef replySender) {
184         this.replySender = replySender;
185     }
186
187     Shard getShard() {
188         return shard;
189     }
190
191     void setShard(Shard shard) {
192         this.shard = shard;
193     }
194
195
196     boolean isAborted() {
197         return state == State.ABORTED;
198     }
199
200     @Override
201     public String toString() {
202         final StringBuilder builder = new StringBuilder();
203         builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
204                 .append(doImmediateCommit).append("]");
205         return builder.toString();
206     }
207 }