CDS: Add pending tx queue size to ShardStats
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.modification.Modification;
31 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.slf4j.Logger;
34
35 /**
36  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
37  *
38  * @author Thomas Pantelis
39  */
40 class ShardCommitCoordinator {
41
42     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
43     public interface CohortDecorator {
44         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
45     }
46
47     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
48
49     private CohortEntry currentCohortEntry;
50
51     private final ShardDataTree dataTree;
52
53     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
54     // since this should only be accessed on the shard's dispatcher.
55     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
56
57     private int queueCapacity;
58
59     private final Logger log;
60
61     private final String name;
62
63     private final long cacheExpiryTimeoutInMillis;
64
65     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
66     private CohortDecorator cohortDecorator;
67
68     private ReadyTransactionReply readyTransactionReply;
69
70     ShardCommitCoordinator(ShardDataTree dataTree,
71             long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) {
72
73         this.queueCapacity = queueCapacity;
74         this.log = log;
75         this.name = name;
76         this.dataTree = Preconditions.checkNotNull(dataTree);
77         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
78     }
79
80     int getQueueSize() {
81         return queuedCohortEntries.size();
82     }
83
84     void setQueueCapacity(int queueCapacity) {
85         this.queueCapacity = queueCapacity;
86     }
87
88     private ReadyTransactionReply readyTransactionReply(Shard shard) {
89         if(readyTransactionReply == null) {
90             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
91         }
92
93         return readyTransactionReply;
94     }
95
96     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
97         if(queuedCohortEntries.size() < queueCapacity) {
98             queuedCohortEntries.offer(cohortEntry);
99             return true;
100         } else {
101             cohortCache.remove(cohortEntry.getTransactionID());
102
103             RuntimeException ex = new RuntimeException(
104                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
105                                   " capacity %d has been reached.",
106                                   name, cohortEntry.getTransactionID(), queueCapacity));
107             log.error(ex.getMessage());
108             sender.tell(new Status.Failure(ex), shard.self());
109             return false;
110         }
111     }
112
113     /**
114      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
115      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
116      *
117      * @param ready the ForwardedReadyTransaction message to process
118      * @param sender the sender of the message
119      * @param shard the transaction's shard actor
120      */
121     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
122         log.debug("{}: Readying transaction {}, client version {}", name,
123                 ready.getTransactionID(), ready.getTxnClientVersion());
124
125         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
126                 (MutableCompositeModification) ready.getModification());
127         cohortCache.put(ready.getTransactionID(), cohortEntry);
128
129         if(!queueCohortEntry(cohortEntry, sender, shard)) {
130             return;
131         }
132
133         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
134             // Return our actor path as we'll handle the three phase commit except if the Tx client
135             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
136             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
137             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
138             ActorRef replyActorPath = shard.self();
139             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
140                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
141                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
142                         ready.getTransactionID()));
143             }
144
145             ReadyTransactionReply readyTransactionReply =
146                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
147                             ready.getTxnClientVersion());
148             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
149                 readyTransactionReply, shard.self());
150         } else {
151             if(ready.isDoImmediateCommit()) {
152                 cohortEntry.setDoImmediateCommit(true);
153                 cohortEntry.setReplySender(sender);
154                 cohortEntry.setShard(shard);
155                 handleCanCommit(cohortEntry);
156             } else {
157                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
158                 // front-end so send back a ReadyTransactionReply with our actor path.
159                 sender.tell(readyTransactionReply(shard), shard.self());
160             }
161         }
162     }
163
164     /**
165      * This method handles a BatchedModifications message for a transaction being prepared directly on the
166      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
167      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
168      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
169      *
170      * @param batched the BatchedModifications message to process
171      * @param sender the sender of the message
172      * @param shard the transaction's shard actor
173      */
174     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
175         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
176         if(cohortEntry == null) {
177             cohortEntry = new CohortEntry(batched.getTransactionID(),
178                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
179                         batched.getTransactionChainID()));
180             cohortCache.put(batched.getTransactionID(), cohortEntry);
181         }
182
183         if(log.isDebugEnabled()) {
184             log.debug("{}: Applying {} batched modifications for Tx {}", name,
185                     batched.getModifications().size(), batched.getTransactionID());
186         }
187
188         cohortEntry.applyModifications(batched.getModifications());
189
190         if(batched.isReady()) {
191             if(cohortEntry.getLastBatchedModificationsException() != null) {
192                 cohortCache.remove(cohortEntry.getTransactionID());
193                 throw cohortEntry.getLastBatchedModificationsException();
194             }
195
196             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
197                 cohortCache.remove(cohortEntry.getTransactionID());
198                 throw new IllegalStateException(String.format(
199                         "The total number of batched messages received %d does not match the number sent %d",
200                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
201             }
202
203             if(!queueCohortEntry(cohortEntry, sender, shard)) {
204                 return;
205             }
206
207             if(log.isDebugEnabled()) {
208                 log.debug("{}: Readying Tx {}, client version {}", name,
209                         batched.getTransactionID(), batched.getVersion());
210             }
211
212             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
213
214             if(batched.isDoCommitOnReady()) {
215                 cohortEntry.setReplySender(sender);
216                 cohortEntry.setShard(shard);
217                 handleCanCommit(cohortEntry);
218             } else {
219                 sender.tell(readyTransactionReply(shard), shard.self());
220             }
221         } else {
222             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
223         }
224     }
225
226     /**
227      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
228      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
229      *
230      * @param message the ReadyLocalTransaction message to process
231      * @param sender the sender of the message
232      * @param shard the transaction's shard actor
233      */
234     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
235         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
236                 message.getTransactionID());
237         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
238         cohortCache.put(message.getTransactionID(), cohortEntry);
239         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
240
241         if(!queueCohortEntry(cohortEntry, sender, shard)) {
242             return;
243         }
244
245         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
246
247         if (message.isDoCommitOnReady()) {
248             cohortEntry.setReplySender(sender);
249             cohortEntry.setShard(shard);
250             handleCanCommit(cohortEntry);
251         } else {
252             sender.tell(readyTransactionReply(shard), shard.self());
253         }
254     }
255
256     private void handleCanCommit(CohortEntry cohortEntry) {
257         String transactionID = cohortEntry.getTransactionID();
258
259         cohortEntry.updateLastAccessTime();
260
261         if(currentCohortEntry != null) {
262             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
263             // queue and will get processed after all prior entries complete.
264
265             if(log.isDebugEnabled()) {
266                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
267                         name, currentCohortEntry.getTransactionID(), transactionID);
268             }
269
270             return;
271         }
272
273         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
274         // it the current entry and proceed with canCommit.
275         // Purposely checking reference equality here.
276         if(queuedCohortEntries.peek() == cohortEntry) {
277             currentCohortEntry = queuedCohortEntries.poll();
278             doCanCommit(currentCohortEntry);
279         } else {
280             if(log.isDebugEnabled()) {
281                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
282                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
283             }
284         }
285     }
286
287     /**
288      * This method handles the canCommit phase for a transaction.
289      *
290      * @param transactionID the ID of the transaction to canCommit
291      * @param sender the actor to which to send the response
292      * @param shard the transaction's shard actor
293      */
294     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
295         // Lookup the cohort entry that was cached previously (or should have been) by
296         // transactionReady (via the ForwardedReadyTransaction message).
297         final CohortEntry cohortEntry = cohortCache.get(transactionID);
298         if(cohortEntry == null) {
299             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
300             // between canCommit and ready and the entry was expired from the cache.
301             IllegalStateException ex = new IllegalStateException(
302                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
303             log.error(ex.getMessage());
304             sender.tell(new Status.Failure(ex), shard.self());
305             return;
306         }
307
308         cohortEntry.setReplySender(sender);
309         cohortEntry.setShard(shard);
310
311         handleCanCommit(cohortEntry);
312     }
313
314     private void doCanCommit(final CohortEntry cohortEntry) {
315         boolean canCommit = false;
316         try {
317             // We block on the future here so we don't have to worry about possibly accessing our
318             // state on a different thread outside of our dispatcher. Also, the data store
319             // currently uses a same thread executor anyway.
320             canCommit = cohortEntry.getCohort().canCommit().get();
321
322             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
323
324             if(cohortEntry.isDoImmediateCommit()) {
325                 if(canCommit) {
326                     doCommit(cohortEntry);
327                 } else {
328                     cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
329                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
330                 }
331             } else {
332                 cohortEntry.getReplySender().tell(
333                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
334                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
335             }
336         } catch (Exception e) {
337             log.debug("{}: An exception occurred during canCommit", name, e);
338
339             Throwable failure = e;
340             if(e instanceof ExecutionException) {
341                 failure = e.getCause();
342             }
343
344             cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
345         } finally {
346             if(!canCommit) {
347                 // Remove the entry from the cache now.
348                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
349             }
350         }
351     }
352
353     private boolean doCommit(CohortEntry cohortEntry) {
354         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
355
356         boolean success = false;
357
358         // We perform the preCommit phase here atomically with the commit phase. This is an
359         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
360         // coordination of preCommit across shards in case of failure but preCommit should not
361         // normally fail since we ensure only one concurrent 3-phase commit.
362
363         try {
364             // We block on the future here so we don't have to worry about possibly accessing our
365             // state on a different thread outside of our dispatcher. Also, the data store
366             // currently uses a same thread executor anyway.
367             cohortEntry.getCohort().preCommit().get();
368
369             cohortEntry.getShard().continueCommit(cohortEntry);
370
371             cohortEntry.updateLastAccessTime();
372
373             success = true;
374         } catch (Exception e) {
375             log.error("{} An exception occurred while preCommitting transaction {}",
376                     name, cohortEntry.getTransactionID(), e);
377             cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
378
379             currentTransactionComplete(cohortEntry.getTransactionID(), true);
380         }
381
382         return success;
383     }
384
385     /**
386      * This method handles the preCommit and commit phases for a transaction.
387      *
388      * @param transactionID the ID of the transaction to commit
389      * @param sender the actor to which to send the response
390      * @param shard the transaction's shard actor
391      * @return true if the transaction was successfully prepared, false otherwise.
392      */
393     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
394         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
395         // this transaction.
396         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
397         if(cohortEntry == null) {
398             // We're not the current Tx - the Tx was likely expired b/c it took too long in
399             // between the canCommit and commit messages.
400             IllegalStateException ex = new IllegalStateException(
401                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
402                             name, transactionID));
403             log.error(ex.getMessage());
404             sender.tell(new akka.actor.Status.Failure(ex), shard.self());
405             return false;
406         }
407
408         cohortEntry.setReplySender(sender);
409         return doCommit(cohortEntry);
410     }
411
412     /**
413      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
414      * matches the current entry.
415      *
416      * @param transactionID the ID of the transaction
417      * @return the current CohortEntry or null if the given transaction ID does not match the
418      *         current entry.
419      */
420     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
421         if(isCurrentTransaction(transactionID)) {
422             return currentCohortEntry;
423         }
424
425         return null;
426     }
427
428     public CohortEntry getCurrentCohortEntry() {
429         return currentCohortEntry;
430     }
431
432     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
433         return cohortCache.remove(transactionID);
434     }
435
436     public boolean isCurrentTransaction(String transactionID) {
437         return currentCohortEntry != null &&
438                 currentCohortEntry.getTransactionID().equals(transactionID);
439     }
440
441     /**
442      * This method is called when a transaction is complete, successful or not. If the given
443      * given transaction ID matches the current in-progress transaction, the next cohort entry,
444      * if any, is dequeued and processed.
445      *
446      * @param transactionID the ID of the completed transaction
447      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
448      *        the cache.
449      */
450     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
451         if(removeCohortEntry) {
452             cohortCache.remove(transactionID);
453         }
454
455         if(isCurrentTransaction(transactionID)) {
456             currentCohortEntry = null;
457
458             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
459
460             maybeProcessNextCohortEntry();
461         }
462     }
463
464     private void maybeProcessNextCohortEntry() {
465         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
466         // clean out expired entries.
467         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
468         while(iter.hasNext()) {
469             CohortEntry next = iter.next();
470             if(next.isReadyToCommit()) {
471                 if(currentCohortEntry == null) {
472                     if(log.isDebugEnabled()) {
473                         log.debug("{}: Next entry to canCommit {}", name, next);
474                     }
475
476                     iter.remove();
477                     currentCohortEntry = next;
478                     currentCohortEntry.updateLastAccessTime();
479                     doCanCommit(currentCohortEntry);
480                 }
481
482                 break;
483             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
484                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
485                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
486
487                 iter.remove();
488                 cohortCache.remove(next.getTransactionID());
489             } else {
490                 break;
491             }
492         }
493     }
494
495     void cleanupExpiredCohortEntries() {
496         maybeProcessNextCohortEntry();
497     }
498
499     @VisibleForTesting
500     void setCohortDecorator(CohortDecorator cohortDecorator) {
501         this.cohortDecorator = cohortDecorator;
502     }
503
504     static class CohortEntry {
505         private final String transactionID;
506         private ShardDataTreeCohort cohort;
507         private final ReadWriteShardDataTreeTransaction transaction;
508         private RuntimeException lastBatchedModificationsException;
509         private ActorRef replySender;
510         private Shard shard;
511         private boolean doImmediateCommit;
512         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
513         private int totalBatchedModificationsReceived;
514
515         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
516             this.transaction = Preconditions.checkNotNull(transaction);
517             this.transactionID = transactionID;
518         }
519
520         CohortEntry(String transactionID, ShardDataTreeCohort cohort,
521                 MutableCompositeModification compositeModification) {
522             this.transactionID = transactionID;
523             this.cohort = cohort;
524             this.transaction = null;
525         }
526
527         CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
528             this.transactionID = transactionID;
529             this.cohort = cohort;
530             this.transaction = null;
531         }
532
533         void updateLastAccessTime() {
534             lastAccessTimer.reset();
535             lastAccessTimer.start();
536         }
537
538         String getTransactionID() {
539             return transactionID;
540         }
541
542         ShardDataTreeCohort getCohort() {
543             return cohort;
544         }
545
546         int getTotalBatchedModificationsReceived() {
547             return totalBatchedModificationsReceived;
548         }
549
550         RuntimeException getLastBatchedModificationsException() {
551             return lastBatchedModificationsException;
552         }
553
554         void applyModifications(Iterable<Modification> modifications) {
555             totalBatchedModificationsReceived++;
556             if(lastBatchedModificationsException == null) {
557                 for (Modification modification : modifications) {
558                         try {
559                             modification.apply(transaction.getSnapshot());
560                         } catch (RuntimeException e) {
561                             lastBatchedModificationsException = e;
562                             throw e;
563                         }
564                 }
565             }
566         }
567
568         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
569             Preconditions.checkState(cohort == null, "cohort was already set");
570
571             setDoImmediateCommit(doImmediateCommit);
572
573             cohort = transaction.ready();
574
575             if(cohortDecorator != null) {
576                 // Call the hook for unit tests.
577                 cohort = cohortDecorator.decorate(transactionID, cohort);
578             }
579         }
580
581         boolean isReadyToCommit() {
582             return replySender != null;
583         }
584
585         boolean isExpired(long expireTimeInMillis) {
586             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
587         }
588
589         boolean isDoImmediateCommit() {
590             return doImmediateCommit;
591         }
592
593         void setDoImmediateCommit(boolean doImmediateCommit) {
594             this.doImmediateCommit = doImmediateCommit;
595         }
596
597         ActorRef getReplySender() {
598             return replySender;
599         }
600
601         void setReplySender(ActorRef replySender) {
602             this.replySender = replySender;
603         }
604
605         Shard getShard() {
606             return shard;
607         }
608
609         void setShard(Shard shard) {
610             this.shard = shard;
611         }
612
613         @Override
614         public String toString() {
615             StringBuilder builder = new StringBuilder();
616             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
617                     .append(doImmediateCommit).append("]");
618             return builder.toString();
619         }
620     }
621 }