-
- static class CohortEntry {
- private final String transactionID;
- private ShardDataTreeCohort cohort;
- private final ReadWriteShardDataTreeTransaction transaction;
- private RuntimeException lastBatchedModificationsException;
- private ActorRef replySender;
- private Shard shard;
- private boolean doImmediateCommit;
- private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
- private int totalBatchedModificationsReceived;
- private boolean aborted;
- private final short clientVersion;
-
- CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
- this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionID = transactionID;
- this.clientVersion = clientVersion;
- }
-
- CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
- this.transactionID = transactionID;
- this.cohort = cohort;
- this.transaction = null;
- this.clientVersion = clientVersion;
- }
-
- void updateLastAccessTime() {
- lastAccessTimer.reset();
- lastAccessTimer.start();
- }
-
- String getTransactionID() {
- return transactionID;
- }
-
- short getClientVersion() {
- return clientVersion;
- }
-
- DataTreeCandidate getCandidate() {
- return cohort.getCandidate();
- }
-
- ReadWriteShardDataTreeTransaction getTransaction() {
- return transaction;
- }
-
- int getTotalBatchedModificationsReceived() {
- return totalBatchedModificationsReceived;
- }
-
- RuntimeException getLastBatchedModificationsException() {
- return lastBatchedModificationsException;
- }
-
- void applyModifications(Iterable<Modification> modifications) {
- totalBatchedModificationsReceived++;
- if(lastBatchedModificationsException == null) {
- for (Modification modification : modifications) {
- try {
- modification.apply(transaction.getSnapshot());
- } catch (RuntimeException e) {
- lastBatchedModificationsException = e;
- throw e;
- }
- }
- }
- }
-
- boolean canCommit() throws InterruptedException, ExecutionException {
- // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
- // about possibly accessing our state on a different thread outside of our dispatcher.
- // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
- // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
- // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
- return cohort.canCommit().get();
- }
-
- void preCommit() throws InterruptedException, ExecutionException {
- cohort.preCommit().get();
- }
-
- void commit() throws InterruptedException, ExecutionException {
- cohort.commit().get();
- }
-
- void abort() throws InterruptedException, ExecutionException {
- aborted = true;
- cohort.abort().get();
- }
-
- void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
- Preconditions.checkState(cohort == null, "cohort was already set");
-
- setDoImmediateCommit(doImmediateCommit);
-
- cohort = transaction.ready();
-
- if(cohortDecorator != null) {
- // Call the hook for unit tests.
- cohort = cohortDecorator.decorate(transactionID, cohort);
- }
- }
-
- boolean isReadyToCommit() {
- return replySender != null;
- }
-
- boolean isExpired(long expireTimeInMillis) {
- return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
- }
-
- boolean isDoImmediateCommit() {
- return doImmediateCommit;
- }
-
- void setDoImmediateCommit(boolean doImmediateCommit) {
- this.doImmediateCommit = doImmediateCommit;
- }
-
- ActorRef getReplySender() {
- return replySender;
- }
-
- void setReplySender(ActorRef replySender) {
- this.replySender = replySender;
- }
-
- Shard getShard() {
- return shard;
- }
-
- void setShard(Shard shard) {
- this.shard = shard;
- }
-
-
- boolean isAborted() {
- return aborted;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
- .append(doImmediateCommit).append("]");
- return builder.toString();
- }
- }