043333ce2fb8f8530b9c3dd7c487203abad00708
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
24 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.modification.Modification;
32 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
34 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
35 import org.slf4j.Logger;
36
37 /**
38  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
39  *
40  * @author Thomas Pantelis
41  */
42 class ShardCommitCoordinator {
43
44     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
45     public interface CohortDecorator {
46         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
47     }
48
49     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
50
51     private CohortEntry currentCohortEntry;
52
53     private final ShardDataTree dataTree;
54
55     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
56     // since this should only be accessed on the shard's dispatcher.
57     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
58
59     private int queueCapacity;
60
61     private final Logger log;
62
63     private final String name;
64
65     private final long cacheExpiryTimeoutInMillis;
66
67     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
68     private CohortDecorator cohortDecorator;
69
70     private ReadyTransactionReply readyTransactionReply;
71
72     ShardCommitCoordinator(ShardDataTree dataTree,
73             long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
74
75         this.queueCapacity = queueCapacity;
76         this.log = log;
77         this.name = name;
78         this.dataTree = Preconditions.checkNotNull(dataTree);
79         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
80     }
81
82     int getQueueSize() {
83         return queuedCohortEntries.size();
84     }
85
86     void setQueueCapacity(int queueCapacity) {
87         this.queueCapacity = queueCapacity;
88     }
89
90     private ReadyTransactionReply readyTransactionReply(Shard shard) {
91         if(readyTransactionReply == null) {
92             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
93         }
94
95         return readyTransactionReply;
96     }
97
98     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
99         if(queuedCohortEntries.size() < queueCapacity) {
100             queuedCohortEntries.offer(cohortEntry);
101
102             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
103                     queuedCohortEntries.size());
104
105             return true;
106         } else {
107             cohortCache.remove(cohortEntry.getTransactionID());
108
109             RuntimeException ex = new RuntimeException(
110                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
111                                   " capacity %d has been reached.",
112                                   name, cohortEntry.getTransactionID(), queueCapacity));
113             log.error(ex.getMessage());
114             sender.tell(new Status.Failure(ex), shard.self());
115             return false;
116         }
117     }
118
119     /**
120      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
121      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
122      *
123      * @param ready the ForwardedReadyTransaction message to process
124      * @param sender the sender of the message
125      * @param shard the transaction's shard actor
126      */
127     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
128         log.debug("{}: Readying transaction {}, client version {}", name,
129                 ready.getTransactionID(), ready.getTxnClientVersion());
130
131         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
132                 (MutableCompositeModification) ready.getModification());
133         cohortCache.put(ready.getTransactionID(), cohortEntry);
134
135         if(!queueCohortEntry(cohortEntry, sender, shard)) {
136             return;
137         }
138
139         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
140             // Return our actor path as we'll handle the three phase commit except if the Tx client
141             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
142             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
143             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
144             ActorRef replyActorPath = shard.self();
145             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
146                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
147                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
148                         ready.getTransactionID()));
149             }
150
151             ReadyTransactionReply readyTransactionReply =
152                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
153                             ready.getTxnClientVersion());
154             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
155                 readyTransactionReply, shard.self());
156         } else {
157             if(ready.isDoImmediateCommit()) {
158                 cohortEntry.setDoImmediateCommit(true);
159                 cohortEntry.setReplySender(sender);
160                 cohortEntry.setShard(shard);
161                 handleCanCommit(cohortEntry);
162             } else {
163                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
164                 // front-end so send back a ReadyTransactionReply with our actor path.
165                 sender.tell(readyTransactionReply(shard), shard.self());
166             }
167         }
168     }
169
170     /**
171      * This method handles a BatchedModifications message for a transaction being prepared directly on the
172      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
173      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
174      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
175      *
176      * @param batched the BatchedModifications message to process
177      * @param sender the sender of the message
178      * @param shard the transaction's shard actor
179      */
180     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
181         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
182         if(cohortEntry == null) {
183             cohortEntry = new CohortEntry(batched.getTransactionID(),
184                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
185                         batched.getTransactionChainID()));
186             cohortCache.put(batched.getTransactionID(), cohortEntry);
187         }
188
189         if(log.isDebugEnabled()) {
190             log.debug("{}: Applying {} batched modifications for Tx {}", name,
191                     batched.getModifications().size(), batched.getTransactionID());
192         }
193
194         cohortEntry.applyModifications(batched.getModifications());
195
196         if(batched.isReady()) {
197             if(cohortEntry.getLastBatchedModificationsException() != null) {
198                 cohortCache.remove(cohortEntry.getTransactionID());
199                 throw cohortEntry.getLastBatchedModificationsException();
200             }
201
202             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
203                 cohortCache.remove(cohortEntry.getTransactionID());
204                 throw new IllegalStateException(String.format(
205                         "The total number of batched messages received %d does not match the number sent %d",
206                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
207             }
208
209             if(!queueCohortEntry(cohortEntry, sender, shard)) {
210                 return;
211             }
212
213             if(log.isDebugEnabled()) {
214                 log.debug("{}: Readying Tx {}, client version {}", name,
215                         batched.getTransactionID(), batched.getVersion());
216             }
217
218             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
219
220             if(batched.isDoCommitOnReady()) {
221                 cohortEntry.setReplySender(sender);
222                 cohortEntry.setShard(shard);
223                 handleCanCommit(cohortEntry);
224             } else {
225                 sender.tell(readyTransactionReply(shard), shard.self());
226             }
227         } else {
228             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
229         }
230     }
231
232     /**
233      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
234      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
235      *
236      * @param message the ReadyLocalTransaction message to process
237      * @param sender the sender of the message
238      * @param shard the transaction's shard actor
239      */
240     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
241         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
242                 message.getTransactionID());
243         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
244         cohortCache.put(message.getTransactionID(), cohortEntry);
245         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
246
247         if(!queueCohortEntry(cohortEntry, sender, shard)) {
248             return;
249         }
250
251         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
252
253         if (message.isDoCommitOnReady()) {
254             cohortEntry.setReplySender(sender);
255             cohortEntry.setShard(shard);
256             handleCanCommit(cohortEntry);
257         } else {
258             sender.tell(readyTransactionReply(shard), shard.self());
259         }
260     }
261
262     private void handleCanCommit(CohortEntry cohortEntry) {
263         String transactionID = cohortEntry.getTransactionID();
264
265         cohortEntry.updateLastAccessTime();
266
267         if(currentCohortEntry != null) {
268             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
269             // queue and will get processed after all prior entries complete.
270
271             if(log.isDebugEnabled()) {
272                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
273                         name, currentCohortEntry.getTransactionID(), transactionID);
274             }
275
276             return;
277         }
278
279         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
280         // it the current entry and proceed with canCommit.
281         // Purposely checking reference equality here.
282         if(queuedCohortEntries.peek() == cohortEntry) {
283             currentCohortEntry = queuedCohortEntries.poll();
284             doCanCommit(currentCohortEntry);
285         } else {
286             if(log.isDebugEnabled()) {
287                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
288                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
289             }
290         }
291     }
292
293     /**
294      * This method handles the canCommit phase for a transaction.
295      *
296      * @param transactionID the ID of the transaction to canCommit
297      * @param sender the actor to which to send the response
298      * @param shard the transaction's shard actor
299      */
300     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
301         // Lookup the cohort entry that was cached previously (or should have been) by
302         // transactionReady (via the ForwardedReadyTransaction message).
303         final CohortEntry cohortEntry = cohortCache.get(transactionID);
304         if(cohortEntry == null) {
305             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
306             // between canCommit and ready and the entry was expired from the cache.
307             IllegalStateException ex = new IllegalStateException(
308                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
309             log.error(ex.getMessage());
310             sender.tell(new Status.Failure(ex), shard.self());
311             return;
312         }
313
314         cohortEntry.setReplySender(sender);
315         cohortEntry.setShard(shard);
316
317         handleCanCommit(cohortEntry);
318     }
319
320     private void doCanCommit(final CohortEntry cohortEntry) {
321         boolean canCommit = false;
322         try {
323             canCommit = cohortEntry.canCommit();
324
325             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
326
327             if(cohortEntry.isDoImmediateCommit()) {
328                 if(canCommit) {
329                     doCommit(cohortEntry);
330                 } else {
331                     cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
332                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
333                 }
334             } else {
335                 cohortEntry.getReplySender().tell(
336                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
337                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
338             }
339         } catch (Exception e) {
340             log.debug("{}: An exception occurred during canCommit", name, e);
341
342             Throwable failure = e;
343             if(e instanceof ExecutionException) {
344                 failure = e.getCause();
345             }
346
347             cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
348         } finally {
349             if(!canCommit) {
350                 // Remove the entry from the cache now.
351                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
352             }
353         }
354     }
355
356     private boolean doCommit(CohortEntry cohortEntry) {
357         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
358
359         boolean success = false;
360
361         // We perform the preCommit phase here atomically with the commit phase. This is an
362         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
363         // coordination of preCommit across shards in case of failure but preCommit should not
364         // normally fail since we ensure only one concurrent 3-phase commit.
365
366         try {
367             cohortEntry.preCommit();
368
369             cohortEntry.getShard().continueCommit(cohortEntry);
370
371             cohortEntry.updateLastAccessTime();
372
373             success = true;
374         } catch (Exception e) {
375             log.error("{} An exception occurred while preCommitting transaction {}",
376                     name, cohortEntry.getTransactionID(), e);
377             cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
378
379             currentTransactionComplete(cohortEntry.getTransactionID(), true);
380         }
381
382         return success;
383     }
384
385     /**
386      * This method handles the preCommit and commit phases for a transaction.
387      *
388      * @param transactionID the ID of the transaction to commit
389      * @param sender the actor to which to send the response
390      * @param shard the transaction's shard actor
391      * @return true if the transaction was successfully prepared, false otherwise.
392      */
393     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
394         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
395         // this transaction.
396         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
397         if(cohortEntry == null) {
398             // We're not the current Tx - the Tx was likely expired b/c it took too long in
399             // between the canCommit and commit messages.
400             IllegalStateException ex = new IllegalStateException(
401                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
402                             name, transactionID));
403             log.error(ex.getMessage());
404             sender.tell(new akka.actor.Status.Failure(ex), shard.self());
405             return false;
406         }
407
408         cohortEntry.setReplySender(sender);
409         return doCommit(cohortEntry);
410     }
411
412     void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
413         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
414         if(cohortEntry != null) {
415             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
416             // aborted during replication in which case we may still commit locally if replication
417             // succeeds.
418             currentTransactionComplete(transactionID, false);
419         } else {
420             cohortEntry = getAndRemoveCohortEntry(transactionID);
421         }
422
423         if(cohortEntry == null) {
424             return;
425         }
426
427         log.debug("{}: Aborting transaction {}", name, transactionID);
428
429         final ActorRef self = shard.getSelf();
430         try {
431             cohortEntry.abort();
432
433             shard.getShardMBean().incrementAbortTransactionsCount();
434
435             if(sender != null) {
436                 sender.tell(new AbortTransactionReply().toSerializable(), self);
437             }
438         } catch (Exception e) {
439             log.error("{}: An exception happened during abort", name, e);
440
441             if(sender != null) {
442                 sender.tell(new akka.actor.Status.Failure(e), self);
443             }
444         }
445     }
446
447     /**
448      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
449      * matches the current entry.
450      *
451      * @param transactionID the ID of the transaction
452      * @return the current CohortEntry or null if the given transaction ID does not match the
453      *         current entry.
454      */
455     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
456         if(isCurrentTransaction(transactionID)) {
457             return currentCohortEntry;
458         }
459
460         return null;
461     }
462
463     public CohortEntry getCurrentCohortEntry() {
464         return currentCohortEntry;
465     }
466
467     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
468         return cohortCache.remove(transactionID);
469     }
470
471     public boolean isCurrentTransaction(String transactionID) {
472         return currentCohortEntry != null &&
473                 currentCohortEntry.getTransactionID().equals(transactionID);
474     }
475
476     /**
477      * This method is called when a transaction is complete, successful or not. If the given
478      * given transaction ID matches the current in-progress transaction, the next cohort entry,
479      * if any, is dequeued and processed.
480      *
481      * @param transactionID the ID of the completed transaction
482      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
483      *        the cache.
484      */
485     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
486         if(removeCohortEntry) {
487             cohortCache.remove(transactionID);
488         }
489
490         if(isCurrentTransaction(transactionID)) {
491             currentCohortEntry = null;
492
493             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
494
495             maybeProcessNextCohortEntry();
496         }
497     }
498
499     private void maybeProcessNextCohortEntry() {
500         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
501         // clean out expired entries.
502         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
503         while(iter.hasNext()) {
504             CohortEntry next = iter.next();
505             if(next.isReadyToCommit()) {
506                 if(currentCohortEntry == null) {
507                     if(log.isDebugEnabled()) {
508                         log.debug("{}: Next entry to canCommit {}", name, next);
509                     }
510
511                     iter.remove();
512                     currentCohortEntry = next;
513                     currentCohortEntry.updateLastAccessTime();
514                     doCanCommit(currentCohortEntry);
515                 }
516
517                 break;
518             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
519                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
520                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
521             } else if(!next.isAborted()) {
522                 break;
523             }
524
525             iter.remove();
526             cohortCache.remove(next.getTransactionID());
527         }
528     }
529
530     void cleanupExpiredCohortEntries() {
531         maybeProcessNextCohortEntry();
532     }
533
534     @VisibleForTesting
535     void setCohortDecorator(CohortDecorator cohortDecorator) {
536         this.cohortDecorator = cohortDecorator;
537     }
538
539     static class CohortEntry {
540         private final String transactionID;
541         private ShardDataTreeCohort cohort;
542         private final ReadWriteShardDataTreeTransaction transaction;
543         private RuntimeException lastBatchedModificationsException;
544         private ActorRef replySender;
545         private Shard shard;
546         private boolean doImmediateCommit;
547         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
548         private int totalBatchedModificationsReceived;
549         private boolean aborted;
550
551         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
552             this.transaction = Preconditions.checkNotNull(transaction);
553             this.transactionID = transactionID;
554         }
555
556         CohortEntry(String transactionID, ShardDataTreeCohort cohort,
557                 MutableCompositeModification compositeModification) {
558             this.transactionID = transactionID;
559             this.cohort = cohort;
560             this.transaction = null;
561         }
562
563         CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
564             this.transactionID = transactionID;
565             this.cohort = cohort;
566             this.transaction = null;
567         }
568
569         void updateLastAccessTime() {
570             lastAccessTimer.reset();
571             lastAccessTimer.start();
572         }
573
574         String getTransactionID() {
575             return transactionID;
576         }
577
578         DataTreeCandidate getCandidate() {
579             return cohort.getCandidate();
580         }
581
582         int getTotalBatchedModificationsReceived() {
583             return totalBatchedModificationsReceived;
584         }
585
586         RuntimeException getLastBatchedModificationsException() {
587             return lastBatchedModificationsException;
588         }
589
590         void applyModifications(Iterable<Modification> modifications) {
591             totalBatchedModificationsReceived++;
592             if(lastBatchedModificationsException == null) {
593                 for (Modification modification : modifications) {
594                         try {
595                             modification.apply(transaction.getSnapshot());
596                         } catch (RuntimeException e) {
597                             lastBatchedModificationsException = e;
598                             throw e;
599                         }
600                 }
601             }
602         }
603
604         boolean canCommit() throws InterruptedException, ExecutionException {
605             // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
606             // about possibly accessing our state on a different thread outside of our dispatcher.
607             // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
608             // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
609             // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
610             return cohort.canCommit().get();
611         }
612
613         void preCommit() throws InterruptedException, ExecutionException {
614             cohort.preCommit().get();
615         }
616
617         void commit() throws InterruptedException, ExecutionException {
618             cohort.commit().get();
619         }
620
621         void abort() throws InterruptedException, ExecutionException {
622             aborted = true;
623             cohort.abort().get();
624         }
625
626         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
627             Preconditions.checkState(cohort == null, "cohort was already set");
628
629             setDoImmediateCommit(doImmediateCommit);
630
631             cohort = transaction.ready();
632
633             if(cohortDecorator != null) {
634                 // Call the hook for unit tests.
635                 cohort = cohortDecorator.decorate(transactionID, cohort);
636             }
637         }
638
639         boolean isReadyToCommit() {
640             return replySender != null;
641         }
642
643         boolean isExpired(long expireTimeInMillis) {
644             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
645         }
646
647         boolean isDoImmediateCommit() {
648             return doImmediateCommit;
649         }
650
651         void setDoImmediateCommit(boolean doImmediateCommit) {
652             this.doImmediateCommit = doImmediateCommit;
653         }
654
655         ActorRef getReplySender() {
656             return replySender;
657         }
658
659         void setReplySender(ActorRef replySender) {
660             this.replySender = replySender;
661         }
662
663         Shard getShard() {
664             return shard;
665         }
666
667         void setShard(Shard shard) {
668             this.shard = shard;
669         }
670
671
672         boolean isAborted() {
673             return aborted;
674         }
675
676         @Override
677         public String toString() {
678             StringBuilder builder = new StringBuilder();
679             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
680                     .append(doImmediateCommit).append("]");
681             return builder.toString();
682         }
683     }
684 }