BUG-5280: refactor CohortEntry
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status.Failure;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Queue;
24 import java.util.concurrent.ExecutionException;
25 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
28 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
37 import org.opendaylight.yangtools.concepts.Identifier;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import org.slf4j.Logger;
40
41 /**
42  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
43  *
44  * @author Thomas Pantelis
45  */
46 final class ShardCommitCoordinator {
47
48     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
49     public interface CohortDecorator {
50         ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
51     }
52
53     private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
54
55     private CohortEntry currentCohortEntry;
56
57     private final ShardDataTree dataTree;
58
59     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
60
61     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
62     // since this should only be accessed on the shard's dispatcher.
63     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
64
65     private int queueCapacity;
66
67     private final Logger log;
68
69     private final String name;
70
71     private final long cacheExpiryTimeoutInMillis;
72
73     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
74     private CohortDecorator cohortDecorator;
75
76     private ReadyTransactionReply readyTransactionReply;
77
78     private Runnable runOnPendingTransactionsComplete;
79
80     ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
81             String name) {
82
83         this.queueCapacity = queueCapacity;
84         this.log = log;
85         this.name = name;
86         this.dataTree = Preconditions.checkNotNull(dataTree);
87         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
88     }
89
90     int getQueueSize() {
91         return queuedCohortEntries.size();
92     }
93
94     int getCohortCacheSize() {
95         return cohortCache.size();
96     }
97
98     void setQueueCapacity(int queueCapacity) {
99         this.queueCapacity = queueCapacity;
100     }
101
102     private ReadyTransactionReply readyTransactionReply(Shard shard) {
103         if(readyTransactionReply == null) {
104             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
105         }
106
107         return readyTransactionReply;
108     }
109
110     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
111         if(queuedCohortEntries.size() < queueCapacity) {
112             queuedCohortEntries.offer(cohortEntry);
113
114             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
115                     queuedCohortEntries.size());
116
117             return true;
118         } else {
119             cohortCache.remove(cohortEntry.getTransactionID());
120
121             final RuntimeException ex = new RuntimeException(
122                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
123                                   " capacity %d has been reached.",
124                                   name, cohortEntry.getTransactionID(), queueCapacity));
125             log.error(ex.getMessage());
126             sender.tell(new Failure(ex), shard.self());
127             return false;
128         }
129     }
130
131     /**
132      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
133      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
134      *
135      * @param ready the ForwardedReadyTransaction message to process
136      * @param sender the sender of the message
137      * @param shard the transaction's shard actor
138      * @param schema
139      */
140     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
141             SchemaContext schema) {
142         log.debug("{}: Readying transaction {}, client version {}", name,
143                 ready.getTransactionID(), ready.getTxnClientVersion());
144
145         final ShardDataTreeCohort cohort = ready.getTransaction().ready();
146         final CohortEntry cohortEntry = CohortEntry.createReady(ready.getTransactionID(), cohort, cohortRegistry,
147             schema, ready.getTxnClientVersion());
148         cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
149
150         if(!queueCohortEntry(cohortEntry, sender, shard)) {
151             return;
152         }
153
154         if(ready.isDoImmediateCommit()) {
155             cohortEntry.setDoImmediateCommit(true);
156             cohortEntry.setReplySender(sender);
157             cohortEntry.setShard(shard);
158             handleCanCommit(cohortEntry);
159         } else {
160             // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
161             // front-end so send back a ReadyTransactionReply with our actor path.
162             sender.tell(readyTransactionReply(shard), shard.self());
163         }
164     }
165
166     /**
167      * This method handles a BatchedModifications message for a transaction being prepared directly on the
168      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
169      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
170      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
171      *
172      * @param batched the BatchedModifications message to process
173      * @param sender the sender of the message
174      */
175     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
176         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
177         if(cohortEntry == null) {
178             cohortEntry = CohortEntry.createOpen(batched.getTransactionID(),
179                     dataTree.newReadWriteTransaction(batched.getTransactionID()),
180                     cohortRegistry, dataTree.getSchemaContext(), batched.getVersion());
181             cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
182         }
183
184         if(log.isDebugEnabled()) {
185             log.debug("{}: Applying {} batched modifications for Tx {}", name,
186                     batched.getModifications().size(), batched.getTransactionID());
187         }
188
189         cohortEntry.applyModifications(batched.getModifications());
190
191         if(batched.isReady()) {
192             if(cohortEntry.getLastBatchedModificationsException() != null) {
193                 cohortCache.remove(cohortEntry.getTransactionID());
194                 throw cohortEntry.getLastBatchedModificationsException();
195             }
196
197             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
198                 cohortCache.remove(cohortEntry.getTransactionID());
199                 throw new IllegalStateException(String.format(
200                         "The total number of batched messages received %d does not match the number sent %d",
201                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
202             }
203
204             if(!queueCohortEntry(cohortEntry, sender, shard)) {
205                 return;
206             }
207
208             if(log.isDebugEnabled()) {
209                 log.debug("{}: Readying Tx {}, client version {}", name,
210                         batched.getTransactionID(), batched.getVersion());
211             }
212
213             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
214
215             if(batched.isDoCommitOnReady()) {
216                 cohortEntry.setReplySender(sender);
217                 cohortEntry.setShard(shard);
218                 handleCanCommit(cohortEntry);
219             } else {
220                 sender.tell(readyTransactionReply(shard), shard.self());
221             }
222         } else {
223             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
224         }
225     }
226
227     /**
228      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
229      * been prepared beforehand by the sender and we just need to drive them through into the
230      * dataTree.
231      *
232      * @param message the ReadyLocalTransaction message to process
233      * @param sender the sender of the message
234      * @param shard the transaction's shard actor
235      */
236     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
237         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
238                 message.getTransactionID());
239         final CohortEntry cohortEntry = CohortEntry.createReady(message.getTransactionID(), cohort, cohortRegistry,
240             dataTree.getSchemaContext(), DataStoreVersions.CURRENT_VERSION);
241         cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
242         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
243
244         if(!queueCohortEntry(cohortEntry, sender, shard)) {
245             return;
246         }
247
248         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
249
250         if (message.isDoCommitOnReady()) {
251             cohortEntry.setReplySender(sender);
252             cohortEntry.setShard(shard);
253             handleCanCommit(cohortEntry);
254         } else {
255             sender.tell(readyTransactionReply(shard), shard.self());
256         }
257     }
258
259     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
260             final int maxModificationsPerBatch) {
261         CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
262         if(cohortEntry == null || cohortEntry.getTransaction() == null) {
263             return Collections.singletonList(from);
264         }
265
266         cohortEntry.applyModifications(from.getModifications());
267
268         final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
269         cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
270             @Override
271             protected BatchedModifications getModifications() {
272                 if(newModifications.isEmpty() ||
273                         newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
274                     newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
275                 }
276
277                 return newModifications.getLast();
278             }
279         });
280
281         BatchedModifications last = newModifications.getLast();
282         last.setDoCommitOnReady(from.isDoCommitOnReady());
283         last.setReady(from.isReady());
284         last.setTotalMessagesSent(newModifications.size());
285         return newModifications;
286     }
287
288     private void handleCanCommit(CohortEntry cohortEntry) {
289         cohortEntry.updateLastAccessTime();
290
291         if(currentCohortEntry != null) {
292             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
293             // queue and will get processed after all prior entries complete.
294
295             if(log.isDebugEnabled()) {
296                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
297                         name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
298             }
299
300             return;
301         }
302
303         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
304         // it the current entry and proceed with canCommit.
305         // Purposely checking reference equality here.
306         if(queuedCohortEntries.peek() == cohortEntry) {
307             currentCohortEntry = queuedCohortEntries.poll();
308             doCanCommit(currentCohortEntry);
309         } else {
310             if(log.isDebugEnabled()) {
311                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
312                         queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
313                                 cohortEntry.getTransactionID());
314             }
315         }
316     }
317
318     /**
319      * This method handles the canCommit phase for a transaction.
320      *
321      * @param transactionID the ID of the transaction to canCommit
322      * @param sender the actor to which to send the response
323      * @param shard the transaction's shard actor
324      */
325     void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
326         // Lookup the cohort entry that was cached previously (or should have been) by
327         // transactionReady (via the ForwardedReadyTransaction message).
328         final CohortEntry cohortEntry = cohortCache.get(transactionID);
329         if(cohortEntry == null) {
330             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
331             // between canCommit and ready and the entry was expired from the cache.
332             IllegalStateException ex = new IllegalStateException(
333                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
334             log.error(ex.getMessage());
335             sender.tell(new Failure(ex), shard.self());
336             return;
337         }
338
339         cohortEntry.setReplySender(sender);
340         cohortEntry.setShard(shard);
341
342         handleCanCommit(cohortEntry);
343     }
344
345     private void doCanCommit(final CohortEntry cohortEntry) {
346         boolean canCommit = false;
347         try {
348             canCommit = cohortEntry.canCommit();
349
350             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
351
352             if(cohortEntry.isDoImmediateCommit()) {
353                 if(canCommit) {
354                     doCommit(cohortEntry);
355                 } else {
356                     cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
357                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
358                 }
359             } else {
360                 cohortEntry.getReplySender().tell(
361                         canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
362                             CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
363                         cohortEntry.getShard().self());
364             }
365         } catch (Exception e) {
366             log.debug("{}: An exception occurred during canCommit", name, e);
367
368             Throwable failure = e;
369             if(e instanceof ExecutionException) {
370                 failure = e.getCause();
371             }
372
373             cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
374         } finally {
375             if(!canCommit) {
376                 // Remove the entry from the cache now.
377                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
378             }
379         }
380     }
381
382     private boolean doCommit(CohortEntry cohortEntry) {
383         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
384
385         boolean success = false;
386
387         // We perform the preCommit phase here atomically with the commit phase. This is an
388         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
389         // coordination of preCommit across shards in case of failure but preCommit should not
390         // normally fail since we ensure only one concurrent 3-phase commit.
391
392         try {
393             cohortEntry.preCommit();
394
395             cohortEntry.getShard().continueCommit(cohortEntry);
396
397             cohortEntry.updateLastAccessTime();
398
399             success = true;
400         } catch (Exception e) {
401             log.error("{} An exception occurred while preCommitting transaction {}",
402                     name, cohortEntry.getTransactionID(), e);
403             cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
404
405             currentTransactionComplete(cohortEntry.getTransactionID(), true);
406         }
407
408         return success;
409     }
410
411     /**
412      * This method handles the preCommit and commit phases for a transaction.
413      *
414      * @param transactionID the ID of the transaction to commit
415      * @param sender the actor to which to send the response
416      * @param shard the transaction's shard actor
417      * @return true if the transaction was successfully prepared, false otherwise.
418      */
419     boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
420         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
421         // this transaction.
422         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
423         if(cohortEntry == null) {
424             // We're not the current Tx - the Tx was likely expired b/c it took too long in
425             // between the canCommit and commit messages.
426             IllegalStateException ex = new IllegalStateException(
427                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
428                             name, transactionID));
429             log.error(ex.getMessage());
430             sender.tell(new Failure(ex), shard.self());
431             return false;
432         }
433
434         cohortEntry.setReplySender(sender);
435         return doCommit(cohortEntry);
436     }
437
438     void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
439         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
440         if(cohortEntry != null) {
441             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
442             // aborted during replication in which case we may still commit locally if replication
443             // succeeds.
444             currentTransactionComplete(transactionID, false);
445         } else {
446             cohortEntry = getAndRemoveCohortEntry(transactionID);
447         }
448
449         if(cohortEntry == null) {
450             return;
451         }
452
453         log.debug("{}: Aborting transaction {}", name, transactionID);
454
455         final ActorRef self = shard.getSelf();
456         try {
457             cohortEntry.abort();
458
459             shard.getShardMBean().incrementAbortTransactionsCount();
460
461             if(sender != null) {
462                 sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
463             }
464         } catch (Exception e) {
465             log.error("{}: An exception happened during abort", name, e);
466
467             if(sender != null) {
468                 sender.tell(new Failure(e), self);
469             }
470         }
471     }
472
473     void checkForExpiredTransactions(final long timeout, final Shard shard) {
474         CohortEntry cohortEntry = getCurrentCohortEntry();
475         if(cohortEntry != null) {
476             if(cohortEntry.isExpired(timeout)) {
477                 log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
478                         name, cohortEntry.getTransactionID(), timeout);
479
480                 handleAbort(cohortEntry.getTransactionID(), null, shard);
481             }
482         }
483
484         cleanupExpiredCohortEntries();
485     }
486
487     void abortPendingTransactions(final String reason, final Shard shard) {
488         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
489             return;
490         }
491
492         List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
493
494         log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
495
496         for(CohortEntry cohortEntry: cohortEntries) {
497             if(cohortEntry.getReplySender() != null) {
498                 cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
499             }
500         }
501     }
502
503     private List<CohortEntry> getAndClearPendingCohortEntries() {
504         List<CohortEntry> cohortEntries = new ArrayList<>();
505
506         if(currentCohortEntry != null) {
507             cohortEntries.add(currentCohortEntry);
508             cohortCache.remove(currentCohortEntry.getTransactionID());
509             currentCohortEntry = null;
510         }
511
512         for(CohortEntry cohortEntry: queuedCohortEntries) {
513             cohortEntries.add(cohortEntry);
514             cohortCache.remove(cohortEntry.getTransactionID());
515         }
516
517         queuedCohortEntries.clear();
518         return cohortEntries;
519     }
520
521     Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
522         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
523             return Collections.emptyList();
524         }
525
526         Collection<Object> messages = new ArrayList<>();
527         List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
528         for(CohortEntry cohortEntry: cohortEntries) {
529             if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
530                 continue;
531             }
532
533             final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
534             cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
535                 @Override
536                 protected BatchedModifications getModifications() {
537                     if(newModifications.isEmpty() ||
538                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
539                         newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
540                                 cohortEntry.getClientVersion()));
541         }
542
543                     return newModifications.getLast();
544                 }
545             });
546
547             if(!newModifications.isEmpty()) {
548                 BatchedModifications last = newModifications.getLast();
549                 last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
550                 last.setReady(true);
551                 last.setTotalMessagesSent(newModifications.size());
552                 messages.addAll(newModifications);
553
554                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
555                     messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
556                             cohortEntry.getClientVersion()));
557                 }
558
559                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
560                     messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
561                             cohortEntry.getClientVersion()));
562                 }
563             }
564         }
565
566         return messages;
567     }
568
569     /**
570      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
571      * matches the current entry.
572      *
573      * @param transactionID the ID of the transaction
574      * @return the current CohortEntry or null if the given transaction ID does not match the
575      *         current entry.
576      */
577     CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
578         if(isCurrentTransaction(transactionID)) {
579             return currentCohortEntry;
580         }
581
582         return null;
583     }
584
585     CohortEntry getCurrentCohortEntry() {
586         return currentCohortEntry;
587     }
588
589     CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
590         return cohortCache.remove(transactionID);
591     }
592
593     boolean isCurrentTransaction(Identifier transactionID) {
594         return currentCohortEntry != null &&
595                 currentCohortEntry.getTransactionID().equals(transactionID);
596     }
597
598     /**
599      * This method is called when a transaction is complete, successful or not. If the given
600      * given transaction ID matches the current in-progress transaction, the next cohort entry,
601      * if any, is dequeued and processed.
602      *
603      * @param transactionID the ID of the completed transaction
604      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
605      *        the cache.
606      */
607     void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
608         if(removeCohortEntry) {
609             cohortCache.remove(transactionID);
610         }
611
612         if(isCurrentTransaction(transactionID)) {
613             currentCohortEntry = null;
614
615             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
616
617             maybeProcessNextCohortEntry();
618         }
619     }
620
621     private void maybeProcessNextCohortEntry() {
622         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
623         // clean out expired entries.
624         final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
625         while(iter.hasNext()) {
626             final CohortEntry next = iter.next();
627             if(next.isReadyToCommit()) {
628                 if(currentCohortEntry == null) {
629                     if(log.isDebugEnabled()) {
630                         log.debug("{}: Next entry to canCommit {}", name, next);
631                     }
632
633                     iter.remove();
634                     currentCohortEntry = next;
635                     currentCohortEntry.updateLastAccessTime();
636                     doCanCommit(currentCohortEntry);
637                 }
638
639                 break;
640             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
641                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
642                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
643             } else if(!next.isAborted()) {
644                 break;
645             }
646
647             iter.remove();
648             cohortCache.remove(next.getTransactionID());
649         }
650
651         maybeRunOperationOnPendingTransactionsComplete();
652     }
653
654     void cleanupExpiredCohortEntries() {
655         maybeProcessNextCohortEntry();
656     }
657
658     void setRunOnPendingTransactionsComplete(Runnable operation) {
659         runOnPendingTransactionsComplete = operation;
660         maybeRunOperationOnPendingTransactionsComplete();
661     }
662
663     private void maybeRunOperationOnPendingTransactionsComplete() {
664         if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
665             log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
666
667             runOnPendingTransactionsComplete.run();
668             runOnPendingTransactionsComplete = null;
669         }
670     }
671
672     @VisibleForTesting
673     void setCohortDecorator(CohortDecorator cohortDecorator) {
674         this.cohortDecorator = cohortDecorator;
675     }
676
677    void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
678         cohortRegistry.process(sender, message);
679     }
680 }