2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.util.Timeout;
12 import com.google.common.base.Preconditions;
13 import com.google.common.base.Stopwatch;
14 import java.util.concurrent.ExecutionException;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.TimeoutException;
17 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
18 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
19 import org.opendaylight.controller.cluster.datastore.modification.Modification;
20 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
21 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
22 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
23 import scala.concurrent.duration.Duration;
25 final class CohortEntry {
// Five-second timeout handed to every CompositeDataTreeCohort created by the constructors.
34 private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
// Started at construction; restarted by updateLastAccessTime() and read by isExpired().
36 private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
// Open transaction for entries built via createOpen(); null for entries built via createReady().
37 private final ReadWriteShardDataTreeTransaction transaction;
38 private final TransactionIdentifier transactionID;
// Composite cohort driven from preCommit() and commit().
39 private final CompositeDataTreeCohort userCohorts;
40 private final short clientVersion;
// Updated at the start of canCommit(), preCommit(), commit() and abort().
42 private State state = State.PENDING;
// RuntimeException captured by applyModifications(); while non-null, later batches are not applied.
43 private RuntimeException lastBatchedModificationsException;
// Number of applyModifications() calls made so far.
44 private int totalBatchedModificationsReceived;
// Assigned in ready(); the commit-phase methods delegate to it.
45 private ShardDataTreeCohort cohort;
46 private boolean doImmediateCommit;
// isReadyToCommit() reports true once this is non-null.
47 private ActorRef replySender;
50 private CohortEntry(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
51 DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
52 this.transaction = Preconditions.checkNotNull(transaction);
53 this.transactionID = Preconditions.checkNotNull(transactionID);
54 this.clientVersion = clientVersion;
55 this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
/**
 * Creates an entry for a transaction that has already been readied: the supplied cohort is
 * stored and there is no open transaction.
 *
 * @param transactionID identifier of the transaction; must not be null
 * @param cohort the cohort that the commit-phase methods will delegate to
 * @param cohortRegistry forwarded to the {@code CompositeDataTreeCohort}
 * @param schema forwarded to the {@code CompositeDataTreeCohort}
 * @param clientVersion version value stored on this entry
 */
private CohortEntry(TransactionIdentifier transactionID, ShardDataTreeCohort cohort,
        DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
    this.transactionID = Preconditions.checkNotNull(transactionID);
    // Keep the supplied cohort; otherwise getCandidate(), canCommit() and the other
    // commit-phase methods would dereference a null field for entries made by createReady().
    this.cohort = cohort;
    this.transaction = null;
    this.clientVersion = clientVersion;
    this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
67 static CohortEntry createOpen(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
68 DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
69 return new CohortEntry(transactionID, transaction, cohortRegistry, schema, clientVersion);
72 static CohortEntry createReady(TransactionIdentifier transactionID, ShardDataTreeCohort cohort,
73 DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
74 return new CohortEntry(transactionID, cohort, cohortRegistry, schema, clientVersion);
77 void updateLastAccessTime() {
78 lastAccessTimer.reset();
79 lastAccessTimer.start();
82 TransactionIdentifier getTransactionID() {
86 short getClientVersion() {
94 DataTreeCandidate getCandidate() {
95 return cohort.getCandidate();
98 DataTreeModification getDataTreeModification() {
99 return cohort.getDataTreeModification();
102 ReadWriteShardDataTreeTransaction getTransaction() {
106 int getTotalBatchedModificationsReceived() {
107 return totalBatchedModificationsReceived;
110 RuntimeException getLastBatchedModificationsException() {
111 return lastBatchedModificationsException;
/**
 * Applies one batch of modifications to the snapshot of the open transaction.
 * Every call is counted, but the modifications are only applied while no earlier
 * RuntimeException has been recorded on this entry.
 *
 * @param modifications the modifications in this batch, applied in iteration order
 */
114 void applyModifications(Iterable<Modification> modifications) {
115 totalBatchedModificationsReceived++;
// Once a failure has been recorded, later batches are counted but not applied.
116 if(lastBatchedModificationsException == null) {
117 for (Modification modification : modifications) {
119 modification.apply(transaction.getSnapshot());
120 } catch (RuntimeException e) {
// Remember the failure so getLastBatchedModificationsException() can report it.
121 lastBatchedModificationsException = e;
128 boolean canCommit() throws InterruptedException, ExecutionException {
129 state = State.CAN_COMMITTED;
131 // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
132 // about possibly accessing our state on a different thread outside of our dispatcher.
133 // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
134 // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
135 // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
136 return cohort.canCommit().get();
141 void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
142 state = State.PRE_COMMITTED;
143 cohort.preCommit().get();
144 userCohorts.canCommit(cohort.getCandidate());
145 userCohorts.preCommit();
148 void commit() throws InterruptedException, ExecutionException, TimeoutException {
149 state = State.COMMITTED;
150 cohort.commit().get();
151 userCohorts.commit();
154 void abort() throws InterruptedException, ExecutionException, TimeoutException {
155 state = State.ABORTED;
156 cohort.abort().get();
160 void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
161 Preconditions.checkState(cohort == null, "cohort was already set");
163 setDoImmediateCommit(doImmediateCommit);
165 cohort = transaction.ready();
167 if(cohortDecorator != null) {
168 // Call the hook for unit tests.
169 cohort = cohortDecorator.decorate(transactionID, cohort);
173 boolean isReadyToCommit() {
174 return replySender != null;
177 boolean isExpired(long expireTimeInMillis) {
178 return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
181 boolean isDoImmediateCommit() {
182 return doImmediateCommit;
185 void setDoImmediateCommit(boolean doImmediateCommit) {
186 this.doImmediateCommit = doImmediateCommit;
189 ActorRef getReplySender() {
193 void setReplySender(ActorRef replySender) {
194 this.replySender = replySender;
201 void setShard(Shard shard) {
206 boolean isAborted() {
207 return state == State.ABORTED;
211 public String toString() {
212 final StringBuilder builder = new StringBuilder();
213 builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
214 .append(doImmediateCommit).append("]");
215 return builder.toString();