08ba31f79b65f025e6232cbd07c6e69fb4ef2e3e
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status.Failure;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.Collections;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.LinkedList;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Queue;
24 import java.util.concurrent.ExecutionException;
25 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
26 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
28 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
29 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
32 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
36 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
37 import org.opendaylight.yangtools.concepts.Identifier;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import org.slf4j.Logger;
40
41 /**
42  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
43  *
44  * @author Thomas Pantelis
45  */
46 final class ShardCommitCoordinator {
47
    /**
     * Interface hook for unit tests to replace or decorate the {@link ShardDataTreeCohort} instances
     * used by the coordinator.
     */
    public interface CohortDecorator {
        /**
         * Gives the decorator the chance to substitute or wrap a cohort.
         *
         * @param transactionID the ID of the transaction the cohort belongs to
         * @param actual the cohort that was actually created
         * @return the cohort to use in place of {@code actual}
         */
        ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
    }
52
    // Cohort entries keyed by transaction ID. Entries are added when a transaction is readied (or when
    // its first batched modifications arrive) and removed on completion, abort, eviction or when the
    // commit queue is full.
    private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();

    // The entry whose commit is currently in progress, or null if there is none.
    private CohortEntry currentCohortEntry;

    private final ShardDataTree dataTree;

    private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();

    // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
    // since this should only be accessed on the shard's dispatcher.
    private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();

    // Maximum number of entries allowed in queuedCohortEntries; enforced by queueCohortEntry.
    private int queueCapacity;

    private final Logger log;

    // Prefix used in log and error messages.
    private final String name;

    // Timeout passed to CohortEntry.isExpired when evicting queued entries and when converting pending
    // transactions to messages.
    private final long cacheExpiryTimeoutInMillis;

    // This is a hook for unit tests to replace or decorate the ShardDataTreeCohort instances.
    private CohortDecorator cohortDecorator;

    // Created lazily by readyTransactionReply(Shard) and reused for subsequent replies.
    private ReadyTransactionReply readyTransactionReply;

    // Operation to run once there is no current entry and the queue is empty; cleared after it has run.
    private Runnable runOnPendingTransactionsComplete;
79
80     ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
81             String name) {
82
83         this.queueCapacity = queueCapacity;
84         this.log = log;
85         this.name = name;
86         this.dataTree = Preconditions.checkNotNull(dataTree);
87         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
88     }
89
    /**
     * Returns the number of cohort entries currently waiting in the commit queue. The entry whose
     * commit is in progress is not part of the queue.
     */
    int getQueueSize() {
        return queuedCohortEntries.size();
    }
93
    /**
     * Returns the number of cohort entries currently held in the cohort cache.
     */
    int getCohortCacheSize() {
        return cohortCache.size();
    }
97
    /**
     * Sets the maximum number of entries allowed in the commit queue. Entries that are already queued
     * are not affected; the new limit is only checked when the next entry is enqueued.
     *
     * @param queueCapacity the new capacity
     */
    void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }
