Bug 3206: CDS - issues with direct commit
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.modification.Modification;
31 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.slf4j.Logger;
34
35 /**
36  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
37  *
38  * @author Thomas Pantelis
39  */
40 public class ShardCommitCoordinator {
41
42     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
43     public interface CohortDecorator {
44         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
45     }
46
47     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
48
49     private CohortEntry currentCohortEntry;
50
51     private final ShardDataTree dataTree;
52
53     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
54     // since this should only be accessed on the shard's dispatcher.
55     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
56
57     private int queueCapacity;
58
59     private final Logger log;
60
61     private final String name;
62
63     private final long cacheExpiryTimeoutInMillis;
64
65     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
66     private CohortDecorator cohortDecorator;
67
68     private ReadyTransactionReply readyTransactionReply;
69
70     public ShardCommitCoordinator(ShardDataTree dataTree,
71             long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) {
72
73         this.queueCapacity = queueCapacity;
74         this.log = log;
75         this.name = name;
76         this.dataTree = Preconditions.checkNotNull(dataTree);
77         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
78     }
79
80     public void setQueueCapacity(int queueCapacity) {
81         this.queueCapacity = queueCapacity;
82     }
83
84     private ReadyTransactionReply readyTransactionReply(Shard shard) {
85         if(readyTransactionReply == null) {
86             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
87         }
88
89         return readyTransactionReply;
90     }
91
92     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
93         if(queuedCohortEntries.size() < queueCapacity) {
94             queuedCohortEntries.offer(cohortEntry);
95             return true;
96         } else {
97             cohortCache.remove(cohortEntry.getTransactionID());
98
99             RuntimeException ex = new RuntimeException(
100                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
101                                   " capacity %d has been reached.",
102                                   name, cohortEntry.getTransactionID(), queueCapacity));
103             log.error(ex.getMessage());
104             sender.tell(new Status.Failure(ex), shard.self());
105             return false;
106         }
107     }
108
109     /**
110      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
111      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
112      */
113     public void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
114         log.debug("{}: Readying transaction {}, client version {}", name,
115                 ready.getTransactionID(), ready.getTxnClientVersion());
116
117         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
118                 (MutableCompositeModification) ready.getModification());
119         cohortCache.put(ready.getTransactionID(), cohortEntry);
120
121         if(!queueCohortEntry(cohortEntry, sender, shard)) {
122             return;
123         }
124
125         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
126             // Return our actor path as we'll handle the three phase commit except if the Tx client
127             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
128             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
129             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
130             ActorRef replyActorPath = shard.self();
131             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
132                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
133                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
134                         ready.getTransactionID()));
135             }
136
137             ReadyTransactionReply readyTransactionReply =
138                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
139                             ready.getTxnClientVersion());
140             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
141                 readyTransactionReply, shard.self());
142         } else {
143             if(ready.isDoImmediateCommit()) {
144                 cohortEntry.setDoImmediateCommit(true);
145                 cohortEntry.setReplySender(sender);
146                 cohortEntry.setShard(shard);
147                 handleCanCommit(cohortEntry);
148             } else {
149                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
150                 // front-end so send back a ReadyTransactionReply with our actor path.
151                 sender.tell(readyTransactionReply(shard), shard.self());
152             }
153         }
154     }
155
156     /**
157      * This method handles a BatchedModifications message for a transaction being prepared directly on the
158      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
159      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
160      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
161      *
162      * @param batched the BatchedModifications
163      * @param shardActor the transaction's shard actor
164      *
165      * @throws ExecutionException if an error occurs loading the cache
166      */
167     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
168             throws ExecutionException {
169         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
170         if(cohortEntry == null) {
171             cohortEntry = new CohortEntry(batched.getTransactionID(),
172                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
173                         batched.getTransactionChainID()));
174             cohortCache.put(batched.getTransactionID(), cohortEntry);
175         }
176
177         if(log.isDebugEnabled()) {
178             log.debug("{}: Applying {} batched modifications for Tx {}", name,
179                     batched.getModifications().size(), batched.getTransactionID());
180         }
181
182         cohortEntry.applyModifications(batched.getModifications());
183
184         if(batched.isReady()) {
185             if(!queueCohortEntry(cohortEntry, sender, shard)) {
186                 return;
187             }
188
189             if(log.isDebugEnabled()) {
190                 log.debug("{}: Readying Tx {}, client version {}", name,
191                         batched.getTransactionID(), batched.getVersion());
192             }
193
194             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
195
196             if(batched.isDoCommitOnReady()) {
197                 cohortEntry.setReplySender(sender);
198                 cohortEntry.setShard(shard);
199                 handleCanCommit(cohortEntry);
200             } else {
201                 sender.tell(readyTransactionReply(shard), shard.self());
202             }
203         } else {
204             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
205         }
206     }
207
208     /**
209      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
210      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
211      *
212      * @param message
213      * @param sender
214      * @param shard
215      */
216     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
217         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification());
218         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
219         cohortCache.put(message.getTransactionID(), cohortEntry);
220         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
221
222         if(!queueCohortEntry(cohortEntry, sender, shard)) {
223             return;
224         }
225
226         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
227
228         if (message.isDoCommitOnReady()) {
229             cohortEntry.setReplySender(sender);
230             cohortEntry.setShard(shard);
231             handleCanCommit(cohortEntry);
232         } else {
233             sender.tell(readyTransactionReply(shard), shard.self());
234         }
235     }
236
237     private void handleCanCommit(CohortEntry cohortEntry) {
238         String transactionID = cohortEntry.getTransactionID();
239
240         cohortEntry.updateLastAccessTime();
241
242         if(currentCohortEntry != null) {
243             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
244             // queue and will get processed after all prior entries complete.
245
246             if(log.isDebugEnabled()) {
247                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
248                         name, currentCohortEntry.getTransactionID(), transactionID);
249             }
250
251             return;
252         }
253
254         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
255         // it the current entry and proceed with canCommit.
256         // Purposely checking reference equality here.
257         if(queuedCohortEntries.peek() == cohortEntry) {
258             currentCohortEntry = queuedCohortEntries.poll();
259             doCanCommit(currentCohortEntry);
260         } else {
261             if(log.isDebugEnabled()) {
262                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
263                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
264             }
265         }
266     }
267
268     /**
269      * This method handles the canCommit phase for a transaction.
270      *
271      * @param canCommit the CanCommitTransaction message
272      * @param sender the actor that sent the message
273      * @param shard the transaction's shard actor
274      */
275     public void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
276         // Lookup the cohort entry that was cached previously (or should have been) by
277         // transactionReady (via the ForwardedReadyTransaction message).
278         final CohortEntry cohortEntry = cohortCache.get(transactionID);
279         if(cohortEntry == null) {
280             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
281             // between canCommit and ready and the entry was expired from the cache.
282             IllegalStateException ex = new IllegalStateException(
283                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
284             log.error(ex.getMessage());
285             sender.tell(new Status.Failure(ex), shard.self());
286             return;
287         }
288
289         cohortEntry.setReplySender(sender);
290         cohortEntry.setShard(shard);
291
292         handleCanCommit(cohortEntry);
293     }
294
295     private void doCanCommit(final CohortEntry cohortEntry) {
296         boolean canCommit = false;
297         try {
298             // We block on the future here so we don't have to worry about possibly accessing our
299             // state on a different thread outside of our dispatcher. Also, the data store
300             // currently uses a same thread executor anyway.
301             canCommit = cohortEntry.getCohort().canCommit().get();
302
303             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
304
305             if(cohortEntry.isDoImmediateCommit()) {
306                 if(canCommit) {
307                     doCommit(cohortEntry);
308                 } else {
309                     cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
310                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
311                 }
312             } else {
313                 cohortEntry.getReplySender().tell(
314                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
315                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
316             }
317         } catch (Exception e) {
318             log.debug("{}: An exception occurred during canCommit", name, e);
319
320             Throwable failure = e;
321             if(e instanceof ExecutionException) {
322                 failure = e.getCause();
323             }
324
325             cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
326         } finally {
327             if(!canCommit) {
328                 // Remove the entry from the cache now.
329                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
330             }
331         }
332     }
333
334     private boolean doCommit(CohortEntry cohortEntry) {
335         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
336
337         boolean success = false;
338
339         // We perform the preCommit phase here atomically with the commit phase. This is an
340         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
341         // coordination of preCommit across shards in case of failure but preCommit should not
342         // normally fail since we ensure only one concurrent 3-phase commit.
343
344         try {
345             // We block on the future here so we don't have to worry about possibly accessing our
346             // state on a different thread outside of our dispatcher. Also, the data store
347             // currently uses a same thread executor anyway.
348             cohortEntry.getCohort().preCommit().get();
349
350             cohortEntry.getShard().continueCommit(cohortEntry);
351
352             cohortEntry.updateLastAccessTime();
353
354             success = true;
355         } catch (Exception e) {
356             log.error("{} An exception occurred while preCommitting transaction {}",
357                     name, cohortEntry.getTransactionID(), e);
358             cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
359
360             currentTransactionComplete(cohortEntry.getTransactionID(), true);
361         }
362
363         return success;
364     }
365
366     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
367         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
368         // this transaction.
369         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
370         if(cohortEntry == null) {
371             // We're not the current Tx - the Tx was likely expired b/c it took too long in
372             // between the canCommit and commit messages.
373             IllegalStateException ex = new IllegalStateException(
374                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
375                             name, transactionID));
376             log.error(ex.getMessage());
377             sender.tell(new akka.actor.Status.Failure(ex), shard.self());
378             return false;
379         }
380
381         cohortEntry.setReplySender(sender);
382         return doCommit(cohortEntry);
383     }
384
385     /**
386      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
387      * matches the current entry.
388      *
389      * @param transactionID the ID of the transaction
390      * @return the current CohortEntry or null if the given transaction ID does not match the
391      *         current entry.
392      */
393     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
394         if(isCurrentTransaction(transactionID)) {
395             return currentCohortEntry;
396         }
397
398         return null;
399     }
400
401     public CohortEntry getCurrentCohortEntry() {
402         return currentCohortEntry;
403     }
404
405     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
406         return cohortCache.remove(transactionID);
407     }
408
409     public boolean isCurrentTransaction(String transactionID) {
410         return currentCohortEntry != null &&
411                 currentCohortEntry.getTransactionID().equals(transactionID);
412     }
413
414     /**
415      * This method is called when a transaction is complete, successful or not. If the given
416      * given transaction ID matches the current in-progress transaction, the next cohort entry,
417      * if any, is dequeued and processed.
418      *
419      * @param transactionID the ID of the completed transaction
420      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
421      *        the cache.
422      */
423     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
424         if(removeCohortEntry) {
425             cohortCache.remove(transactionID);
426         }
427
428         if(isCurrentTransaction(transactionID)) {
429             currentCohortEntry = null;
430
431             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
432
433             maybeProcessNextCohortEntry();
434         }
435     }
436
437     private void maybeProcessNextCohortEntry() {
438         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
439         // clean out expired entries.
440         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
441         while(iter.hasNext()) {
442             CohortEntry next = iter.next();
443             if(next.isReadyToCommit()) {
444                 if(currentCohortEntry == null) {
445                     if(log.isDebugEnabled()) {
446                         log.debug("{}: Next entry to canCommit {}", name, next);
447                     }
448
449                     iter.remove();
450                     currentCohortEntry = next;
451                     currentCohortEntry.updateLastAccessTime();
452                     doCanCommit(currentCohortEntry);
453                 }
454
455                 break;
456             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
457                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
458                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
459
460                 iter.remove();
461                 cohortCache.remove(next.getTransactionID());
462             } else {
463                 break;
464             }
465         }
466     }
467
468     void cleanupExpiredCohortEntries() {
469         maybeProcessNextCohortEntry();
470     }
471
472     @VisibleForTesting
473     void setCohortDecorator(CohortDecorator cohortDecorator) {
474         this.cohortDecorator = cohortDecorator;
475     }
476
477     static class CohortEntry {
478         private final String transactionID;
479         private ShardDataTreeCohort cohort;
480         private final ReadWriteShardDataTreeTransaction transaction;
481         private ActorRef replySender;
482         private Shard shard;
483         private boolean doImmediateCommit;
484         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
485
486         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
487             this.transaction = Preconditions.checkNotNull(transaction);
488             this.transactionID = transactionID;
489         }
490
491         CohortEntry(String transactionID, ShardDataTreeCohort cohort,
492                 MutableCompositeModification compositeModification) {
493             this.transactionID = transactionID;
494             this.cohort = cohort;
495             this.transaction = null;
496         }
497
498         CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
499             this.transactionID = transactionID;
500             this.cohort = cohort;
501             this.transaction = null;
502         }
503
504         void updateLastAccessTime() {
505             lastAccessTimer.reset();
506             lastAccessTimer.start();
507         }
508
509         String getTransactionID() {
510             return transactionID;
511         }
512
513         ShardDataTreeCohort getCohort() {
514             return cohort;
515         }
516
517         void applyModifications(Iterable<Modification> modifications) {
518             for (Modification modification : modifications) {
519                 modification.apply(transaction.getSnapshot());
520             }
521         }
522
523         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
524             Preconditions.checkState(cohort == null, "cohort was already set");
525
526             setDoImmediateCommit(doImmediateCommit);
527
528             cohort = transaction.ready();
529
530             if(cohortDecorator != null) {
531                 // Call the hook for unit tests.
532                 cohort = cohortDecorator.decorate(transactionID, cohort);
533             }
534         }
535
536         boolean isReadyToCommit() {
537             return replySender != null;
538         }
539
540         boolean isExpired(long expireTimeInMillis) {
541             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
542         }
543
544         boolean isDoImmediateCommit() {
545             return doImmediateCommit;
546         }
547
548         void setDoImmediateCommit(boolean doImmediateCommit) {
549             this.doImmediateCommit = doImmediateCommit;
550         }
551
552         ActorRef getReplySender() {
553             return replySender;
554         }
555
556         void setReplySender(ActorRef replySender) {
557             this.replySender = replySender;
558         }
559
560         Shard getShard() {
561             return shard;
562         }
563
564         void setShard(Shard shard) {
565             this.shard = shard;
566         }
567
568         @Override
569         public String toString() {
570             StringBuilder builder = new StringBuilder();
571             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
572                     .append(doImmediateCommit).append("]");
573             return builder.toString();
574         }
575     }
576 }