6821fa721fe8b30629106f1d14ba666a775d9da6
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.modification.Modification;
31 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.slf4j.Logger;
34
35 /**
36  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
37  *
38  * @author Thomas Pantelis
39  */
40 class ShardCommitCoordinator {
41
42     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
43     public interface CohortDecorator {
44         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
45     }
46
47     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
48
49     private CohortEntry currentCohortEntry;
50
51     private final ShardDataTree dataTree;
52
53     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
54     // since this should only be accessed on the shard's dispatcher.
55     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
56
57     private int queueCapacity;
58
59     private final Logger log;
60
61     private final String name;
62
63     private final long cacheExpiryTimeoutInMillis;
64
65     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
66     private CohortDecorator cohortDecorator;
67
68     private ReadyTransactionReply readyTransactionReply;
69
70     ShardCommitCoordinator(ShardDataTree dataTree,
71             long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) {
72
73         this.queueCapacity = queueCapacity;
74         this.log = log;
75         this.name = name;
76         this.dataTree = Preconditions.checkNotNull(dataTree);
77         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
78     }
79
80     void setQueueCapacity(int queueCapacity) {
81         this.queueCapacity = queueCapacity;
82     }
83
84     private ReadyTransactionReply readyTransactionReply(Shard shard) {
85         if(readyTransactionReply == null) {
86             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
87         }
88
89         return readyTransactionReply;
90     }
91
92     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
93         if(queuedCohortEntries.size() < queueCapacity) {
94             queuedCohortEntries.offer(cohortEntry);
95             return true;
96         } else {
97             cohortCache.remove(cohortEntry.getTransactionID());
98
99             RuntimeException ex = new RuntimeException(
100                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
101                                   " capacity %d has been reached.",
102                                   name, cohortEntry.getTransactionID(), queueCapacity));
103             log.error(ex.getMessage());
104             sender.tell(new Status.Failure(ex), shard.self());
105             return false;
106         }
107     }
108
109     /**
110      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
111      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
112      *
113      * @param ready the ForwardedReadyTransaction message to process
114      * @param sender the sender of the message
115      * @param shard the transaction's shard actor
116      */
117     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
118         log.debug("{}: Readying transaction {}, client version {}", name,
119                 ready.getTransactionID(), ready.getTxnClientVersion());
120
121         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
122                 (MutableCompositeModification) ready.getModification());
123         cohortCache.put(ready.getTransactionID(), cohortEntry);
124
125         if(!queueCohortEntry(cohortEntry, sender, shard)) {
126             return;
127         }
128
129         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
130             // Return our actor path as we'll handle the three phase commit except if the Tx client
131             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
132             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
133             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
134             ActorRef replyActorPath = shard.self();
135             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
136                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
137                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
138                         ready.getTransactionID()));
139             }
140
141             ReadyTransactionReply readyTransactionReply =
142                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
143                             ready.getTxnClientVersion());
144             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
145                 readyTransactionReply, shard.self());
146         } else {
147             if(ready.isDoImmediateCommit()) {
148                 cohortEntry.setDoImmediateCommit(true);
149                 cohortEntry.setReplySender(sender);
150                 cohortEntry.setShard(shard);
151                 handleCanCommit(cohortEntry);
152             } else {
153                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
154                 // front-end so send back a ReadyTransactionReply with our actor path.
155                 sender.tell(readyTransactionReply(shard), shard.self());
156             }
157         }
158     }
159
160     /**
161      * This method handles a BatchedModifications message for a transaction being prepared directly on the
162      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
163      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
164      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
165      *
166      * @param batched the BatchedModifications message to process
167      * @param sender the sender of the message
168      * @param shard the transaction's shard actor
169      */
170     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
171         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
172         if(cohortEntry == null) {
173             cohortEntry = new CohortEntry(batched.getTransactionID(),
174                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
175                         batched.getTransactionChainID()));
176             cohortCache.put(batched.getTransactionID(), cohortEntry);
177         }
178
179         if(log.isDebugEnabled()) {
180             log.debug("{}: Applying {} batched modifications for Tx {}", name,
181                     batched.getModifications().size(), batched.getTransactionID());
182         }
183
184         cohortEntry.applyModifications(batched.getModifications());
185
186         if(batched.isReady()) {
187             if(!queueCohortEntry(cohortEntry, sender, shard)) {
188                 return;
189             }
190
191             if(log.isDebugEnabled()) {
192                 log.debug("{}: Readying Tx {}, client version {}", name,
193                         batched.getTransactionID(), batched.getVersion());
194             }
195
196             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
197
198             if(batched.isDoCommitOnReady()) {
199                 cohortEntry.setReplySender(sender);
200                 cohortEntry.setShard(shard);
201                 handleCanCommit(cohortEntry);
202             } else {
203                 sender.tell(readyTransactionReply(shard), shard.self());
204             }
205         } else {
206             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
207         }
208     }
209
210     /**
211      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
212      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
213      *
214      * @param message the ReadyLocalTransaction message to process
215      * @param sender the sender of the message
216      * @param shard the transaction's shard actor
217      */
218     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
219         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
220                 message.getTransactionID());
221         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
222         cohortCache.put(message.getTransactionID(), cohortEntry);
223         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
224
225         if(!queueCohortEntry(cohortEntry, sender, shard)) {
226             return;
227         }
228
229         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
230
231         if (message.isDoCommitOnReady()) {
232             cohortEntry.setReplySender(sender);
233             cohortEntry.setShard(shard);
234             handleCanCommit(cohortEntry);
235         } else {
236             sender.tell(readyTransactionReply(shard), shard.self());
237         }
238     }
239
240     private void handleCanCommit(CohortEntry cohortEntry) {
241         String transactionID = cohortEntry.getTransactionID();
242
243         cohortEntry.updateLastAccessTime();
244
245         if(currentCohortEntry != null) {
246             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
247             // queue and will get processed after all prior entries complete.
248
249             if(log.isDebugEnabled()) {
250                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
251                         name, currentCohortEntry.getTransactionID(), transactionID);
252             }
253
254             return;
255         }
256
257         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
258         // it the current entry and proceed with canCommit.
259         // Purposely checking reference equality here.
260         if(queuedCohortEntries.peek() == cohortEntry) {
261             currentCohortEntry = queuedCohortEntries.poll();
262             doCanCommit(currentCohortEntry);
263         } else {
264             if(log.isDebugEnabled()) {
265                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
266                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
267             }
268         }
269     }
270
271     /**
272      * This method handles the canCommit phase for a transaction.
273      *
274      * @param transactionID the ID of the transaction to canCommit
275      * @param sender the actor to which to send the response
276      * @param shard the transaction's shard actor
277      */
278     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
279         // Lookup the cohort entry that was cached previously (or should have been) by
280         // transactionReady (via the ForwardedReadyTransaction message).
281         final CohortEntry cohortEntry = cohortCache.get(transactionID);
282         if(cohortEntry == null) {
283             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
284             // between canCommit and ready and the entry was expired from the cache.
285             IllegalStateException ex = new IllegalStateException(
286                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
287             log.error(ex.getMessage());
288             sender.tell(new Status.Failure(ex), shard.self());
289             return;
290         }
291
292         cohortEntry.setReplySender(sender);
293         cohortEntry.setShard(shard);
294
295         handleCanCommit(cohortEntry);
296     }
297
298     private void doCanCommit(final CohortEntry cohortEntry) {
299         boolean canCommit = false;
300         try {
301             // We block on the future here so we don't have to worry about possibly accessing our
302             // state on a different thread outside of our dispatcher. Also, the data store
303             // currently uses a same thread executor anyway.
304             canCommit = cohortEntry.getCohort().canCommit().get();
305
306             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
307
308             if(cohortEntry.isDoImmediateCommit()) {
309                 if(canCommit) {
310                     doCommit(cohortEntry);
311                 } else {
312                     cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
313                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
314                 }
315             } else {
316                 cohortEntry.getReplySender().tell(
317                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
318                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
319             }
320         } catch (Exception e) {
321             log.debug("{}: An exception occurred during canCommit", name, e);
322
323             Throwable failure = e;
324             if(e instanceof ExecutionException) {
325                 failure = e.getCause();
326             }
327
328             cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
329         } finally {
330             if(!canCommit) {
331                 // Remove the entry from the cache now.
332                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
333             }
334         }
335     }
336
337     private boolean doCommit(CohortEntry cohortEntry) {
338         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
339
340         boolean success = false;
341
342         // We perform the preCommit phase here atomically with the commit phase. This is an
343         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
344         // coordination of preCommit across shards in case of failure but preCommit should not
345         // normally fail since we ensure only one concurrent 3-phase commit.
346
347         try {
348             // We block on the future here so we don't have to worry about possibly accessing our
349             // state on a different thread outside of our dispatcher. Also, the data store
350             // currently uses a same thread executor anyway.
351             cohortEntry.getCohort().preCommit().get();
352
353             cohortEntry.getShard().continueCommit(cohortEntry);
354
355             cohortEntry.updateLastAccessTime();
356
357             success = true;
358         } catch (Exception e) {
359             log.error("{} An exception occurred while preCommitting transaction {}",
360                     name, cohortEntry.getTransactionID(), e);
361             cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
362
363             currentTransactionComplete(cohortEntry.getTransactionID(), true);
364         }
365
366         return success;
367     }
368
369     /**
370      * This method handles the preCommit and commit phases for a transaction.
371      *
372      * @param transactionID the ID of the transaction to commit
373      * @param sender the actor to which to send the response
374      * @param shard the transaction's shard actor
375      * @return true if the transaction was successfully prepared, false otherwise.
376      */
377     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
378         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
379         // this transaction.
380         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
381         if(cohortEntry == null) {
382             // We're not the current Tx - the Tx was likely expired b/c it took too long in
383             // between the canCommit and commit messages.
384             IllegalStateException ex = new IllegalStateException(
385                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
386                             name, transactionID));
387             log.error(ex.getMessage());
388             sender.tell(new akka.actor.Status.Failure(ex), shard.self());
389             return false;
390         }
391
392         cohortEntry.setReplySender(sender);
393         return doCommit(cohortEntry);
394     }
395
396     /**
397      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
398      * matches the current entry.
399      *
400      * @param transactionID the ID of the transaction
401      * @return the current CohortEntry or null if the given transaction ID does not match the
402      *         current entry.
403      */
404     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
405         if(isCurrentTransaction(transactionID)) {
406             return currentCohortEntry;
407         }
408
409         return null;
410     }
411
412     public CohortEntry getCurrentCohortEntry() {
413         return currentCohortEntry;
414     }
415
416     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
417         return cohortCache.remove(transactionID);
418     }
419
420     public boolean isCurrentTransaction(String transactionID) {
421         return currentCohortEntry != null &&
422                 currentCohortEntry.getTransactionID().equals(transactionID);
423     }
424
425     /**
426      * This method is called when a transaction is complete, successful or not. If the given
427      * given transaction ID matches the current in-progress transaction, the next cohort entry,
428      * if any, is dequeued and processed.
429      *
430      * @param transactionID the ID of the completed transaction
431      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
432      *        the cache.
433      */
434     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
435         if(removeCohortEntry) {
436             cohortCache.remove(transactionID);
437         }
438
439         if(isCurrentTransaction(transactionID)) {
440             currentCohortEntry = null;
441
442             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
443
444             maybeProcessNextCohortEntry();
445         }
446     }
447
448     private void maybeProcessNextCohortEntry() {
449         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
450         // clean out expired entries.
451         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
452         while(iter.hasNext()) {
453             CohortEntry next = iter.next();
454             if(next.isReadyToCommit()) {
455                 if(currentCohortEntry == null) {
456                     if(log.isDebugEnabled()) {
457                         log.debug("{}: Next entry to canCommit {}", name, next);
458                     }
459
460                     iter.remove();
461                     currentCohortEntry = next;
462                     currentCohortEntry.updateLastAccessTime();
463                     doCanCommit(currentCohortEntry);
464                 }
465
466                 break;
467             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
468                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
469                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
470
471                 iter.remove();
472                 cohortCache.remove(next.getTransactionID());
473             } else {
474                 break;
475             }
476         }
477     }
478
479     void cleanupExpiredCohortEntries() {
480         maybeProcessNextCohortEntry();
481     }
482
483     @VisibleForTesting
484     void setCohortDecorator(CohortDecorator cohortDecorator) {
485         this.cohortDecorator = cohortDecorator;
486     }
487
488     static class CohortEntry {
489         private final String transactionID;
490         private ShardDataTreeCohort cohort;
491         private final ReadWriteShardDataTreeTransaction transaction;
492         private ActorRef replySender;
493         private Shard shard;
494         private boolean doImmediateCommit;
495         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
496
497         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
498             this.transaction = Preconditions.checkNotNull(transaction);
499             this.transactionID = transactionID;
500         }
501
502         CohortEntry(String transactionID, ShardDataTreeCohort cohort,
503                 MutableCompositeModification compositeModification) {
504             this.transactionID = transactionID;
505             this.cohort = cohort;
506             this.transaction = null;
507         }
508
509         CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
510             this.transactionID = transactionID;
511             this.cohort = cohort;
512             this.transaction = null;
513         }
514
515         void updateLastAccessTime() {
516             lastAccessTimer.reset();
517             lastAccessTimer.start();
518         }
519
520         String getTransactionID() {
521             return transactionID;
522         }
523
524         ShardDataTreeCohort getCohort() {
525             return cohort;
526         }
527
528         void applyModifications(Iterable<Modification> modifications) {
529             for (Modification modification : modifications) {
530                 modification.apply(transaction.getSnapshot());
531             }
532         }
533
534         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
535             Preconditions.checkState(cohort == null, "cohort was already set");
536
537             setDoImmediateCommit(doImmediateCommit);
538
539             cohort = transaction.ready();
540
541             if(cohortDecorator != null) {
542                 // Call the hook for unit tests.
543                 cohort = cohortDecorator.decorate(transactionID, cohort);
544             }
545         }
546
547         boolean isReadyToCommit() {
548             return replySender != null;
549         }
550
551         boolean isExpired(long expireTimeInMillis) {
552             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
553         }
554
555         boolean isDoImmediateCommit() {
556             return doImmediateCommit;
557         }
558
559         void setDoImmediateCommit(boolean doImmediateCommit) {
560             this.doImmediateCommit = doImmediateCommit;
561         }
562
563         ActorRef getReplySender() {
564             return replySender;
565         }
566
567         void setReplySender(ActorRef replySender) {
568             this.replySender = replySender;
569         }
570
571         Shard getShard() {
572             return shard;
573         }
574
575         void setShard(Shard shard) {
576             this.shard = shard;
577         }
578
579         @Override
580         public String toString() {
581             StringBuilder builder = new StringBuilder();
582             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
583                     .append(doImmediateCommit).append("]");
584             return builder.toString();
585         }
586     }
587 }