101
102     private ReadyTransactionReply readyTransactionReply(Shard shard) {
103         if(readyTransactionReply == null) {
104             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
105         }
106
107         return readyTransactionReply;
108     }
109
110     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
111         if(queuedCohortEntries.size() < queueCapacity) {
112             queuedCohortEntries.offer(cohortEntry);
113
114             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
115                     queuedCohortEntries.size());
116
117             return true;
118         } else {
119             cohortCache.remove(cohortEntry.getTransactionID());
120
121             final RuntimeException ex = new RuntimeException(
122                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
123                                   " capacity %d has been reached.",
124                                   name, cohortEntry.getTransactionID(), queueCapacity));
125             log.error(ex.getMessage());
126             sender.tell(new Failure(ex), shard.self());
127             return false;
128         }
129     }
130
131     /**
132      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
133      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
134      *
135      * @param ready the ForwardedReadyTransaction message to process
136      * @param sender the sender of the message
137      * @param shard the transaction's shard actor
138      * @param schema
139      */
140     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
141             SchemaContext schema) {
142         log.debug("{}: Readying transaction {}, client version {}", name,
143                 ready.getTransactionID(), ready.getTxnClientVersion());
144
145         final ShardDataTreeCohort cohort = ready.getTransaction().ready();
146         final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
147         cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
148
149         if(!queueCohortEntry(cohortEntry, sender, shard)) {
150             return;
151         }
152
153         if(ready.isDoImmediateCommit()) {
154             cohortEntry.setDoImmediateCommit(true);
155             cohortEntry.setReplySender(sender);
156             cohortEntry.setShard(shard);
157             handleCanCommit(cohortEntry);
158         } else {
159             // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
160             // front-end so send back a ReadyTransactionReply with our actor path.
161             sender.tell(readyTransactionReply(shard), shard.self());
162         }
163     }
164
165     /**
166      * This method handles a BatchedModifications message for a transaction being prepared directly on the
167      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
168      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
169      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
170      *
171      * @param batched the BatchedModifications message to process
172      * @param sender the sender of the message
173      */
174     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
175         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
176         if(cohortEntry == null) {
177             cohortEntry = new CohortEntry(batched.getTransactionID(),
178                     dataTree.newReadWriteTransaction(batched.getTransactionID()),
179                     cohortRegistry, dataTree.getSchemaContext(), batched.getVersion());
180             cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
181         }
182
183         if(log.isDebugEnabled()) {
184             log.debug("{}: Applying {} batched modifications for Tx {}", name,
185                     batched.getModifications().size(), batched.getTransactionID());
186         }
187
188         cohortEntry.applyModifications(batched.getModifications());
189
190         if(batched.isReady()) {
191             if(cohortEntry.getLastBatchedModificationsException() != null) {
192                 cohortCache.remove(cohortEntry.getTransactionID());
193                 throw cohortEntry.getLastBatchedModificationsException();
194             }
195
196             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
197                 cohortCache.remove(cohortEntry.getTransactionID());
198                 throw new IllegalStateException(String.format(
199                         "The total number of batched messages received %d does not match the number sent %d",
200                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
201             }
202
203             if(!queueCohortEntry(cohortEntry, sender, shard)) {
204                 return;
205             }
206
207             if(log.isDebugEnabled()) {
208                 log.debug("{}: Readying Tx {}, client version {}", name,
209                         batched.getTransactionID(), batched.getVersion());
210             }
211
212             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
213
214             if(batched.isDoCommitOnReady()) {
215                 cohortEntry.setReplySender(sender);
216                 cohortEntry.setShard(shard);
217                 handleCanCommit(cohortEntry);
218             } else {
219                 sender.tell(readyTransactionReply(shard), shard.self());
220             }
221         } else {
222             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
223         }
224     }
225
226     /**
227      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
228      * been prepared beforehand by the sender and we just need to drive them through into the
229      * dataTree.
230      *
231      * @param message the ReadyLocalTransaction message to process
232      * @param sender the sender of the message
233      * @param shard the transaction's shard actor
234      */
235     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
236         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
237                 message.getTransactionID());
238         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry,
239             dataTree.getSchemaContext(), DataStoreVersions.CURRENT_VERSION);
240         cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
241         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
242
243         if(!queueCohortEntry(cohortEntry, sender, shard)) {
244             return;
245         }
246
247         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
248
249         if (message.isDoCommitOnReady()) {
250             cohortEntry.setReplySender(sender);
251             cohortEntry.setShard(shard);
252             handleCanCommit(cohortEntry);
253         } else {
254             sender.tell(readyTransactionReply(shard), shard.self());
255         }
256     }
257
258     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
259             final int maxModificationsPerBatch) {
260         CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
261         if(cohortEntry == null || cohortEntry.getTransaction() == null) {
262             return Collections.singletonList(from);
263         }
264
265         cohortEntry.applyModifications(from.getModifications());
266
267         final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
268         cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
269             @Override
270             protected BatchedModifications getModifications() {
271                 if(newModifications.isEmpty() ||
272                         newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
273                     newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
274                 }
275
276                 return newModifications.getLast();
277             }
278         });
279
280         BatchedModifications last = newModifications.getLast();
281         last.setDoCommitOnReady(from.isDoCommitOnReady());
282         last.setReady(from.isReady());
283         last.setTotalMessagesSent(newModifications.size());
284         return newModifications;
285     }
286
287     private void handleCanCommit(CohortEntry cohortEntry) {
288         cohortEntry.updateLastAccessTime();
289
290         if(currentCohortEntry != null) {
291             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
292             // queue and will get processed after all prior entries complete.
293
294             if(log.isDebugEnabled()) {
295                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
296                         name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
297             }
298
299             return;
300         }
301
302         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
303         // it the current entry and proceed with canCommit.
304         // Purposely checking reference equality here.
305         if(queuedCohortEntries.peek() == cohortEntry) {
306             currentCohortEntry = queuedCohortEntries.poll();
307             doCanCommit(currentCohortEntry);
308         } else {
309             if(log.isDebugEnabled()) {
310                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
311                         queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
312                                 cohortEntry.getTransactionID());
313             }
314         }
315     }
316
317     /**
318      * This method handles the canCommit phase for a transaction.
319      *
320      * @param transactionID the ID of the transaction to canCommit
321      * @param sender the actor to which to send the response
322      * @param shard the transaction's shard actor
323      */
324     void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
325         // Lookup the cohort entry that was cached previously (or should have been) by
326         // transactionReady (via the ForwardedReadyTransaction message).
327         final CohortEntry cohortEntry = cohortCache.get(transactionID);
328         if(cohortEntry == null) {
329             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
330             // between canCommit and ready and the entry was expired from the cache.
331             IllegalStateException ex = new IllegalStateException(
332                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
333             log.error(ex.getMessage());
334             sender.tell(new Failure(ex), shard.self());
335             return;
336         }
337
338         cohortEntry.setReplySender(sender);
339         cohortEntry.setShard(shard);
340
341         handleCanCommit(cohortEntry);
342     }
343
344     private void doCanCommit(final CohortEntry cohortEntry) {
345         boolean canCommit = false;
346         try {
347             canCommit = cohortEntry.canCommit();
348
349             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
350
351             if(cohortEntry.isDoImmediateCommit()) {
352                 if(canCommit) {
353                     doCommit(cohortEntry);
354                 } else {
355                     cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
356                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
357                 }
358             } else {
359                 cohortEntry.getReplySender().tell(
360                         canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
361                             CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
362                         cohortEntry.getShard().self());
363             }
364         } catch (Exception e) {
365             log.debug("{}: An exception occurred during canCommit", name, e);
366
367             Throwable failure = e;
368             if(e instanceof ExecutionException) {
369                 failure = e.getCause();
370             }
371
372             cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
373         } finally {
374             if(!canCommit) {
375                 // Remove the entry from the cache now.
376                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
377             }
378         }
379     }
380
381     private boolean doCommit(CohortEntry cohortEntry) {
382         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
383
384         boolean success = false;
385
386         // We perform the preCommit phase here atomically with the commit phase. This is an
387         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
388         // coordination of preCommit across shards in case of failure but preCommit should not
389         // normally fail since we ensure only one concurrent 3-phase commit.
390
391         try {
392             cohortEntry.preCommit();
393
394             cohortEntry.getShard().continueCommit(cohortEntry);
395
396             cohortEntry.updateLastAccessTime();
397
398             success = true;
399         } catch (Exception e) {
400             log.error("{} An exception occurred while preCommitting transaction {}",
401                     name, cohortEntry.getTransactionID(), e);
402             cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
403
404             currentTransactionComplete(cohortEntry.getTransactionID(), true);
405         }
406
407         return success;
408     }
409
410     /**
411      * This method handles the preCommit and commit phases for a transaction.
412      *
413      * @param transactionID the ID of the transaction to commit
414      * @param sender the actor to which to send the response
415      * @param shard the transaction's shard actor
416      * @return true if the transaction was successfully prepared, false otherwise.
417      */
418     boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
419         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
420         // this transaction.
421         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
422         if(cohortEntry == null) {
423             // We're not the current Tx - the Tx was likely expired b/c it took too long in
424             // between the canCommit and commit messages.
425             IllegalStateException ex = new IllegalStateException(
426                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
427                             name, transactionID));
428             log.error(ex.getMessage());
429             sender.tell(new Failure(ex), shard.self());
430             return false;
431         }
432
433         cohortEntry.setReplySender(sender);
434         return doCommit(cohortEntry);
435     }
436
437     void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
438         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
439         if(cohortEntry != null) {
440             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
441             // aborted during replication in which case we may still commit locally if replication
442             // succeeds.
443             currentTransactionComplete(transactionID, false);
444         } else {
445             cohortEntry = getAndRemoveCohortEntry(transactionID);
446         }
447
448         if(cohortEntry == null) {
449             return;
450         }
451
452         log.debug("{}: Aborting transaction {}", name, transactionID);
453
454         final ActorRef self = shard.getSelf();
455         try {
456             cohortEntry.abort();
457
458             shard.getShardMBean().incrementAbortTransactionsCount();
459
460             if(sender != null) {
461                 sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
462             }
463         } catch (Exception e) {
464             log.error("{}: An exception happened during abort", name, e);
465
466             if(sender != null) {
467                 sender.tell(new Failure(e), self);
468             }
469         }
470     }
471
472     void checkForExpiredTransactions(final long timeout, final Shard shard) {
473         CohortEntry cohortEntry = getCurrentCohortEntry();
474         if(cohortEntry != null) {
475             if(cohortEntry.isExpired(timeout)) {
476                 log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
477                         name, cohortEntry.getTransactionID(), timeout);
478
479                 handleAbort(cohortEntry.getTransactionID(), null, shard);
480             }
481         }
482
483         cleanupExpiredCohortEntries();
484     }
485
486     void abortPendingTransactions(final String reason, final Shard shard) {
487         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
488             return;
489         }
490
491         List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
492
493         log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
494
495         for(CohortEntry cohortEntry: cohortEntries) {
496             if(cohortEntry.getReplySender() != null) {
497                 cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
498             }
499         }
500     }
501
502     private List<CohortEntry> getAndClearPendingCohortEntries() {
503         List<CohortEntry> cohortEntries = new ArrayList<>();
504
505         if(currentCohortEntry != null) {
506             cohortEntries.add(currentCohortEntry);
507             cohortCache.remove(currentCohortEntry.getTransactionID());
508             currentCohortEntry = null;
509         }
510
511         for(CohortEntry cohortEntry: queuedCohortEntries) {
512             cohortEntries.add(cohortEntry);
513             cohortCache.remove(cohortEntry.getTransactionID());
514         }
515
516         queuedCohortEntries.clear();
517         return cohortEntries;
518     }
519
520     Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
521         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
522             return Collections.emptyList();
523         }
524
525         Collection<Object> messages = new ArrayList<>();
526         List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
527         for(CohortEntry cohortEntry: cohortEntries) {
528             if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
529                 continue;
530             }
531
532             final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
533             cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
534                 @Override
535                 protected BatchedModifications getModifications() {
536                     if(newModifications.isEmpty() ||
537                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
538                         newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
539                                 cohortEntry.getClientVersion()));
540         }
541
542                     return newModifications.getLast();
543                 }
544             });
545
546             if(!newModifications.isEmpty()) {
547                 BatchedModifications last = newModifications.getLast();
548                 last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
549                 last.setReady(true);
550                 last.setTotalMessagesSent(newModifications.size());
551                 messages.addAll(newModifications);
552
553                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
554                     messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
555                             cohortEntry.getClientVersion()));
556                 }
557
558                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
559                     messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
560                             cohortEntry.getClientVersion()));
561                 }
562             }
563         }
564
565         return messages;
566     }
567
568     /**
569      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
570      * matches the current entry.
571      *
572      * @param transactionID the ID of the transaction
573      * @return the current CohortEntry or null if the given transaction ID does not match the
574      *         current entry.
575      */
576     CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
577         if(isCurrentTransaction(transactionID)) {
578             return currentCohortEntry;
579         }
580
581         return null;
582     }
583
    /**
     * Returns the cohort entry whose commit is currently in progress, or null if there is none.
     */
    CohortEntry getCurrentCohortEntry() {
        return currentCohortEntry;
    }
