45fa7727e4f125e11d73658c699ecbc779715ceb
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status.Failure;
12 import akka.serialization.Serialization;
13 import akka.util.Timeout;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Preconditions;
16 import com.google.common.base.Stopwatch;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.Collections;
20 import java.util.HashMap;
21 import java.util.Iterator;
22 import java.util.LinkedList;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Queue;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
30 import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
31 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
33 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
34 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.modification.Modification;
41 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
42 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
46 import org.slf4j.Logger;
47 import scala.concurrent.duration.Duration;
48
49 /**
50  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
51  *
52  * @author Thomas Pantelis
53  */
54 class ShardCommitCoordinator {
55
56     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
57     public interface CohortDecorator {
58         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
59     }
60
61     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
62
63     private CohortEntry currentCohortEntry;
64
65     private final ShardDataTree dataTree;
66
67     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
68
69     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
70     // since this should only be accessed on the shard's dispatcher.
71     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
72
73     private int queueCapacity;
74
75     private final Logger log;
76
77     private final String name;
78
79     private final long cacheExpiryTimeoutInMillis;
80
81     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
82     private CohortDecorator cohortDecorator;
83
84     private ReadyTransactionReply readyTransactionReply;
85
86     private Runnable runOnPendingTransactionsComplete;
87
88
89     private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
90
/**
 * Creates a coordinator with an empty cache and queue.
 *
 * @param dataTree the shard's data tree; must not be null
 * @param cacheExpiryTimeoutInMillis the expiry timeout, in milliseconds, applied to queued cohort entries
 * @param queueCapacity the maximum number of cohort entries that may be queued
 * @param log the logger to write to
 * @param name the prefix used in log and failure messages
 */
ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
        String name) {
    this.dataTree = Preconditions.checkNotNull(dataTree);
    this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
    this.queueCapacity = queueCapacity;
    this.name = name;
    this.log = log;
}
100
101     int getQueueSize() {
102         return queuedCohortEntries.size();
103     }
104
105     int getCohortCacheSize() {
106         return cohortCache.size();
107     }
108
109     void setQueueCapacity(int queueCapacity) {
110         this.queueCapacity = queueCapacity;
111     }
112
113     private ReadyTransactionReply readyTransactionReply(Shard shard) {
114         if(readyTransactionReply == null) {
115             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
116         }
117
118         return readyTransactionReply;
119     }
120
121     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
122         if(queuedCohortEntries.size() < queueCapacity) {
123             queuedCohortEntries.offer(cohortEntry);
124
125             log.debug("{}: Enqueued transaction {}, queue size {}", name, cohortEntry.getTransactionID(),
126                     queuedCohortEntries.size());
127
128             return true;
129         } else {
130             cohortCache.remove(cohortEntry.getTransactionID());
131
132             final RuntimeException ex = new RuntimeException(
133                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
134                                   " capacity %d has been reached.",
135                                   name, cohortEntry.getTransactionID(), queueCapacity));
136             log.error(ex.getMessage());
137             sender.tell(new Failure(ex), shard.self());
138             return false;
139         }
140     }
141
142     /**
143      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
144      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
145      *
146      * @param ready the ForwardedReadyTransaction message to process
147      * @param sender the sender of the message
148      * @param shard the transaction's shard actor
149      * @param schema
150      */
151     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
152             SchemaContext schema) {
153         log.debug("{}: Readying transaction {}, client version {}", name,
154                 ready.getTransactionID(), ready.getTxnClientVersion());
155
156         final ShardDataTreeCohort cohort = ready.getTransaction().ready();
157         final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
158         cohortCache.put(ready.getTransactionID(), cohortEntry);
159
160         if(!queueCohortEntry(cohortEntry, sender, shard)) {
161             return;
162         }
163
164         if(ready.isDoImmediateCommit()) {
165             cohortEntry.setDoImmediateCommit(true);
166             cohortEntry.setReplySender(sender);
167             cohortEntry.setShard(shard);
168             handleCanCommit(cohortEntry);
169         } else {
170             // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
171             // front-end so send back a ReadyTransactionReply with our actor path.
172             sender.tell(readyTransactionReply(shard), shard.self());
173         }
174     }
175
176     /**
177      * This method handles a BatchedModifications message for a transaction being prepared directly on the
178      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
179      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
180      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
181      *
182      * @param batched the BatchedModifications message to process
183      * @param sender the sender of the message
184      * @param shard the transaction's shard actor
185      */
186     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
187         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
188         if(cohortEntry == null) {
189             cohortEntry = new CohortEntry(batched.getTransactionID(),
190                     dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
191                     cohortRegistry, schema,  batched.getVersion());
192             cohortCache.put(batched.getTransactionID(), cohortEntry);
193         }
194
195         if(log.isDebugEnabled()) {
196             log.debug("{}: Applying {} batched modifications for Tx {}", name,
197                     batched.getModifications().size(), batched.getTransactionID());
198         }
199
200         cohortEntry.applyModifications(batched.getModifications());
201
202         if(batched.isReady()) {
203             if(cohortEntry.getLastBatchedModificationsException() != null) {
204                 cohortCache.remove(cohortEntry.getTransactionID());
205                 throw cohortEntry.getLastBatchedModificationsException();
206             }
207
208             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
209                 cohortCache.remove(cohortEntry.getTransactionID());
210                 throw new IllegalStateException(String.format(
211                         "The total number of batched messages received %d does not match the number sent %d",
212                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
213             }
214
215             if(!queueCohortEntry(cohortEntry, sender, shard)) {
216                 return;
217             }
218
219             if(log.isDebugEnabled()) {
220                 log.debug("{}: Readying Tx {}, client version {}", name,
221                         batched.getTransactionID(), batched.getVersion());
222             }
223
224             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
225
226             if(batched.isDoCommitOnReady()) {
227                 cohortEntry.setReplySender(sender);
228                 cohortEntry.setShard(shard);
229                 handleCanCommit(cohortEntry);
230             } else {
231                 sender.tell(readyTransactionReply(shard), shard.self());
232             }
233         } else {
234             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
235         }
236     }
237
238     /**
239      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
240      * been prepared beforehand by the sender and we just need to drive them through into the
241      * dataTree.
242      *
243      * @param message the ReadyLocalTransaction message to process
244      * @param sender the sender of the message
245      * @param shard the transaction's shard actor
246      */
247     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard,
248             SchemaContext schema) {
249         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
250                 message.getTransactionID());
251         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
252                 DataStoreVersions.CURRENT_VERSION);
253         cohortCache.put(message.getTransactionID(), cohortEntry);
254         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
255
256         if(!queueCohortEntry(cohortEntry, sender, shard)) {
257             return;
258         }
259
260         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
261
262         if (message.isDoCommitOnReady()) {
263             cohortEntry.setReplySender(sender);
264             cohortEntry.setShard(shard);
265             handleCanCommit(cohortEntry);
266         } else {
267             sender.tell(readyTransactionReply(shard), shard.self());
268         }
269     }
270
271     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
272             final int maxModificationsPerBatch) {
273         CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
274         if(cohortEntry == null || cohortEntry.getTransaction() == null) {
275             return Collections.singletonList(from);
276         }
277
278         cohortEntry.applyModifications(from.getModifications());
279
280         final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
281         cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
282             @Override
283             protected BatchedModifications getModifications() {
284                 if(newModifications.isEmpty() ||
285                         newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
286                     newModifications.add(new BatchedModifications(from.getTransactionID(),
287                             from.getVersion(), from.getTransactionChainID()));
288                 }
289
290                 return newModifications.getLast();
291             }
292         });
293
294         BatchedModifications last = newModifications.getLast();
295         last.setDoCommitOnReady(from.isDoCommitOnReady());
296         last.setReady(from.isReady());
297         last.setTotalMessagesSent(newModifications.size());
298         return newModifications;
299     }
300
301     private void handleCanCommit(CohortEntry cohortEntry) {
302         String transactionID = cohortEntry.getTransactionID();
303
304         cohortEntry.updateLastAccessTime();
305
306         if(currentCohortEntry != null) {
307             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
308             // queue and will get processed after all prior entries complete.
309
310             if(log.isDebugEnabled()) {
311                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
312                         name, currentCohortEntry.getTransactionID(), transactionID);
313             }
314
315             return;
316         }
317
318         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
319         // it the current entry and proceed with canCommit.
320         // Purposely checking reference equality here.
321         if(queuedCohortEntries.peek() == cohortEntry) {
322             currentCohortEntry = queuedCohortEntries.poll();
323             doCanCommit(currentCohortEntry);
324         } else {
325             if(log.isDebugEnabled()) {
326                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
327                         queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
328                                 transactionID);
329             }
330         }
331     }
332
333     /**
334      * This method handles the canCommit phase for a transaction.
335      *
336      * @param transactionID the ID of the transaction to canCommit
337      * @param sender the actor to which to send the response
338      * @param shard the transaction's shard actor
339      */
340     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
341         // Lookup the cohort entry that was cached previously (or should have been) by
342         // transactionReady (via the ForwardedReadyTransaction message).
343         final CohortEntry cohortEntry = cohortCache.get(transactionID);
344         if(cohortEntry == null) {
345             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
346             // between canCommit and ready and the entry was expired from the cache.
347             IllegalStateException ex = new IllegalStateException(
348                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
349             log.error(ex.getMessage());
350             sender.tell(new Failure(ex), shard.self());
351             return;
352         }
353
354         cohortEntry.setReplySender(sender);
355         cohortEntry.setShard(shard);
356
357         handleCanCommit(cohortEntry);
358     }
359
360     private void doCanCommit(final CohortEntry cohortEntry) {
361         boolean canCommit = false;
362         try {
363             canCommit = cohortEntry.canCommit();
364
365             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
366
367             if(cohortEntry.isDoImmediateCommit()) {
368                 if(canCommit) {
369                     doCommit(cohortEntry);
370                 } else {
371                     cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
372                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
373                 }
374             } else {
375                 cohortEntry.getReplySender().tell(
376                         canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
377                             CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
378                         cohortEntry.getShard().self());
379             }
380         } catch (Exception e) {
381             log.debug("{}: An exception occurred during canCommit", name, e);
382
383             Throwable failure = e;
384             if(e instanceof ExecutionException) {
385                 failure = e.getCause();
386             }
387
388             cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
389         } finally {
390             if(!canCommit) {
391                 // Remove the entry from the cache now.
392                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
393             }
394         }
395     }
396
397     private boolean doCommit(CohortEntry cohortEntry) {
398         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
399
400         boolean success = false;
401
402         // We perform the preCommit phase here atomically with the commit phase. This is an
403         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
404         // coordination of preCommit across shards in case of failure but preCommit should not
405         // normally fail since we ensure only one concurrent 3-phase commit.
406
407         try {
408             cohortEntry.preCommit();
409
410             cohortEntry.getShard().continueCommit(cohortEntry);
411
412             cohortEntry.updateLastAccessTime();
413
414             success = true;
415         } catch (Exception e) {
416             log.error("{} An exception occurred while preCommitting transaction {}",
417                     name, cohortEntry.getTransactionID(), e);
418             cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
419
420             currentTransactionComplete(cohortEntry.getTransactionID(), true);
421         }
422
423         return success;
424     }
425
426     /**
427      * This method handles the preCommit and commit phases for a transaction.
428      *
429      * @param transactionID the ID of the transaction to commit
430      * @param sender the actor to which to send the response
431      * @param shard the transaction's shard actor
432      * @return true if the transaction was successfully prepared, false otherwise.
433      */
434     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
435         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
436         // this transaction.
437         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
438         if(cohortEntry == null) {
439             // We're not the current Tx - the Tx was likely expired b/c it took too long in
440             // between the canCommit and commit messages.
441             IllegalStateException ex = new IllegalStateException(
442                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
443                             name, transactionID));
444             log.error(ex.getMessage());
445             sender.tell(new Failure(ex), shard.self());
446             return false;
447         }
448
449         cohortEntry.setReplySender(sender);
450         return doCommit(cohortEntry);
451     }
452
453     void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
454         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
455         if(cohortEntry != null) {
456             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
457             // aborted during replication in which case we may still commit locally if replication
458             // succeeds.
459             currentTransactionComplete(transactionID, false);
460         } else {
461             cohortEntry = getAndRemoveCohortEntry(transactionID);
462         }
463
464         if(cohortEntry == null) {
465             return;
466         }
467
468         log.debug("{}: Aborting transaction {}", name, transactionID);
469
470         final ActorRef self = shard.getSelf();
471         try {
472             cohortEntry.abort();
473
474             shard.getShardMBean().incrementAbortTransactionsCount();
475
476             if(sender != null) {
477                 sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
478             }
479         } catch (Exception e) {
480             log.error("{}: An exception happened during abort", name, e);
481
482             if(sender != null) {
483                 sender.tell(new Failure(e), self);
484             }
485         }
486     }
487
488     void checkForExpiredTransactions(final long timeout, final Shard shard) {
489         CohortEntry cohortEntry = getCurrentCohortEntry();
490         if(cohortEntry != null) {
491             if(cohortEntry.isExpired(timeout)) {
492                 log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
493                         name, cohortEntry.getTransactionID(), timeout);
494
495                 handleAbort(cohortEntry.getTransactionID(), null, shard);
496             }
497         }
498
499         cleanupExpiredCohortEntries();
500     }
501
502     void abortPendingTransactions(final String reason, final Shard shard) {
503         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
504             return;
505         }
506
507         List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
508
509         log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
510
511         for(CohortEntry cohortEntry: cohortEntries) {
512             if(cohortEntry.getReplySender() != null) {
513                 cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
514             }
515         }
516     }
517
518     private List<CohortEntry> getAndClearPendingCohortEntries() {
519         List<CohortEntry> cohortEntries = new ArrayList<>();
520
521         if(currentCohortEntry != null) {
522             cohortEntries.add(currentCohortEntry);
523             cohortCache.remove(currentCohortEntry.getTransactionID());
524             currentCohortEntry = null;
525         }
526
527         for(CohortEntry cohortEntry: queuedCohortEntries) {
528             cohortEntries.add(cohortEntry);
529             cohortCache.remove(cohortEntry.getTransactionID());
530         }
531
532         queuedCohortEntries.clear();
533         return cohortEntries;
534     }
535
536     Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
537         if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
538             return Collections.emptyList();
539         }
540
541         Collection<Object> messages = new ArrayList<>();
542         List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
543         for(CohortEntry cohortEntry: cohortEntries) {
544             if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
545                 continue;
546             }
547
548             final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
549             cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
550                 @Override
551                 protected BatchedModifications getModifications() {
552                     if(newModifications.isEmpty() ||
553                             newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
554                         newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
555                                 cohortEntry.getClientVersion(), ""));
556         }
557
558                     return newModifications.getLast();
559                 }
560             });
561
562             if(!newModifications.isEmpty()) {
563                 BatchedModifications last = newModifications.getLast();
564                 last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
565                 last.setReady(true);
566                 last.setTotalMessagesSent(newModifications.size());
567                 messages.addAll(newModifications);
568
569                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.CAN_COMMITTED) {
570                     messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
571                             cohortEntry.getClientVersion()));
572                 }
573
574                 if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.PRE_COMMITTED) {
575                     messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
576                             cohortEntry.getClientVersion()));
577                 }
578             }
579         }
580
581         return messages;
582     }
583
584     /**
585      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
586      * matches the current entry.
587      *
588      * @param transactionID the ID of the transaction
589      * @return the current CohortEntry or null if the given transaction ID does not match the
590      *         current entry.
591      */
592     CohortEntry getCohortEntryIfCurrent(String transactionID) {
593         if(isCurrentTransaction(transactionID)) {
594             return currentCohortEntry;
595         }
596
597         return null;
598     }
599
600     CohortEntry getCurrentCohortEntry() {
601         return currentCohortEntry;
602     }
603
604     CohortEntry getAndRemoveCohortEntry(String transactionID) {
605         return cohortCache.remove(transactionID);
606     }
607
608     boolean isCurrentTransaction(String transactionID) {
609         return currentCohortEntry != null &&
610                 currentCohortEntry.getTransactionID().equals(transactionID);
611     }
612
613     /**
614      * This method is called when a transaction is complete, successful or not. If the given
615      * given transaction ID matches the current in-progress transaction, the next cohort entry,
616      * if any, is dequeued and processed.
617      *
618      * @param transactionID the ID of the completed transaction
619      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
620      *        the cache.
621      */
622     void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
623         if(removeCohortEntry) {
624             cohortCache.remove(transactionID);
625         }
626
627         if(isCurrentTransaction(transactionID)) {
628             currentCohortEntry = null;
629
630             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
631
632             maybeProcessNextCohortEntry();
633         }
634     }
635
636     private void maybeProcessNextCohortEntry() {
637         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
638         // clean out expired entries.
639         final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
640         while(iter.hasNext()) {
641             final CohortEntry next = iter.next();
642             if(next.isReadyToCommit()) {
643                 if(currentCohortEntry == null) {
644                     if(log.isDebugEnabled()) {
645                         log.debug("{}: Next entry to canCommit {}", name, next);
646                     }
647
648                     iter.remove();
649                     currentCohortEntry = next;
650                     currentCohortEntry.updateLastAccessTime();
651                     doCanCommit(currentCohortEntry);
652                 }
653
654                 break;
655             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
656                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
657                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
658             } else if(!next.isAborted()) {
659                 break;
660             }
661
662             iter.remove();
663             cohortCache.remove(next.getTransactionID());
664         }
665
666         maybeRunOperationOnPendingTransactionsComplete();
667     }
668
669     void cleanupExpiredCohortEntries() {
670         maybeProcessNextCohortEntry();
671     }
672
673     void setRunOnPendingTransactionsComplete(Runnable operation) {
674         runOnPendingTransactionsComplete = operation;
675         maybeRunOperationOnPendingTransactionsComplete();
676     }
677
678     private void maybeRunOperationOnPendingTransactionsComplete() {
679         if(runOnPendingTransactionsComplete != null && currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
680             log.debug("{}: Pending transactions complete - running operation {}", name, runOnPendingTransactionsComplete);
681
682             runOnPendingTransactionsComplete.run();
683             runOnPendingTransactionsComplete = null;
684         }
685     }
686
687     @VisibleForTesting
688     void setCohortDecorator(CohortDecorator cohortDecorator) {
689         this.cohortDecorator = cohortDecorator;
690     }
691
692    void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
693         cohortRegistry.process(sender, message);
694     }
695
696     static class CohortEntry {
697         enum State {
698             PENDING,
699             CAN_COMMITTED,
700             PRE_COMMITTED,
701             COMMITTED,
702             ABORTED
703     }
704
705         private final String transactionID;
706         private ShardDataTreeCohort cohort;
707         private final ReadWriteShardDataTreeTransaction transaction;
708         private RuntimeException lastBatchedModificationsException;
709         private ActorRef replySender;
710         private Shard shard;
711         private boolean doImmediateCommit;
712         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
713         private int totalBatchedModificationsReceived;
714         private State state = State.PENDING;
715         private final short clientVersion;
716         private final CompositeDataTreeCohort userCohorts;
717
718         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
719                 DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
720             this.transaction = Preconditions.checkNotNull(transaction);
721             this.transactionID = transactionID;
722             this.clientVersion = clientVersion;
723             this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
724         }
725
726         CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
727                 SchemaContext schema, short clientVersion) {
728             this.transactionID = transactionID;
729             this.cohort = cohort;
730             this.transaction = null;
731             this.clientVersion = clientVersion;
732             this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
733         }
734
735         void updateLastAccessTime() {
736             lastAccessTimer.reset();
737             lastAccessTimer.start();
738         }
739
740         String getTransactionID() {
741             return transactionID;
742         }
743
744         short getClientVersion() {
745             return clientVersion;
746         }
747
748         State getState() {
749             return state;
750         }
751
752         DataTreeCandidate getCandidate() {
753             return cohort.getCandidate();
754         }
755
756         DataTreeModification getDataTreeModification() {
757             return cohort.getDataTreeModification();
758         }
759
760         ReadWriteShardDataTreeTransaction getTransaction() {
761             return transaction;
762         }
763
764         int getTotalBatchedModificationsReceived() {
765             return totalBatchedModificationsReceived;
766         }
767
768         RuntimeException getLastBatchedModificationsException() {
769             return lastBatchedModificationsException;
770         }
771
772         void applyModifications(Iterable<Modification> modifications) {
773             totalBatchedModificationsReceived++;
774             if(lastBatchedModificationsException == null) {
775                 for (Modification modification : modifications) {
776                         try {
777                             modification.apply(transaction.getSnapshot());
778                         } catch (RuntimeException e) {
779                             lastBatchedModificationsException = e;
780                             throw e;
781                         }
782                 }
783             }
784         }
785
786         boolean canCommit() throws InterruptedException, ExecutionException {
787             state = State.CAN_COMMITTED;
788
789             // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
790             // about possibly accessing our state on a different thread outside of our dispatcher.
791             // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
792             // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
793             // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
794             return cohort.canCommit().get();
795         }
796
797
798
799         void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
800             state = State.PRE_COMMITTED;
801             cohort.preCommit().get();
802             userCohorts.canCommit(cohort.getCandidate());
803             userCohorts.preCommit();
804         }
805
806         void commit() throws InterruptedException, ExecutionException, TimeoutException {
807             state = State.COMMITTED;
808             cohort.commit().get();
809             userCohorts.commit();
810         }
811
812         void abort() throws InterruptedException, ExecutionException, TimeoutException {
813             state = State.ABORTED;
814             cohort.abort().get();
815             userCohorts.abort();
816         }
817
818         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
819             Preconditions.checkState(cohort == null, "cohort was already set");
820
821             setDoImmediateCommit(doImmediateCommit);
822
823             cohort = transaction.ready();
824
825             if(cohortDecorator != null) {
826                 // Call the hook for unit tests.
827                 cohort = cohortDecorator.decorate(transactionID, cohort);
828             }
829         }
830
831         boolean isReadyToCommit() {
832             return replySender != null;
833         }
834
835         boolean isExpired(long expireTimeInMillis) {
836             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
837         }
838
839         boolean isDoImmediateCommit() {
840             return doImmediateCommit;
841         }
842
843         void setDoImmediateCommit(boolean doImmediateCommit) {
844             this.doImmediateCommit = doImmediateCommit;
845         }
846
847         ActorRef getReplySender() {
848             return replySender;
849         }
850
851         void setReplySender(ActorRef replySender) {
852             this.replySender = replySender;
853         }
854
855         Shard getShard() {
856             return shard;
857         }
858
859         void setShard(Shard shard) {
860             this.shard = shard;
861         }
862
863
864         boolean isAborted() {
865             return state == State.ABORTED;
866         }
867
868         @Override
869         public String toString() {
870             final StringBuilder builder = new StringBuilder();
871             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
872                     .append(doImmediateCommit).append("]");
873             return builder.toString();
874         }
875     }
876 }