package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
-import akka.util.Timeout;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import com.google.common.primitives.UnsignedLong;
+import com.google.common.util.concurrent.FutureCallback;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import scala.concurrent.duration.Duration;
final class CohortEntry {
- enum State {
- PENDING,
- CAN_COMMITTED,
- PRE_COMMITTED,
- COMMITTED,
- ABORTED
- }
-
- private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
-
- private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
private final ReadWriteShardDataTreeTransaction transaction;
- private final TransactionIdentifier transactionID;
- private final CompositeDataTreeCohort userCohorts;
+ private final TransactionIdentifier transactionId;
private final short clientVersion;
- private State state = State.PENDING;
private RuntimeException lastBatchedModificationsException;
private int totalBatchedModificationsReceived;
private ShardDataTreeCohort cohort;
private ActorRef replySender;
private Shard shard;
- CohortEntry(TransactionIdentifier transactionID, ReadWriteShardDataTreeTransaction transaction,
- DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
+ private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionID = Preconditions.checkNotNull(transactionID);
+ this.transactionId = transaction.getIdentifier();
this.clientVersion = clientVersion;
- this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
- CohortEntry(TransactionIdentifier transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
- SchemaContext schema, short clientVersion) {
- this.transactionID = Preconditions.checkNotNull(transactionID);
- this.cohort = cohort;
+ private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) {
+ this.cohort = Preconditions.checkNotNull(cohort);
+ this.transactionId = cohort.getIdentifier();
this.transaction = null;
this.clientVersion = clientVersion;
- this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
}
- void updateLastAccessTime() {
- lastAccessTimer.reset();
- lastAccessTimer.start();
+ static CohortEntry createOpen(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
+ return new CohortEntry(transaction, clientVersion);
}
- TransactionIdentifier getTransactionID() {
- return transactionID;
+ static CohortEntry createReady(final ShardDataTreeCohort cohort, final short clientVersion) {
+ return new CohortEntry(cohort, clientVersion);
}
- short getClientVersion() {
- return clientVersion;
+ TransactionIdentifier getTransactionId() {
+ return transactionId;
}
- State getState() {
- return state;
+ short getClientVersion() {
+ return clientVersion;
}
- DataTreeCandidate getCandidate() {
- return cohort.getCandidate();
+ boolean isFailed() {
+ return cohort != null && cohort.isFailed();
}
DataTreeModification getDataTreeModification() {
return lastBatchedModificationsException;
}
- void applyModifications(Iterable<Modification> modifications) {
+ @SuppressWarnings("checkstyle:IllegalCatch")
+ void applyModifications(final Iterable<Modification> modifications) {
totalBatchedModificationsReceived++;
- if(lastBatchedModificationsException == null) {
+ if (lastBatchedModificationsException == null) {
for (Modification modification : modifications) {
- try {
- modification.apply(transaction.getSnapshot());
- } catch (RuntimeException e) {
- lastBatchedModificationsException = e;
- throw e;
- }
+ try {
+ modification.apply(transaction.getSnapshot());
+ } catch (RuntimeException e) {
+ lastBatchedModificationsException = e;
+ throw e;
+ }
}
}
}
- boolean canCommit() throws InterruptedException, ExecutionException {
- state = State.CAN_COMMITTED;
-
- // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
- // about possibly accessing our state on a different thread outside of our dispatcher.
- // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
- // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
- // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
- return cohort.canCommit().get();
+ void canCommit(final FutureCallback<Void> callback) {
+ cohort.canCommit(callback);
}
-
-
- void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
- state = State.PRE_COMMITTED;
- cohort.preCommit().get();
- userCohorts.canCommit(cohort.getCandidate());
- userCohorts.preCommit();
+ void preCommit(final FutureCallback<DataTreeCandidate> callback) {
+ cohort.preCommit(callback);
}
- void commit() throws InterruptedException, ExecutionException, TimeoutException {
- state = State.COMMITTED;
- cohort.commit().get();
- userCohorts.commit();
+ void commit(final FutureCallback<UnsignedLong> callback) {
+ cohort.commit(callback);
}
- void abort() throws InterruptedException, ExecutionException, TimeoutException {
- state = State.ABORTED;
- cohort.abort().get();
- userCohorts.abort();
+ void abort(final FutureCallback<Void> callback) {
+ cohort.abort(callback);
}
- void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
+ void ready(final CohortDecorator cohortDecorator) {
Preconditions.checkState(cohort == null, "cohort was already set");
- setDoImmediateCommit(doImmediateCommit);
-
cohort = transaction.ready();
- if(cohortDecorator != null) {
+ if (cohortDecorator != null) {
// Call the hook for unit tests.
- cohort = cohortDecorator.decorate(transactionID, cohort);
+ cohort = cohortDecorator.decorate(transactionId, cohort);
}
}
- boolean isReadyToCommit() {
- return replySender != null;
- }
-
- boolean isExpired(long expireTimeInMillis) {
- return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
- }
-
boolean isDoImmediateCommit() {
return doImmediateCommit;
}
- void setDoImmediateCommit(boolean doImmediateCommit) {
+ void setDoImmediateCommit(final boolean doImmediateCommit) {
this.doImmediateCommit = doImmediateCommit;
}
return replySender;
}
- void setReplySender(ActorRef replySender) {
+ void setReplySender(final ActorRef replySender) {
this.replySender = replySender;
}
return shard;
}
- void setShard(Shard shard) {
+ void setShard(final Shard shard) {
this.shard = shard;
}
-
- boolean isAborted() {
- return state == State.ABORTED;
- }
-
@Override
public String toString() {
final StringBuilder builder = new StringBuilder();
- builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
+ builder.append("CohortEntry [transactionId=").append(transactionId).append(", doImmediateCommit=")
.append(doImmediateCommit).append("]");
return builder.toString();
}
-}
\ No newline at end of file
+}