587
    /**
     * Removes the cohort entry for the given transaction from the cohort cache. Neither the queue nor
     * the current entry is touched.
     *
     * @param transactionID the ID of the transaction
     * @return the removed entry, or null if the cache held no entry for the transaction
     */
    CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
        return cohortCache.remove(transactionID);
    }
591
592     boolean isCurrentTransaction(Identifier transactionID) {
593         return currentCohortEntry != null &&
594                 currentCohortEntry.getTransactionID().equals(transactionID);
595     }
596
597     /**
598      * This method is called when a transaction is complete, successful or not. If the given
599      * given transaction ID matches the current in-progress transaction, the next cohort entry,
600      * if any, is dequeued and processed.
601      *
602      * @param transactionID the ID of the completed transaction
603      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
604      *        the cache.
605      */
606     void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
607         if(removeCohortEntry) {
608             cohortCache.remove(transactionID);
609         }
610
611         if(isCurrentTransaction(transactionID)) {
612             currentCohortEntry = null;
613
614             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
615
616             maybeProcessNextCohortEntry();
617         }
618     }
619
    /**
     * Walks the queue from the head. Entries that are not ready to commit and are either expired or
     * aborted are evicted from both the queue and the cache. The walk stops at the first entry that is
     * ready to commit, which is started if no commit is in progress, or at the first entry that is
     * neither ready, expired nor aborted. Finally the pending-transactions-complete operation is run
     * if its conditions are met.
     */
    private void maybeProcessNextCohortEntry() {
        // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
        // clean out expired entries.
        final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
        while(iter.hasNext()) {
            final CohortEntry next = iter.next();
            if(next.isReadyToCommit()) {
                if(currentCohortEntry == null) {
                    if(log.isDebugEnabled()) {
                        log.debug("{}: Next entry to canCommit {}", name, next);
                    }

                    iter.remove();
                    currentCohortEntry = next;
                    currentCohortEntry.updateLastAccessTime();
                    // doCanCommit may re-enter this method via currentTransactionComplete if canCommit
                    // does not succeed. That is safe because this iterator is not used again after
                    // the call - the loop is left right below.
                    doCanCommit(currentCohortEntry);
                }

                // Either the entry was just started or a commit is already in progress; in both cases
                // the entries behind it have to wait.
                break;
            } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
                log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
                        name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
            } else if(!next.isAborted()) {
                // Not ready, not expired and not aborted - keep it at the head and stop here.
                break;
            }

            // Reached only for expired or aborted entries: evict from the queue and the cache.
            iter.remove();
            cohortCache.remove(next.getTransactionID());
        }

        maybeRunOperationOnPendingTransactionsComplete();
    }
