Bug 3195: Cleanup on error paths and error handling
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.modification.Modification;
31 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.slf4j.Logger;
34
35 /**
36  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
37  *
38  * @author Thomas Pantelis
39  */
40 class ShardCommitCoordinator {
41
42     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
43     public interface CohortDecorator {
44         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
45     }
46
47     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
48
49     private CohortEntry currentCohortEntry;
50
51     private final ShardDataTree dataTree;
52
53     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
54     // since this should only be accessed on the shard's dispatcher.
55     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
56
57     private int queueCapacity;
58
59     private final Logger log;
60
61     private final String name;
62
63     private final long cacheExpiryTimeoutInMillis;
64
65     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
66     private CohortDecorator cohortDecorator;
67
68     private ReadyTransactionReply readyTransactionReply;
69
70     ShardCommitCoordinator(ShardDataTree dataTree,
71             long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) {
72
73         this.queueCapacity = queueCapacity;
74         this.log = log;
75         this.name = name;
76         this.dataTree = Preconditions.checkNotNull(dataTree);
77         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
78     }
79
80     void setQueueCapacity(int queueCapacity) {
81         this.queueCapacity = queueCapacity;
82     }
83
84     private ReadyTransactionReply readyTransactionReply(Shard shard) {
85         if(readyTransactionReply == null) {
86             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
87         }
88
89         return readyTransactionReply;
90     }
91
92     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
93         if(queuedCohortEntries.size() < queueCapacity) {
94             queuedCohortEntries.offer(cohortEntry);
95             return true;
96         } else {
97             cohortCache.remove(cohortEntry.getTransactionID());
98
99             RuntimeException ex = new RuntimeException(
100                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
101                                   " capacity %d has been reached.",
102                                   name, cohortEntry.getTransactionID(), queueCapacity));
103             log.error(ex.getMessage());
104             sender.tell(new Status.Failure(ex), shard.self());
105             return false;
106         }
107     }
108
109     /**
110      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
111      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
112      *
113      * @param ready the ForwardedReadyTransaction message to process
114      * @param sender the sender of the message
115      * @param shard the transaction's shard actor
116      */
117     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
118         log.debug("{}: Readying transaction {}, client version {}", name,
119                 ready.getTransactionID(), ready.getTxnClientVersion());
120
121         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
122                 (MutableCompositeModification) ready.getModification());
123         cohortCache.put(ready.getTransactionID(), cohortEntry);
124
125         if(!queueCohortEntry(cohortEntry, sender, shard)) {
126             return;
127         }
128
129         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
130             // Return our actor path as we'll handle the three phase commit except if the Tx client
131             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
132             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
133             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
134             ActorRef replyActorPath = shard.self();
135             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
136                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
137                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
138                         ready.getTransactionID()));
139             }
140
141             ReadyTransactionReply readyTransactionReply =
142                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
143                             ready.getTxnClientVersion());
144             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
145                 readyTransactionReply, shard.self());
146         } else {
147             if(ready.isDoImmediateCommit()) {
148                 cohortEntry.setDoImmediateCommit(true);
149                 cohortEntry.setReplySender(sender);
150                 cohortEntry.setShard(shard);
151                 handleCanCommit(cohortEntry);
152             } else {
153                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
154                 // front-end so send back a ReadyTransactionReply with our actor path.
155                 sender.tell(readyTransactionReply(shard), shard.self());
156             }
157         }
158     }
159
160     /**
161      * This method handles a BatchedModifications message for a transaction being prepared directly on the
162      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
163      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
164      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
165      *
166      * @param batched the BatchedModifications message to process
167      * @param sender the sender of the message
168      * @param shard the transaction's shard actor
169      */
170     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
171         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
172         if(cohortEntry == null) {
173             cohortEntry = new CohortEntry(batched.getTransactionID(),
174                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
175                         batched.getTransactionChainID()));
176             cohortCache.put(batched.getTransactionID(), cohortEntry);
177         }
178
179         if(log.isDebugEnabled()) {
180             log.debug("{}: Applying {} batched modifications for Tx {}", name,
181                     batched.getModifications().size(), batched.getTransactionID());
182         }
183
184         cohortEntry.applyModifications(batched.getModifications());
185
186         if(batched.isReady()) {
187             if(cohortEntry.getLastBatchedModificationsException() != null) {
188                 cohortCache.remove(cohortEntry.getTransactionID());
189                 throw cohortEntry.getLastBatchedModificationsException();
190             }
191
192             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
193                 cohortCache.remove(cohortEntry.getTransactionID());
194                 throw new IllegalStateException(String.format(
195                         "The total number of batched messages received %d does not match the number sent %d",
196                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
197             }
198
199             if(!queueCohortEntry(cohortEntry, sender, shard)) {
200                 return;
201             }
202
203             if(log.isDebugEnabled()) {
204                 log.debug("{}: Readying Tx {}, client version {}", name,
205                         batched.getTransactionID(), batched.getVersion());
206             }
207
208             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
209
210             if(batched.isDoCommitOnReady()) {
211                 cohortEntry.setReplySender(sender);
212                 cohortEntry.setShard(shard);
213                 handleCanCommit(cohortEntry);
214             } else {
215                 sender.tell(readyTransactionReply(shard), shard.self());
216             }
217         } else {
218             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
219         }
220     }
221
222     /**
223      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
224      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
225      *
226      * @param message the ReadyLocalTransaction message to process
227      * @param sender the sender of the message
228      * @param shard the transaction's shard actor
229      */
230     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
231         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
232                 message.getTransactionID());
233         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
234         cohortCache.put(message.getTransactionID(), cohortEntry);
235         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
236
237         if(!queueCohortEntry(cohortEntry, sender, shard)) {
238             return;
239         }
240
241         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
242
243         if (message.isDoCommitOnReady()) {
244             cohortEntry.setReplySender(sender);
245             cohortEntry.setShard(shard);
246             handleCanCommit(cohortEntry);
247         } else {
248             sender.tell(readyTransactionReply(shard), shard.self());
249         }
250     }
251
252     private void handleCanCommit(CohortEntry cohortEntry) {
253         String transactionID = cohortEntry.getTransactionID();
254
255         cohortEntry.updateLastAccessTime();
256
257         if(currentCohortEntry != null) {
258             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
259             // queue and will get processed after all prior entries complete.
260
261             if(log.isDebugEnabled()) {
262                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
263                         name, currentCohortEntry.getTransactionID(), transactionID);
264             }
265
266             return;
267         }
268
269         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
270         // it the current entry and proceed with canCommit.
271         // Purposely checking reference equality here.
272         if(queuedCohortEntries.peek() == cohortEntry) {
273             currentCohortEntry = queuedCohortEntries.poll();
274             doCanCommit(currentCohortEntry);
275         } else {
276             if(log.isDebugEnabled()) {
277                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
278                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
279             }
280         }
281     }
282
283     /**
284      * This method handles the canCommit phase for a transaction.
285      *
286      * @param transactionID the ID of the transaction to canCommit
287      * @param sender the actor to which to send the response
288      * @param shard the transaction's shard actor
289      */
290     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
291         // Lookup the cohort entry that was cached previously (or should have been) by
292         // transactionReady (via the ForwardedReadyTransaction message).
293         final CohortEntry cohortEntry = cohortCache.get(transactionID);
294         if(cohortEntry == null) {
295             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
296             // between canCommit and ready and the entry was expired from the cache.
297             IllegalStateException ex = new IllegalStateException(
298                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
299             log.error(ex.getMessage());
300             sender.tell(new Status.Failure(ex), shard.self());
301             return;
302         }
303
304         cohortEntry.setReplySender(sender);
305         cohortEntry.setShard(shard);
306
307         handleCanCommit(cohortEntry);
308     }
309
310     private void doCanCommit(final CohortEntry cohortEntry) {
311         boolean canCommit = false;
312         try {
313             // We block on the future here so we don't have to worry about possibly accessing our
314             // state on a different thread outside of our dispatcher. Also, the data store
315             // currently uses a same thread executor anyway.
316             canCommit = cohortEntry.getCohort().canCommit().get();
317
318             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
319
320             if(cohortEntry.isDoImmediateCommit()) {
321                 if(canCommit) {
322                     doCommit(cohortEntry);
323                 } else {
324                     cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
325                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
326                 }
327             } else {
328                 cohortEntry.getReplySender().tell(
329                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
330                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
331             }
332         } catch (Exception e) {
333             log.debug("{}: An exception occurred during canCommit", name, e);
334
335             Throwable failure = e;
336             if(e instanceof ExecutionException) {
337                 failure = e.getCause();
338             }
339
340             cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
341         } finally {
342             if(!canCommit) {
343                 // Remove the entry from the cache now.
344                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
345             }
346         }
347     }
348
349     private boolean doCommit(CohortEntry cohortEntry) {
350         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
351
352         boolean success = false;
353
354         // We perform the preCommit phase here atomically with the commit phase. This is an
355         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
356         // coordination of preCommit across shards in case of failure but preCommit should not
357         // normally fail since we ensure only one concurrent 3-phase commit.
358
359         try {
360             // We block on the future here so we don't have to worry about possibly accessing our
361             // state on a different thread outside of our dispatcher. Also, the data store
362             // currently uses a same thread executor anyway.
363             cohortEntry.getCohort().preCommit().get();
364
365             cohortEntry.getShard().continueCommit(cohortEntry);
366
367             cohortEntry.updateLastAccessTime();
368
369             success = true;
370         } catch (Exception e) {
371             log.error("{} An exception occurred while preCommitting transaction {}",
372                     name, cohortEntry.getTransactionID(), e);
373             cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
374
375             currentTransactionComplete(cohortEntry.getTransactionID(), true);
376         }
377
378         return success;
379     }
380
381     /**
382      * This method handles the preCommit and commit phases for a transaction.
383      *
384      * @param transactionID the ID of the transaction to commit
385      * @param sender the actor to which to send the response
386      * @param shard the transaction's shard actor
387      * @return true if the transaction was successfully prepared, false otherwise.
388      */
389     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
390         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
391         // this transaction.
392         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
393         if(cohortEntry == null) {
394             // We're not the current Tx - the Tx was likely expired b/c it took too long in
395             // between the canCommit and commit messages.
396             IllegalStateException ex = new IllegalStateException(
397                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
398                             name, transactionID));
399             log.error(ex.getMessage());
400             sender.tell(new akka.actor.Status.Failure(ex), shard.self());
401             return false;
402         }
403
404         cohortEntry.setReplySender(sender);
405         return doCommit(cohortEntry);
406     }
407
408     /**
409      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
410      * matches the current entry.
411      *
412      * @param transactionID the ID of the transaction
413      * @return the current CohortEntry or null if the given transaction ID does not match the
414      *         current entry.
415      */
416     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
417         if(isCurrentTransaction(transactionID)) {
418             return currentCohortEntry;
419         }
420
421         return null;
422     }
423
424     public CohortEntry getCurrentCohortEntry() {
425         return currentCohortEntry;
426     }
427
428     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
429         return cohortCache.remove(transactionID);
430     }
431
432     public boolean isCurrentTransaction(String transactionID) {
433         return currentCohortEntry != null &&
434                 currentCohortEntry.getTransactionID().equals(transactionID);
435     }
436
437     /**
438      * This method is called when a transaction is complete, successful or not. If the given
439      * given transaction ID matches the current in-progress transaction, the next cohort entry,
440      * if any, is dequeued and processed.
441      *
442      * @param transactionID the ID of the completed transaction
443      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
444      *        the cache.
445      */
446     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
447         if(removeCohortEntry) {
448             cohortCache.remove(transactionID);
449         }
450
451         if(isCurrentTransaction(transactionID)) {
452             currentCohortEntry = null;
453
454             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
455
456             maybeProcessNextCohortEntry();
457         }
458     }
459
460     private void maybeProcessNextCohortEntry() {
461         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
462         // clean out expired entries.
463         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
464         while(iter.hasNext()) {
465             CohortEntry next = iter.next();
466             if(next.isReadyToCommit()) {
467                 if(currentCohortEntry == null) {
468                     if(log.isDebugEnabled()) {
469                         log.debug("{}: Next entry to canCommit {}", name, next);
470                     }
471
472                     iter.remove();
473                     currentCohortEntry = next;
474                     currentCohortEntry.updateLastAccessTime();
475                     doCanCommit(currentCohortEntry);
476                 }
477
478                 break;
479             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
480                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
481                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
482
483                 iter.remove();
484                 cohortCache.remove(next.getTransactionID());
485             } else {
486                 break;
487             }
488         }
489     }
490
491     void cleanupExpiredCohortEntries() {
492         maybeProcessNextCohortEntry();
493     }
494
495     @VisibleForTesting
496     void setCohortDecorator(CohortDecorator cohortDecorator) {
497         this.cohortDecorator = cohortDecorator;
498     }
499
500     static class CohortEntry {
501         private final String transactionID;
502         private ShardDataTreeCohort cohort;
503         private final ReadWriteShardDataTreeTransaction transaction;
504         private RuntimeException lastBatchedModificationsException;
505         private ActorRef replySender;
506         private Shard shard;
507         private boolean doImmediateCommit;
508         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
509         private int totalBatchedModificationsReceived;
510
511         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
512             this.transaction = Preconditions.checkNotNull(transaction);
513             this.transactionID = transactionID;
514         }
515
516         CohortEntry(String transactionID, ShardDataTreeCohort cohort,
517                 MutableCompositeModification compositeModification) {
518             this.transactionID = transactionID;
519             this.cohort = cohort;
520             this.transaction = null;
521         }
522
523         CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
524             this.transactionID = transactionID;
525             this.cohort = cohort;
526             this.transaction = null;
527         }
528
529         void updateLastAccessTime() {
530             lastAccessTimer.reset();
531             lastAccessTimer.start();
532         }
533
534         String getTransactionID() {
535             return transactionID;
536         }
537
538         ShardDataTreeCohort getCohort() {
539             return cohort;
540         }
541
542         int getTotalBatchedModificationsReceived() {
543             return totalBatchedModificationsReceived;
544         }
545
546         RuntimeException getLastBatchedModificationsException() {
547             return lastBatchedModificationsException;
548         }
549
550         void applyModifications(Iterable<Modification> modifications) {
551             totalBatchedModificationsReceived++;
552             if(lastBatchedModificationsException == null) {
553                 for (Modification modification : modifications) {
554                         try {
555                             modification.apply(transaction.getSnapshot());
556                         } catch (RuntimeException e) {
557                             lastBatchedModificationsException = e;
558                             throw e;
559                         }
560                 }
561             }
562         }
563
564         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
565             Preconditions.checkState(cohort == null, "cohort was already set");
566
567             setDoImmediateCommit(doImmediateCommit);
568
569             cohort = transaction.ready();
570
571             if(cohortDecorator != null) {
572                 // Call the hook for unit tests.
573                 cohort = cohortDecorator.decorate(transactionID, cohort);
574             }
575         }
576
577         boolean isReadyToCommit() {
578             return replySender != null;
579         }
580
581         boolean isExpired(long expireTimeInMillis) {
582             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
583         }
584
585         boolean isDoImmediateCommit() {
586             return doImmediateCommit;
587         }
588
589         void setDoImmediateCommit(boolean doImmediateCommit) {
590             this.doImmediateCommit = doImmediateCommit;
591         }
592
593         ActorRef getReplySender() {
594             return replySender;
595         }
596
597         void setReplySender(ActorRef replySender) {
598             this.replySender = replySender;
599         }
600
601         Shard getShard() {
602             return shard;
603         }
604
605         void setShard(Shard shard) {
606             this.shard = shard;
607         }
608
609         @Override
610         public String toString() {
611             StringBuilder builder = new StringBuilder();
612             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
613                     .append(doImmediateCommit).append("]");
614             return builder.toString();
615         }
616     }
617 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.