7d806890448ff783be319bcb66c3086f3d8f2e00
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.base.Stopwatch;
16 import java.util.HashMap;
17 import java.util.Iterator;
18 import java.util.LinkedList;
19 import java.util.Map;
20 import java.util.Queue;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.modification.Modification;
31 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.slf4j.Logger;
34
35 /**
36  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
37  *
38  * @author Thomas Pantelis
39  */
40 class ShardCommitCoordinator {
41
42     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
43     public interface CohortDecorator {
44         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
45     }
46
47     private final Map<String, CohortEntry> cohortCache = new HashMap<>();
48
49     private CohortEntry currentCohortEntry;
50
51     private final ShardDataTree dataTree;
52
53     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
54     // since this should only be accessed on the shard's dispatcher.
55     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
56
57     private int queueCapacity;
58
59     private final Logger log;
60
61     private final String name;
62
63     private final long cacheExpiryTimeoutInMillis;
64
65     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
66     private CohortDecorator cohortDecorator;
67
68     private ReadyTransactionReply readyTransactionReply;
69
70     ShardCommitCoordinator(ShardDataTree dataTree,
71             long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) {
72
73         this.queueCapacity = queueCapacity;
74         this.log = log;
75         this.name = name;
76         this.dataTree = Preconditions.checkNotNull(dataTree);
77         this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
78     }
79
80     void setQueueCapacity(int queueCapacity) {
81         this.queueCapacity = queueCapacity;
82     }
83
84     private ReadyTransactionReply readyTransactionReply(Shard shard) {
85         if(readyTransactionReply == null) {
86             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
87         }
88
89         return readyTransactionReply;
90     }
91
92     private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
93         if(queuedCohortEntries.size() < queueCapacity) {
94             queuedCohortEntries.offer(cohortEntry);
95             return true;
96         } else {
97             cohortCache.remove(cohortEntry.getTransactionID());
98
99             RuntimeException ex = new RuntimeException(
100                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
101                                   " capacity %d has been reached.",
102                                   name, cohortEntry.getTransactionID(), queueCapacity));
103             log.error(ex.getMessage());
104             sender.tell(new Status.Failure(ex), shard.self());
105             return false;
106         }
107     }
108
109     /**
110      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
111      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
112      *
113      * @param ready the ForwardedReadyTransaction message to process
114      * @param sender the sender of the message
115      * @param shard the transaction's shard actor
116      */
117     void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
118         log.debug("{}: Readying transaction {}, client version {}", name,
119                 ready.getTransactionID(), ready.getTxnClientVersion());
120
121         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
122                 (MutableCompositeModification) ready.getModification());
123         cohortCache.put(ready.getTransactionID(), cohortEntry);
124
125         if(!queueCohortEntry(cohortEntry, sender, shard)) {
126             return;
127         }
128
129         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
130             // Return our actor path as we'll handle the three phase commit except if the Tx client
131             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
132             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
133             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
134             ActorRef replyActorPath = shard.self();
135             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
136                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
137                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
138                         ready.getTransactionID()));
139             }
140
141             ReadyTransactionReply readyTransactionReply =
142                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
143                             ready.getTxnClientVersion());
144             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
145                 readyTransactionReply, shard.self());
146         } else {
147             if(ready.isDoImmediateCommit()) {
148                 cohortEntry.setDoImmediateCommit(true);
149                 cohortEntry.setReplySender(sender);
150                 cohortEntry.setShard(shard);
151                 handleCanCommit(cohortEntry);
152             } else {
153                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
154                 // front-end so send back a ReadyTransactionReply with our actor path.
155                 sender.tell(readyTransactionReply(shard), shard.self());
156             }
157         }
158     }
159
160     /**
161      * This method handles a BatchedModifications message for a transaction being prepared directly on the
162      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
163      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
164      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
165      *
166      * @param batched the BatchedModifications message to process
167      * @param sender the sender of the message
168      * @param shard the transaction's shard actor
169      */
170     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
171         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
172         if(cohortEntry == null) {
173             cohortEntry = new CohortEntry(batched.getTransactionID(),
174                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
175                         batched.getTransactionChainID()));
176             cohortCache.put(batched.getTransactionID(), cohortEntry);
177         }
178
179         if(log.isDebugEnabled()) {
180             log.debug("{}: Applying {} batched modifications for Tx {}", name,
181                     batched.getModifications().size(), batched.getTransactionID());
182         }
183
184         cohortEntry.applyModifications(batched.getModifications());
185
186         if(batched.isReady()) {
187             if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
188                 throw new IllegalStateException(String.format(
189                         "The total number of batched messages received %d does not match the number sent %d",
190                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
191             }
192
193             if(!queueCohortEntry(cohortEntry, sender, shard)) {
194                 return;
195             }
196
197             if(log.isDebugEnabled()) {
198                 log.debug("{}: Readying Tx {}, client version {}", name,
199                         batched.getTransactionID(), batched.getVersion());
200             }
201
202             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
203
204             if(batched.isDoCommitOnReady()) {
205                 cohortEntry.setReplySender(sender);
206                 cohortEntry.setShard(shard);
207                 handleCanCommit(cohortEntry);
208             } else {
209                 sender.tell(readyTransactionReply(shard), shard.self());
210             }
211         } else {
212             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
213         }
214     }
215
216     /**
217      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
218      * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
219      *
220      * @param message the ReadyLocalTransaction message to process
221      * @param sender the sender of the message
222      * @param shard the transaction's shard actor
223      */
224     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
225         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
226                 message.getTransactionID());
227         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
228         cohortCache.put(message.getTransactionID(), cohortEntry);
229         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
230
231         if(!queueCohortEntry(cohortEntry, sender, shard)) {
232             return;
233         }
234
235         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
236
237         if (message.isDoCommitOnReady()) {
238             cohortEntry.setReplySender(sender);
239             cohortEntry.setShard(shard);
240             handleCanCommit(cohortEntry);
241         } else {
242             sender.tell(readyTransactionReply(shard), shard.self());
243         }
244     }
245
246     private void handleCanCommit(CohortEntry cohortEntry) {
247         String transactionID = cohortEntry.getTransactionID();
248
249         cohortEntry.updateLastAccessTime();
250
251         if(currentCohortEntry != null) {
252             // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
253             // queue and will get processed after all prior entries complete.
254
255             if(log.isDebugEnabled()) {
256                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
257                         name, currentCohortEntry.getTransactionID(), transactionID);
258             }
259
260             return;
261         }
262
263         // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
264         // it the current entry and proceed with canCommit.
265         // Purposely checking reference equality here.
266         if(queuedCohortEntries.peek() == cohortEntry) {
267             currentCohortEntry = queuedCohortEntries.poll();
268             doCanCommit(currentCohortEntry);
269         } else {
270             if(log.isDebugEnabled()) {
271                 log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
272                         name, queuedCohortEntries.peek().getTransactionID(), transactionID);
273             }
274         }
275     }
276
277     /**
278      * This method handles the canCommit phase for a transaction.
279      *
280      * @param transactionID the ID of the transaction to canCommit
281      * @param sender the actor to which to send the response
282      * @param shard the transaction's shard actor
283      */
284     void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
285         // Lookup the cohort entry that was cached previously (or should have been) by
286         // transactionReady (via the ForwardedReadyTransaction message).
287         final CohortEntry cohortEntry = cohortCache.get(transactionID);
288         if(cohortEntry == null) {
289             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
290             // between canCommit and ready and the entry was expired from the cache.
291             IllegalStateException ex = new IllegalStateException(
292                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
293             log.error(ex.getMessage());
294             sender.tell(new Status.Failure(ex), shard.self());
295             return;
296         }
297
298         cohortEntry.setReplySender(sender);
299         cohortEntry.setShard(shard);
300
301         handleCanCommit(cohortEntry);
302     }
303
304     private void doCanCommit(final CohortEntry cohortEntry) {
305         boolean canCommit = false;
306         try {
307             // We block on the future here so we don't have to worry about possibly accessing our
308             // state on a different thread outside of our dispatcher. Also, the data store
309             // currently uses a same thread executor anyway.
310             canCommit = cohortEntry.getCohort().canCommit().get();
311
312             log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
313
314             if(cohortEntry.isDoImmediateCommit()) {
315                 if(canCommit) {
316                     doCommit(cohortEntry);
317                 } else {
318                     cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
319                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
320                 }
321             } else {
322                 cohortEntry.getReplySender().tell(
323                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
324                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
325             }
326         } catch (Exception e) {
327             log.debug("{}: An exception occurred during canCommit", name, e);
328
329             Throwable failure = e;
330             if(e instanceof ExecutionException) {
331                 failure = e.getCause();
332             }
333
334             cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
335         } finally {
336             if(!canCommit) {
337                 // Remove the entry from the cache now.
338                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
339             }
340         }
341     }
342
343     private boolean doCommit(CohortEntry cohortEntry) {
344         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
345
346         boolean success = false;
347
348         // We perform the preCommit phase here atomically with the commit phase. This is an
349         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
350         // coordination of preCommit across shards in case of failure but preCommit should not
351         // normally fail since we ensure only one concurrent 3-phase commit.
352
353         try {
354             // We block on the future here so we don't have to worry about possibly accessing our
355             // state on a different thread outside of our dispatcher. Also, the data store
356             // currently uses a same thread executor anyway.
357             cohortEntry.getCohort().preCommit().get();
358
359             cohortEntry.getShard().continueCommit(cohortEntry);
360
361             cohortEntry.updateLastAccessTime();
362
363             success = true;
364         } catch (Exception e) {
365             log.error("{} An exception occurred while preCommitting transaction {}",
366                     name, cohortEntry.getTransactionID(), e);
367             cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
368
369             currentTransactionComplete(cohortEntry.getTransactionID(), true);
370         }
371
372         return success;
373     }
374
375     /**
376      * This method handles the preCommit and commit phases for a transaction.
377      *
378      * @param transactionID the ID of the transaction to commit
379      * @param sender the actor to which to send the response
380      * @param shard the transaction's shard actor
381      * @return true if the transaction was successfully prepared, false otherwise.
382      */
383     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
384         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
385         // this transaction.
386         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
387         if(cohortEntry == null) {
388             // We're not the current Tx - the Tx was likely expired b/c it took too long in
389             // between the canCommit and commit messages.
390             IllegalStateException ex = new IllegalStateException(
391                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
392                             name, transactionID));
393             log.error(ex.getMessage());
394             sender.tell(new akka.actor.Status.Failure(ex), shard.self());
395             return false;
396         }
397
398         cohortEntry.setReplySender(sender);
399         return doCommit(cohortEntry);
400     }
401
402     /**
403      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
404      * matches the current entry.
405      *
406      * @param transactionID the ID of the transaction
407      * @return the current CohortEntry or null if the given transaction ID does not match the
408      *         current entry.
409      */
410     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
411         if(isCurrentTransaction(transactionID)) {
412             return currentCohortEntry;
413         }
414
415         return null;
416     }
417
418     public CohortEntry getCurrentCohortEntry() {
419         return currentCohortEntry;
420     }
421
422     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
423         return cohortCache.remove(transactionID);
424     }
425
426     public boolean isCurrentTransaction(String transactionID) {
427         return currentCohortEntry != null &&
428                 currentCohortEntry.getTransactionID().equals(transactionID);
429     }
430
431     /**
432      * This method is called when a transaction is complete, successful or not. If the given
433      * given transaction ID matches the current in-progress transaction, the next cohort entry,
434      * if any, is dequeued and processed.
435      *
436      * @param transactionID the ID of the completed transaction
437      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
438      *        the cache.
439      */
440     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
441         if(removeCohortEntry) {
442             cohortCache.remove(transactionID);
443         }
444
445         if(isCurrentTransaction(transactionID)) {
446             currentCohortEntry = null;
447
448             log.debug("{}: currentTransactionComplete: {}", name, transactionID);
449
450             maybeProcessNextCohortEntry();
451         }
452     }
453
454     private void maybeProcessNextCohortEntry() {
455         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
456         // clean out expired entries.
457         Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
458         while(iter.hasNext()) {
459             CohortEntry next = iter.next();
460             if(next.isReadyToCommit()) {
461                 if(currentCohortEntry == null) {
462                     if(log.isDebugEnabled()) {
463                         log.debug("{}: Next entry to canCommit {}", name, next);
464                     }
465
466                     iter.remove();
467                     currentCohortEntry = next;
468                     currentCohortEntry.updateLastAccessTime();
469                     doCanCommit(currentCohortEntry);
470                 }
471
472                 break;
473             } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
474                 log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
475                         name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
476
477                 iter.remove();
478                 cohortCache.remove(next.getTransactionID());
479             } else {
480                 break;
481             }
482         }
483     }
484
485     void cleanupExpiredCohortEntries() {
486         maybeProcessNextCohortEntry();
487     }
488
489     @VisibleForTesting
490     void setCohortDecorator(CohortDecorator cohortDecorator) {
491         this.cohortDecorator = cohortDecorator;
492     }
493
494     static class CohortEntry {
495         private final String transactionID;
496         private ShardDataTreeCohort cohort;
497         private final ReadWriteShardDataTreeTransaction transaction;
498         private ActorRef replySender;
499         private Shard shard;
500         private boolean doImmediateCommit;
501         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
502         private int totalBatchedModificationsReceived;
503
504         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
505             this.transaction = Preconditions.checkNotNull(transaction);
506             this.transactionID = transactionID;
507         }
508
509         CohortEntry(String transactionID, ShardDataTreeCohort cohort,
510                 MutableCompositeModification compositeModification) {
511             this.transactionID = transactionID;
512             this.cohort = cohort;
513             this.transaction = null;
514         }
515
516         CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
517             this.transactionID = transactionID;
518             this.cohort = cohort;
519             this.transaction = null;
520         }
521
522         void updateLastAccessTime() {
523             lastAccessTimer.reset();
524             lastAccessTimer.start();
525         }
526
527         String getTransactionID() {
528             return transactionID;
529         }
530
531         ShardDataTreeCohort getCohort() {
532             return cohort;
533         }
534
535         int getTotalBatchedModificationsReceived() {
536             return totalBatchedModificationsReceived;
537         }
538
539         void applyModifications(Iterable<Modification> modifications) {
540             for (Modification modification : modifications) {
541                 modification.apply(transaction.getSnapshot());
542             }
543
544             totalBatchedModificationsReceived++;
545         }
546
547         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
548             Preconditions.checkState(cohort == null, "cohort was already set");
549
550             setDoImmediateCommit(doImmediateCommit);
551
552             cohort = transaction.ready();
553
554             if(cohortDecorator != null) {
555                 // Call the hook for unit tests.
556                 cohort = cohortDecorator.decorate(transactionID, cohort);
557             }
558         }
559
560         boolean isReadyToCommit() {
561             return replySender != null;
562         }
563
564         boolean isExpired(long expireTimeInMillis) {
565             return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
566         }
567
568         boolean isDoImmediateCommit() {
569             return doImmediateCommit;
570         }
571
572         void setDoImmediateCommit(boolean doImmediateCommit) {
573             this.doImmediateCommit = doImmediateCommit;
574         }
575
576         ActorRef getReplySender() {
577             return replySender;
578         }
579
580         void setReplySender(ActorRef replySender) {
581             this.replySender = replySender;
582         }
583
584         Shard getShard() {
585             return shard;
586         }
587
588         void setShard(Shard shard) {
589             this.shard = shard;
590         }
591
592         @Override
593         public String toString() {
594             StringBuilder builder = new StringBuilder();
595             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
596                     .append(doImmediateCommit).append("]");
597             return builder.toString();
598         }
599     }
600 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.