652
    /**
     * Evicts expired entries from the head of the queue. This delegates to
     * maybeProcessNextCohortEntry, so it may also start the next ready entry and run the
     * pending-transactions-complete operation.
     */
    void cleanupExpiredCohortEntries() {
        maybeProcessNextCohortEntry();
    }
656
    /**
     * Registers an operation to run once there is no current transaction and the queue is empty. If
     * that is already the case the operation is run immediately. A previously registered operation
     * that has not yet run is replaced.
     *
     * @param operation the operation to run
     */
    void setRunOnPendingTransactionsComplete(Runnable operation) {
        runOnPendingTransactionsComplete = operation;
        maybeRunOperationOnPendingTransactionsComplete();
    }
661
662     private void maybeRunOperationOnPendingTransactionsComplete() {
663         if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
664             log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
665
666             runOnPendingTransactionsComplete.run();
667             runOnPendingTransactionsComplete = null;
668         }
669     }
670
    /**
     * Installs the decorator that is passed to cohort entries when batched transactions are readied.
     *
     * @param cohortDecorator the decorator to use
     */
    @VisibleForTesting
    void setCohortDecorator(CohortDecorator cohortDecorator) {
        this.cohortDecorator = cohortDecorator;
    }
675
   /**
    * Forwards the given command to the data tree cohort registry.
    *
    * @param sender the sender of the command
    * @param message the registry command to process
    */
   void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
        cohortRegistry.process(sender, message);
    }
679 }