2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Stopwatch;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.TimeoutException;
17 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
18 import org.opendaylight.controller.cluster.datastore.modification.Modification;
19 import org.opendaylight.yangtools.util.StringIdentifier;
20 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
21 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
22 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
23 import scala.concurrent.duration.Duration;
/**
 * Per-transaction bookkeeping for the commit steps visible in this class: it holds the transaction and/or
 * its cohort, the current commit state, batched-modification accounting, the actor to reply to and a timer
 * used for expiry checks.
 *
 * NOTE(review): the {@code State} type referenced below is not declared in the visible portion of this file.
 */
25 final class CohortEntry {
// Timeout of 5 seconds handed to the CompositeDataTreeCohort created by each constructor.
34 private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
// Started at construction, restarted by updateLastAccessTime() and read by isExpired().
36 private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
// Null when the entry is constructed directly from a cohort.
37 private final ReadWriteShardDataTreeTransaction transaction;
38 private final StringIdentifier transactionID;
// User-registered cohorts driven from preCommit() and commit().
39 private final CompositeDataTreeCohort userCohorts;
40 private final short clientVersion;
// Starts as PENDING; reassigned by canCommit(), preCommit(), commit() and abort().
42 private State state = State.PENDING;
// RuntimeException recorded by applyModifications(); while non-null, later batches are not applied.
43 private RuntimeException lastBatchedModificationsException;
// Incremented once per applyModifications() call.
44 private int totalBatchedModificationsReceived;
// Assigned by ready(); the commit steps delegate to it.
45 private ShardDataTreeCohort cohort;
46 private boolean doImmediateCommit;
// A non-null value is what isReadyToCommit() reports as "ready".
47 private ActorRef replySender;
/**
 * Creates an entry backed by a read-write transaction; the cohort is produced later by
 * {@link #ready(CohortDecorator, boolean)}.
 *
 * @param transactionID textual transaction identifier, wrapped in a {@link StringIdentifier}
 * @param transaction the backing transaction, must not be null
 * @param cohortRegistry registry passed through to the {@link CompositeDataTreeCohort}
 * @param schema schema context passed through to the {@link CompositeDataTreeCohort}
 * @param clientVersion version reported by {@code getClientVersion()}
 * @throws NullPointerException if {@code transaction} is null
 */
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
        DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
    // Reject a missing transaction before anything else is initialised.
    this.transaction = Preconditions.checkNotNull(transaction);
    this.clientVersion = clientVersion;
    this.transactionID = new StringIdentifier(transactionID);
    this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema,
            COMMIT_STEP_TIMEOUT);
}
/**
 * Creates an entry around an already available cohort; no transaction is associated with it.
 *
 * @param transactionID textual transaction identifier, wrapped in a {@link StringIdentifier}
 * @param cohort the cohort that the commit steps of this entry delegate to
 * @param cohortRegistry registry passed through to the {@link CompositeDataTreeCohort}
 * @param schema schema context passed through to the {@link CompositeDataTreeCohort}
 * @param clientVersion version reported by {@code getClientVersion()}
 */
CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
        SchemaContext schema, short clientVersion) {
    this.transactionID = new StringIdentifier(transactionID);
    // Keep the supplied cohort: with a null transaction, ready() can never populate the field later,
    // so leaving it unset would make every commit step fail with a NullPointerException.
    this.cohort = cohort;
    this.transaction = null;
    this.clientVersion = clientVersion;
    this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
67 void updateLastAccessTime() {
68 lastAccessTimer.reset();
69 lastAccessTimer.start();
/**
 * Accessor declared to return a {@link StringIdentifier}.
 *
 * NOTE(review): the method body is not visible in this view; presumably it returns the
 * {@code transactionID} field - confirm against the full file.
 */
72 StringIdentifier getTransactionID() {
/**
 * Accessor declared to return a {@code short}.
 *
 * NOTE(review): the method body is not visible in this view; presumably it returns the
 * {@code clientVersion} field set by the constructors - confirm against the full file.
 */
76 short getClientVersion() {
84 DataTreeCandidate getCandidate() {
85 return cohort.getCandidate();
88 DataTreeModification getDataTreeModification() {
89 return cohort.getDataTreeModification();
/**
 * Accessor declared to return a {@link ReadWriteShardDataTreeTransaction}.
 *
 * NOTE(review): the method body is not visible in this view; presumably it returns the
 * {@code transaction} field, which is null for entries constructed from a cohort - confirm.
 */
92 ReadWriteShardDataTreeTransaction getTransaction() {
96 int getTotalBatchedModificationsReceived() {
97 return totalBatchedModificationsReceived;
100 RuntimeException getLastBatchedModificationsException() {
101 return lastBatchedModificationsException;
/**
 * Applies one batch of modifications to the snapshot of this entry's transaction.
 *
 * <p>The received-batch counter is incremented on every call. The batch itself is only applied while no
 * earlier failure has been recorded in {@code lastBatchedModificationsException}.
 *
 * @param modifications the modifications to apply, in iteration order
 */
104 void applyModifications(Iterable<Modification> modifications) {
// Count the batch even when it ends up being skipped below.
105 totalBatchedModificationsReceived++;
// A previously recorded failure short-circuits every later batch.
106 if(lastBatchedModificationsException == null) {
107 for (Modification modification : modifications) {
109 modification.apply(transaction.getSnapshot());
110 } catch (RuntimeException e) {
// Remember the failure so getLastBatchedModificationsException() can report it.
// NOTE(review): the matching "try {" and whatever follows this assignment (e.g. a rethrow) are not
// visible in this view - confirm against the full file.
111 lastBatchedModificationsException = e;
118 boolean canCommit() throws InterruptedException, ExecutionException {
119 state = State.CAN_COMMITTED;
121 // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
122 // about possibly accessing our state on a different thread outside of our dispatcher.
123 // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
124 // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
125 // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
126 return cohort.canCommit().get();
131 void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
132 state = State.PRE_COMMITTED;
133 cohort.preCommit().get();
134 userCohorts.canCommit(cohort.getCandidate());
135 userCohorts.preCommit();
138 void commit() throws InterruptedException, ExecutionException, TimeoutException {
139 state = State.COMMITTED;
140 cohort.commit().get();
141 userCohorts.commit();
/**
 * Marks this entry as aborted and aborts the underlying cohort, waiting on the returned future.
 *
 * NOTE(review): {@code TimeoutException} is declared, yet neither visible statement can throw it, and the
 * user cohorts (driven from preCommit() and commit()) are not touched here. A statement may be missing
 * from this view - confirm against the full file before changing this method.
 *
 * @throws InterruptedException if waiting on the cohort's future is interrupted
 * @throws ExecutionException if the cohort's future completes exceptionally
 * @throws TimeoutException declared by the signature; source not visible here
 */
144 void abort() throws InterruptedException, ExecutionException, TimeoutException {
// The state flips to ABORTED before the cohort is asked to abort.
145 state = State.ABORTED;
146 cohort.abort().get();
150 void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
151 Preconditions.checkState(cohort == null, "cohort was already set");
153 setDoImmediateCommit(doImmediateCommit);
155 cohort = transaction.ready();
157 if(cohortDecorator != null) {
158 // Call the hook for unit tests.
159 cohort = cohortDecorator.decorate(transactionID, cohort);
163 boolean isReadyToCommit() {
164 return replySender != null;
167 boolean isExpired(long expireTimeInMillis) {
168 return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
171 boolean isDoImmediateCommit() {
172 return doImmediateCommit;
175 void setDoImmediateCommit(boolean doImmediateCommit) {
176 this.doImmediateCommit = doImmediateCommit;
/**
 * Accessor declared to return an {@link ActorRef}.
 *
 * NOTE(review): the method body is not visible in this view; presumably it returns the
 * {@code replySender} field, which is null until setReplySender() is called - confirm.
 */
179 ActorRef getReplySender() {
183 void setReplySender(ActorRef replySender) {
184 this.replySender = replySender;
/**
 * Mutator taking a {@link Shard}.
 *
 * NOTE(review): the method body is not visible in this view, and no {@code shard} field appears among
 * the visible field declarations - confirm what this stores or forwards to against the full file.
 */
191 void setShard(Shard shard) {
196 boolean isAborted() {
197 return state == State.ABORTED;
201 public String toString() {
202 final StringBuilder builder = new StringBuilder();
203 builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
204 .append(doImmediateCommit).append("]");
205 return builder.toString();