Address comments in https://git.opendaylight.org/gerrit/#/c/18392/
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.modification.Modification;
31 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.slf4j.Logger;
34
35 /**
36  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
37  *
38  * @author Thomas Pantelis
39  */
40 class ShardCommitCoordinator {
41
42     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
43     public interface CohortDecorator {
44         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
45     }
46
47     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
48
49     private CohortEntry currentCohortEntry;
50
51     private final ShardDataTree dataTree;
52
53     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
54     // since this should only be accessed on the shard's dispatcher.
55     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
56
57     private int queueCapacity;
58
59     private final Logger log;
60
61     private final String name;
62
63     private final long cacheExpiryTimeoutInMillis;
64
65     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
66     private CohortDecorator cohortDecorator;
67
68     private ReadyTransactionReply readyTransactionReply;
69
70     ShardCommitCoordinator(ShardDataTree dataTree,
71             long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) {
72
73         this.queueCapacity = queueCapacity;
74         this.log = log;
75         this.name = name;
76         this.dataTree = Preconditions.checkNotNull(dataTree);
77         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
78     }
79
80     void setQueueCapacity(int queueCapacity) {
81         this.queueCapacity = queueCapacity;
82     }
83
84     private ReadyTransactionReply readyTransactionReply(Shard shard) {
85         if(readyTransactionReply == null) {
86             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
87         }
88
89         return readyTransactionReply;
90     }
91
92     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
93         if(queuedCohortEntries.size() < queueCapacity) {
94             queuedCohortEntries.offer(cohortEntry);
95             return true;
96         } else {
97             cohortCache.remove(cohortEntry.getTransactionID());
98
99             RuntimeException ex = new RuntimeException(
100                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
101                                   " capacity %d has been reached.",
102                                   name, cohortEntry.getTransactionID(), queueCapacity));
103             log.error(ex.getMessage());
104             sender.tell(new Status.Failure(ex), shard.self());
105             return false;
106         }
107     }
108
109     /**
110      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
111      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
112      *
113      * @param ready the ForwardedReadyTransaction message to process
114      * @param sender the sender of the message
115      * @param shard the transaction's shard actor
116      */
117     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
118         log.debug("{}: Readying transaction {}, client version {}", name,
119                 ready.getTransactionID(), ready.getTxnClientVersion());
120
121         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
122                 (MutableCompositeModification) ready.getModification());
123         cohortCache.put(ready.getTransactionID(), cohortEntry);
124
125         if(!queueCohortEntry(cohortEntry, sender, shard)) {
126             return;
127         }
128
129         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
130             // Return our actor path as we'll handle the three phase commit except if the Tx client
131             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
132             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
133             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
134             ActorRef replyActorPath = shard.self();
135             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
136                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
137                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
138                         ready.getTransactionID()));
139             }
140
141             ReadyTransactionReply readyTransactionReply =
142                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
143                             ready.getTxnClientVersion());
144             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
145                 readyTransactionReply, shard.self());
146         } else {
147             if(ready.isDoImmediateCommit()) {
148                 cohortEntry.setDoImmediateCommit(true);
149                 cohortEntry.setReplySender(sender);
150                 cohortEntry.setShard(shard);
151                 handleCanCommit(cohortEntry);
152             } else {
153                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
154                 // front-end so send back a ReadyTransactionReply with our actor path.
155                 sender.tell(readyTransactionReply(shard), shard.self());
156             }
157         }
158     }
159
160     /**
161      * This method handles a BatchedModifications message for a transaction being prepared directly on the
162      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
163      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
164      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
165      *
166      * @param batched the BatchedModifications message to process
167      * @param sender the sender of the message
168      * @param shard the transaction's shard actor
169      */
170     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
171         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
172         if(cohortEntry == null) {
173             cohortEntry = new CohortEntry(batched.getTransactionID(),
174                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
175                         batched.getTransactionChainID()));
176             cohortCache.put(batched.getTransactionID(), cohortEntry);
177         }
178
179         if(log.isDebugEnabled()) {
180             log.debug("{}: Applying {} batched modifications for Tx {}", name,
181                     batched.getModifications().size(), batched.getTransactionID());
182         }
183
184         cohortEntry.applyModifications(batched.getModifications());
185
186         if(batched.isReady()) {
187             if(!queueCohortEntry(cohortEntry, sender, shard)) {
188                 return;
189             }
190
191             if(log.isDebugEnabled()) {
192                 log.debug("{}: Readying Tx {}, client version {}", name,
193                         batched.getTransactionID(), batched.getVersion());
194             }
195
196             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
197
198             if(batched.isDoCommitOnReady()) {
199                 cohortEntry.setReplySender(sender);
200                 cohortEntry.setShard(shard);
201                 handleCanCommit(cohortEntry);
202             } else {
203                 sender.tell(readyTransactionReply(shard), shard.self());
204             }
205         } else {
206             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
207         }
208     }
209
210     /**
211      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
212      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
213      *
214      * @param message the ReadyLocalTransaction message to process
215      * @param sender the sender of the message
216      * @param shard the transaction's shard actor
217      */
218     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
219         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification());
220         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
221         cohortCache.put(message.getTransactionID(), cohortEntry);
222         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
223
224         if(!queueCohortEntry(cohortEntry, sender, shard)) {
225             return;
226         }
227
228         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
229
230         if (message.isDoCommitOnReady()) {
231             cohortEntry.setReplySender(sender);
232             cohortEntry.setShard(shard);
233             handleCanCommit(cohortEntry);
234         } else {
235             sender.tell(readyTransactionReply(shard), shard.self());
236         }
237     }
238
239     private void handleCanCommit(CohortEntry cohortEntry) {
240         String transactionID = cohortEntry.getTransactionID();
241
242         cohortEntry.updateLastAccessTime();
243
244         if(currentCohortEntry != null) {
245             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
246             // queue and will get processed after all prior entries complete.
247
248             if(log.isDebugEnabled()) {
249                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
250                         name, currentCohortEntry.getTransactionID(), transactionID);
251             }
252
253             return;
254         }
255
256         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
257         // it the current entry and proceed with canCommit.
258         // Purposely checking reference equality here.
259         if(queuedCohortEntries.peek() == cohortEntry) {
260             currentCohortEntry = queuedCohortEntries.poll();
261             doCanCommit(currentCohortEntry);
262         } else {
263             if(log.isDebugEnabled()) {
264                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
265                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
266             }
267         }
268     }
269
270     /**
271      * This method handles the canCommit phase for a transaction.
272      *
273      * @param transactionID the ID of the transaction to canCommit
274      * @param sender the actor to which to send the response
275      * @param shard the transaction's shard actor
276      */
277     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
278         // Lookup the cohort entry that was cached previously (or should have been) by
279         // transactionReady (via the ForwardedReadyTransaction message).
280         final CohortEntry cohortEntry = cohortCache.get(transactionID);
281         if(cohortEntry == null) {
282             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
283             // between canCommit and ready and the entry was expired from the cache.
284             IllegalStateException ex = new IllegalStateException(
285                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
286             log.error(ex.getMessage());
287             sender.tell(new Status.Failure(ex), shard.self());
288             return;
289         }
290
291         cohortEntry.setReplySender(sender);
292         cohortEntry.setShard(shard);
293
294         handleCanCommit(cohortEntry);
295     }
296
297     private void doCanCommit(final CohortEntry cohortEntry) {
298         boolean canCommit = false;
299         try {
300             // We block on the future here so we don't have to worry about possibly accessing our
301             // state on a different thread outside of our dispatcher. Also, the data store
302             // currently uses a same thread executor anyway.
303             canCommit = cohortEntry.getCohort().canCommit().get();
304
305             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
306
307             if(cohortEntry.isDoImmediateCommit()) {
308                 if(canCommit) {
309                     doCommit(cohortEntry);
310                 } else {
311                     cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
312                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
313                 }
314             } else {
315                 cohortEntry.getReplySender().tell(
316                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
317                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
318             }
319         } catch (Exception e) {
320             log.debug("{}: An exception occurred during canCommit", name, e);
321
322             Throwable failure = e;
323             if(e instanceof ExecutionException) {
324                 failure = e.getCause();
325             }
326
327             cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
328         } finally {
329             if(!canCommit) {
330                 // Remove the entry from the cache now.
331                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
332             }
333         }
334     }
335
336     private boolean doCommit(CohortEntry cohortEntry) {
337         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
338
339         boolean success = false;
340
341         // We perform the preCommit phase here atomically with the commit phase. This is an
342         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
343         // coordination of preCommit across shards in case of failure but preCommit should not
344         // normally fail since we ensure only one concurrent 3-phase commit.
345
346         try {
347             // We block on the future here so we don't have to worry about possibly accessing our
348             // state on a different thread outside of our dispatcher. Also, the data store
349             // currently uses a same thread executor anyway.
350             cohortEntry.getCohort().preCommit().get();
351
352             cohortEntry.getShard().continueCommit(cohortEntry);
353
354             cohortEntry.updateLastAccessTime();
355
356             success = true;
357         } catch (Exception e) {
358             log.error("{} An exception occurred while preCommitting transaction {}",
359                     name, cohortEntry.getTransactionID(), e);
360             cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
361
362             currentTransactionComplete(cohortEntry.getTransactionID(), true);
363         }
364
365         return success;
366     }
367
368     /**
369      * This method handles the preCommit and commit phases for a transaction.
370      *
371      * @param transactionID the ID of the transaction to commit
372      * @param sender the actor to which to send the response
373      * @param shard the transaction's shard actor
374      * @return true if the transaction was successfully prepared, false otherwise.
375      */
376     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
377         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
378         // this transaction.
379         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
380         if(cohortEntry == null) {
381             // We're not the current Tx - the Tx was likely expired b/c it took too long in
382             // between the canCommit and commit messages.
383             IllegalStateException ex = new IllegalStateException(
384                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
385                             name, transactionID));
386             log.error(ex.getMessage());
387             sender.tell(new akka.actor.Status.Failure(ex), shard.self());
388             return false;
389         }
390
391         cohortEntry.setReplySender(sender);
392         return doCommit(cohortEntry);
393     }
394
395     /**
396      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
397      * matches the current entry.
398      *
399      * @param transactionID the ID of the transaction
400      * @return the current CohortEntry or null if the given transaction ID does not match the
401      *         current entry.
402      */
403     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
404         if(isCurrentTransaction(transactionID)) {
405             return currentCohortEntry;
406         }
407
408         return null;
409     }
410
411     public CohortEntry getCurrentCohortEntry() {
412         return currentCohortEntry;
413     }
414
415     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
416         return cohortCache.remove(transactionID);
417     }
418
419     public boolean isCurrentTransaction(String transactionID) {
420         return currentCohortEntry != null &&
421                 currentCohortEntry.getTransactionID().equals(transactionID);
422     }
423
424     /**
425      * This method is called when a transaction is complete, successful or not. If the given
426      * given transaction ID matches the current in-progress transaction, the next cohort entry,
427      * if any, is dequeued and processed.
428      *
429      * @param transactionID the ID of the completed transaction
430      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
431      *        the cache.
432      */
433     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
434         if(removeCohortEntry) {
435             cohortCache.remove(transactionID);
436         }
437
438         if(isCurrentTransaction(transactionID)) {
439             currentCohortEntry = null;
440
441             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
442
443             maybeProcessNextCohortEntry();
444         }
445     }
446
447     private void maybeProcessNextCohortEntry() {
448         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
449         // clean out expired entries.
450         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
451         while(iter.hasNext()) {
452             CohortEntry next = iter.next();
453             if(next.isReadyToCommit()) {
454                 if(currentCohortEntry == null) {
455                     if(log.isDebugEnabled()) {
456                         log.debug("{}: Next entry to canCommit {}", name, next);
457                     }
458
459                     iter.remove();
460                     currentCohortEntry = next;
461                     currentCohortEntry.updateLastAccessTime();
462                     doCanCommit(currentCohortEntry);
463                 }
464
465                 break;
466             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
467                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
468                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
469
470                 iter.remove();
471                 cohortCache.remove(next.getTransactionID());
472             } else {
473                 break;
474             }
475         }
476     }
477
478     void cleanupExpiredCohortEntries() {
479         maybeProcessNextCohortEntry();
480     }
481
482     @VisibleForTesting
483     void setCohortDecorator(CohortDecorator cohortDecorator) {
484         this.cohortDecorator = cohortDecorator;
485     }
486
487     static class CohortEntry {
488         private final String transactionID;
489         private ShardDataTreeCohort cohort;
490         private final ReadWriteShardDataTreeTransaction transaction;
491         private ActorRef replySender;
492         private Shard shard;
493         private boolean doImmediateCommit;
494         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
495
496         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
497             this.transaction = Preconditions.checkNotNull(transaction);
498             this.transactionID = transactionID;
499         }
500
501         CohortEntry(String transactionID, ShardDataTreeCohort cohort,
502                 MutableCompositeModification compositeModification) {
503             this.transactionID = transactionID;
504             this.cohort = cohort;
505             this.transaction = null;
506         }
507
508         CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
509             this.transactionID = transactionID;
510             this.cohort = cohort;
511             this.transaction = null;
512         }
513
514         void updateLastAccessTime() {
515             lastAccessTimer.reset();
516             lastAccessTimer.start();
517         }
518
519         String getTransactionID() {
520             return transactionID;
521         }
522
523         ShardDataTreeCohort getCohort() {
524             return cohort;
525         }
526
527         void applyModifications(Iterable<Modification> modifications) {
528             for (Modification modification : modifications) {
529                 modification.apply(transaction.getSnapshot());
530             }
531         }
532
533         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
534             Preconditions.checkState(cohort == null, "cohort was already set");
535
536             setDoImmediateCommit(doImmediateCommit);
537
538             cohort = transaction.ready();
539
540             if(cohortDecorator != null) {
541                 // Call the hook for unit tests.
542                 cohort = cohortDecorator.decorate(transactionID, cohort);
543             }
544         }
545
546         boolean isReadyToCommit() {
547             return replySender != null;
548         }
549
550         boolean isExpired(long expireTimeInMillis) {
551             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
552         }
553
554         boolean isDoImmediateCommit() {
555             return doImmediateCommit;
556         }
557
558         void setDoImmediateCommit(boolean doImmediateCommit) {
559             this.doImmediateCommit = doImmediateCommit;
560         }
561
562         ActorRef getReplySender() {
563             return replySender;
564         }
565
566         void setReplySender(ActorRef replySender) {
567             this.replySender = replySender;
568         }
569
570         Shard getShard() {
571             return shard;
572         }
573
574         void setShard(Shard shard) {
575             this.shard = shard;
576         }
577
578         @Override
579         public String toString() {
580             StringBuilder builder = new StringBuilder();
581             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
582                     .append(doImmediateCommit).append("]");
583             return builder.toString();
584         }
585     }
586 }