2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Stopwatch;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.TimeoutException;
17 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
18 import org.opendaylight.controller.cluster.datastore.modification.Modification;
19 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
20 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
21 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
22 import scala.concurrent.duration.Duration;
24 final class CohortEntry {
33 private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
35 private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
36 private final ReadWriteShardDataTreeTransaction transaction;
37 private final String transactionID;
38 private final CompositeDataTreeCohort userCohorts;
39 private final short clientVersion;
41 private State state = State.PENDING;
42 private RuntimeException lastBatchedModificationsException;
43 private int totalBatchedModificationsReceived;
44 private ShardDataTreeCohort cohort;
45 private boolean doImmediateCommit;
46 private ActorRef replySender;
/**
 * Creates an entry backed by a read-write transaction. The commit cohort is not known yet; it is
 * obtained later from the transaction when {@code ready} is invoked.
 *
 * @param transactionID identifier of the transaction this entry tracks
 * @param transaction the backing transaction, must not be null
 * @param cohortRegistry passed through to the {@link CompositeDataTreeCohort} created for this entry
 * @param schema passed through to the {@link CompositeDataTreeCohort} created for this entry
 * @param clientVersion version value reported by {@code getClientVersion}
 * @throws NullPointerException if {@code transaction} is null
 */
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
        DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
    this.transactionID = transactionID;
    this.clientVersion = clientVersion;
    // Validate before building the composite cohort so a null transaction fails first.
    this.transaction = Preconditions.checkNotNull(transaction);
    this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema,
            COMMIT_STEP_TIMEOUT);
}
/**
 * Creates an entry for a transaction whose commit cohort already exists. No backing transaction is
 * associated with such an entry.
 *
 * @param transactionID identifier of the transaction this entry tracks
 * @param cohort the commit cohort driven by {@code canCommit}/{@code preCommit}/{@code commit}/{@code abort}
 * @param cohortRegistry passed through to the {@link CompositeDataTreeCohort} created for this entry
 * @param schema passed through to the {@link CompositeDataTreeCohort} created for this entry
 * @param clientVersion version value reported by {@code getClientVersion}
 */
CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
        SchemaContext schema, short clientVersion) {
    this.transactionID = transactionID;
    // Store the supplied cohort; without this every commit-phase method would dereference null.
    this.cohort = cohort;
    this.transaction = null;
    this.clientVersion = clientVersion;
    this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
66 void updateLastAccessTime() {
67 lastAccessTimer.reset();
68 lastAccessTimer.start();
71 String getTransactionID() {
75 short getClientVersion() {
83 DataTreeCandidate getCandidate() {
84 return cohort.getCandidate();
87 DataTreeModification getDataTreeModification() {
88 return cohort.getDataTreeModification();
91 ReadWriteShardDataTreeTransaction getTransaction() {
95 int getTotalBatchedModificationsReceived() {
96 return totalBatchedModificationsReceived;
99 RuntimeException getLastBatchedModificationsException() {
100 return lastBatchedModificationsException;
103 void applyModifications(Iterable<Modification> modifications) {
104 totalBatchedModificationsReceived++;
105 if(lastBatchedModificationsException == null) {
106 for (Modification modification : modifications) {
108 modification.apply(transaction.getSnapshot());
109 } catch (RuntimeException e) {
110 lastBatchedModificationsException = e;
117 boolean canCommit() throws InterruptedException, ExecutionException {
118 state = State.CAN_COMMITTED;
120 // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
121 // about possibly accessing our state on a different thread outside of our dispatcher.
122 // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
123 // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
124 // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
125 return cohort.canCommit().get();
130 void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
131 state = State.PRE_COMMITTED;
132 cohort.preCommit().get();
133 userCohorts.canCommit(cohort.getCandidate());
134 userCohorts.preCommit();
137 void commit() throws InterruptedException, ExecutionException, TimeoutException {
138 state = State.COMMITTED;
139 cohort.commit().get();
140 userCohorts.commit();
143 void abort() throws InterruptedException, ExecutionException, TimeoutException {
144 state = State.ABORTED;
145 cohort.abort().get();
149 void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
150 Preconditions.checkState(cohort == null, "cohort was already set");
152 setDoImmediateCommit(doImmediateCommit);
154 cohort = transaction.ready();
156 if(cohortDecorator != null) {
157 // Call the hook for unit tests.
158 cohort = cohortDecorator.decorate(transactionID, cohort);
162 boolean isReadyToCommit() {
163 return replySender != null;
166 boolean isExpired(long expireTimeInMillis) {
167 return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
170 boolean isDoImmediateCommit() {
171 return doImmediateCommit;
174 void setDoImmediateCommit(boolean doImmediateCommit) {
175 this.doImmediateCommit = doImmediateCommit;
178 ActorRef getReplySender() {
182 void setReplySender(ActorRef replySender) {
183 this.replySender = replySender;
190 void setShard(Shard shard) {
195 boolean isAborted() {
196 return state == State.ABORTED;
200 public String toString() {
201 final StringBuilder builder = new StringBuilder();
202 builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
203 .append(doImmediateCommit).append("]");
204 return builder.toString();