Merge "Use ImmutableNodes.fromInstanceId in restconf"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.cache.Cache;
16 import com.google.common.cache.CacheBuilder;
17 import com.google.common.cache.RemovalCause;
18 import com.google.common.cache.RemovalListener;
19 import com.google.common.cache.RemovalNotification;
20 import java.util.LinkedList;
21 import java.util.Queue;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
26 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.modification.Modification;
31 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.slf4j.Logger;
34
35 /**
36  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
37  *
38  * @author Thomas Pantelis
39  */
40 public class ShardCommitCoordinator {
41
42     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
43     public interface CohortDecorator {
44         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
45     }
46
47     private final Cache<String, CohortEntry> cohortCache;
48
49     private CohortEntry currentCohortEntry;
50
51     private final ShardDataTree dataTree;
52
53     private final Queue<CohortEntry> queuedCohortEntries;
54
55     private int queueCapacity;
56
57     private final Logger log;
58
59     private final String name;
60
61     private final RemovalListener<String, CohortEntry> cacheRemovalListener =
62             new RemovalListener<String, CohortEntry>() {
63                 @Override
64                 public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
65                     if(notification.getCause() == RemovalCause.EXPIRED) {
66                         log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
67                     }
68                 }
69             };
70
71     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
72     private CohortDecorator cohortDecorator;
73
74     private ReadyTransactionReply readyTransactionReply;
75
76     public ShardCommitCoordinator(ShardDataTree dataTree,
77             long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
78
79         this.queueCapacity = queueCapacity;
80         this.log = log;
81         this.name = name;
82         this.dataTree = Preconditions.checkNotNull(dataTree);
83
84         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
85                 removalListener(cacheRemovalListener).build();
86
87         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
88         // since this should only be accessed on the shard's dispatcher.
89         queuedCohortEntries = new LinkedList<>();
90     }
91
92     public void setQueueCapacity(int queueCapacity) {
93         this.queueCapacity = queueCapacity;
94     }
95
96     private ReadyTransactionReply readyTransactionReply(Shard shard) {
97         if(readyTransactionReply == null) {
98             readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(shard.self()));
99         }
100
101         return readyTransactionReply;
102     }
103
104     /**
105      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
106      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
107      */
108     public void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
109         log.debug("{}: Readying transaction {}, client version {}", name,
110                 ready.getTransactionID(), ready.getTxnClientVersion());
111
112         CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), ready.getCohort(),
113                 (MutableCompositeModification) ready.getModification());
114         cohortCache.put(ready.getTransactionID(), cohortEntry);
115
116         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
117             // Return our actor path as we'll handle the three phase commit except if the Tx client
118             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
119             // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
120             // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
121             ActorRef replyActorPath = shard.self();
122             if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
123                 log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
124                 replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
125                         ready.getTransactionID()));
126             }
127
128             ReadyTransactionReply readyTransactionReply =
129                     new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
130                             ready.getTxnClientVersion());
131             sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
132                 readyTransactionReply, shard.self());
133         } else {
134             if(ready.isDoImmediateCommit()) {
135                 cohortEntry.setDoImmediateCommit(true);
136                 cohortEntry.setReplySender(sender);
137                 cohortEntry.setShard(shard);
138                 handleCanCommit(cohortEntry);
139             } else {
140                 // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
141                 // front-end so send back a ReadyTransactionReply with our actor path.
142                 sender.tell(readyTransactionReply(shard), shard.self());
143             }
144         }
145     }
146
147     /**
148      * This method handles a BatchedModifications message for a transaction being prepared directly on the
149      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
150      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
151      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
152      *
153      * @param batched the BatchedModifications
154      * @param shardActor the transaction's shard actor
155      *
156      * @throws ExecutionException if an error occurs loading the cache
157      */
158     boolean handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
159             throws ExecutionException {
160         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
161         if(cohortEntry == null) {
162             cohortEntry = new CohortEntry(batched.getTransactionID(),
163                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
164                         batched.getTransactionChainID()));
165             cohortCache.put(batched.getTransactionID(), cohortEntry);
166         }
167
168         if(log.isDebugEnabled()) {
169             log.debug("{}: Applying {} batched modifications for Tx {}", name,
170                     batched.getModifications().size(), batched.getTransactionID());
171         }
172
173         cohortEntry.applyModifications(batched.getModifications());
174
175         if(batched.isReady()) {
176             if(log.isDebugEnabled()) {
177                 log.debug("{}: Readying Tx {}, client version {}", name,
178                         batched.getTransactionID(), batched.getVersion());
179             }
180
181             cohortEntry.ready(cohortDecorator, batched.isDoCommitOnReady());
182
183             if(batched.isDoCommitOnReady()) {
184                 cohortEntry.setReplySender(sender);
185                 cohortEntry.setShard(shard);
186                 handleCanCommit(cohortEntry);
187             } else {
188                 sender.tell(readyTransactionReply(shard), shard.self());
189             }
190         } else {
191             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
192         }
193
194         return batched.isReady();
195     }
196
197     private void handleCanCommit(CohortEntry cohortEntry) {
198         String transactionID = cohortEntry.getTransactionID();
199
200         if(log.isDebugEnabled()) {
201             log.debug("{}: Processing canCommit for transaction {} for shard {}",
202                     name, transactionID, cohortEntry.getShard().self().path());
203         }
204
205         if(currentCohortEntry != null) {
206             // There's already a Tx commit in progress - attempt to queue this entry to be
207             // committed after the current Tx completes.
208             log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
209                     name, currentCohortEntry.getTransactionID(), transactionID);
210
211             if(queuedCohortEntries.size() < queueCapacity) {
212                 queuedCohortEntries.offer(cohortEntry);
213             } else {
214                 removeCohortEntry(transactionID);
215
216                 RuntimeException ex = new RuntimeException(
217                         String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
218                                       " capacity %d has been reached.",
219                                       name, transactionID, queueCapacity));
220                 log.error(ex.getMessage());
221                 cohortEntry.getReplySender().tell(new Status.Failure(ex), cohortEntry.getShard().self());
222             }
223         } else {
224             // No Tx commit currently in progress - make this the current entry and proceed with
225             // canCommit.
226             cohortEntry.updateLastAccessTime();
227             currentCohortEntry = cohortEntry;
228
229             doCanCommit(cohortEntry);
230         }
231     }
232
233     /**
234      * This method handles the canCommit phase for a transaction.
235      *
236      * @param canCommit the CanCommitTransaction message
237      * @param sender the actor that sent the message
238      * @param shard the transaction's shard actor
239      */
240     public void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
241         // Lookup the cohort entry that was cached previously (or should have been) by
242         // transactionReady (via the ForwardedReadyTransaction message).
243         final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
244         if(cohortEntry == null) {
245             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
246             // between canCommit and ready and the entry was expired from the cache.
247             IllegalStateException ex = new IllegalStateException(
248                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
249             log.error(ex.getMessage());
250             sender.tell(new Status.Failure(ex), shard.self());
251             return;
252         }
253
254         cohortEntry.setReplySender(sender);
255         cohortEntry.setShard(shard);
256
257         handleCanCommit(cohortEntry);
258     }
259
260     private void doCanCommit(final CohortEntry cohortEntry) {
261
262         boolean canCommit = false;
263         try {
264             // We block on the future here so we don't have to worry about possibly accessing our
265             // state on a different thread outside of our dispatcher. Also, the data store
266             // currently uses a same thread executor anyway.
267             canCommit = cohortEntry.getCohort().canCommit().get();
268
269             if(cohortEntry.isDoImmediateCommit()) {
270                 if(canCommit) {
271                     doCommit(cohortEntry);
272                 } else {
273                     cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
274                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
275                 }
276             } else {
277                 cohortEntry.getReplySender().tell(
278                         canCommit ? CanCommitTransactionReply.YES.toSerializable() :
279                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
280             }
281         } catch (Exception e) {
282             log.debug("{}: An exception occurred during canCommit: {}", name, e);
283
284             Throwable failure = e;
285             if(e instanceof ExecutionException) {
286                 failure = e.getCause();
287             }
288
289             cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
290         } finally {
291             if(!canCommit) {
292                 // Remove the entry from the cache now.
293                 currentTransactionComplete(cohortEntry.getTransactionID(), true);
294             }
295         }
296     }
297
298     private boolean doCommit(CohortEntry cohortEntry) {
299         log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
300
301         boolean success = false;
302
303         // We perform the preCommit phase here atomically with the commit phase. This is an
304         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
305         // coordination of preCommit across shards in case of failure but preCommit should not
306         // normally fail since we ensure only one concurrent 3-phase commit.
307
308         try {
309             // We block on the future here so we don't have to worry about possibly accessing our
310             // state on a different thread outside of our dispatcher. Also, the data store
311             // currently uses a same thread executor anyway.
312             cohortEntry.getCohort().preCommit().get();
313
314             cohortEntry.getShard().continueCommit(cohortEntry);
315
316             cohortEntry.updateLastAccessTime();
317
318             success = true;
319         } catch (Exception e) {
320             log.error("{} An exception occurred while preCommitting transaction {}",
321                     name, cohortEntry.getTransactionID(), e);
322             cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
323
324             currentTransactionComplete(cohortEntry.getTransactionID(), true);
325         }
326
327         return success;
328     }
329
330     boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
331         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
332         // this transaction.
333         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
334         if(cohortEntry == null) {
335             // We're not the current Tx - the Tx was likely expired b/c it took too long in
336             // between the canCommit and commit messages.
337             IllegalStateException ex = new IllegalStateException(
338                     String.format("%s: Cannot commit transaction %s - it is not the current transaction",
339                             name, transactionID));
340             log.error(ex.getMessage());
341             sender.tell(new akka.actor.Status.Failure(ex), shard.self());
342             return false;
343         }
344
345         return doCommit(cohortEntry);
346     }
347
348     /**
349      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
350      * matches the current entry.
351      *
352      * @param transactionID the ID of the transaction
353      * @return the current CohortEntry or null if the given transaction ID does not match the
354      *         current entry.
355      */
356     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
357         if(isCurrentTransaction(transactionID)) {
358             return currentCohortEntry;
359         }
360
361         return null;
362     }
363
364     public CohortEntry getCurrentCohortEntry() {
365         return currentCohortEntry;
366     }
367
368     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
369         CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
370         cohortCache.invalidate(transactionID);
371         return cohortEntry;
372     }
373
374     public void removeCohortEntry(String transactionID) {
375         cohortCache.invalidate(transactionID);
376     }
377
378     public boolean isCurrentTransaction(String transactionID) {
379         return currentCohortEntry != null &&
380                 currentCohortEntry.getTransactionID().equals(transactionID);
381     }
382
383     /**
384      * This method is called when a transaction is complete, successful or not. If the given
385      * given transaction ID matches the current in-progress transaction, the next cohort entry,
386      * if any, is dequeued and processed.
387      *
388      * @param transactionID the ID of the completed transaction
389      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
390      *        the cache.
391      */
392     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
393         if(removeCohortEntry) {
394             removeCohortEntry(transactionID);
395         }
396
397         if(isCurrentTransaction(transactionID)) {
398             // Dequeue the next cohort entry waiting in the queue.
399             currentCohortEntry = queuedCohortEntries.poll();
400             if(currentCohortEntry != null) {
401                 currentCohortEntry.updateLastAccessTime();
402                 doCanCommit(currentCohortEntry);
403             }
404         }
405     }
406
407     @VisibleForTesting
408     void setCohortDecorator(CohortDecorator cohortDecorator) {
409         this.cohortDecorator = cohortDecorator;
410     }
411
412
413     static class CohortEntry {
414         private final String transactionID;
415         private ShardDataTreeCohort cohort;
416         private final ReadWriteShardDataTreeTransaction transaction;
417         private ActorRef replySender;
418         private Shard shard;
419         private long lastAccessTime;
420         private boolean doImmediateCommit;
421
422         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
423             this.transaction = Preconditions.checkNotNull(transaction);
424             this.transactionID = transactionID;
425         }
426
427         CohortEntry(String transactionID, ShardDataTreeCohort cohort,
428                 MutableCompositeModification compositeModification) {
429             this.transactionID = transactionID;
430             this.cohort = cohort;
431             this.transaction = null;
432         }
433
434         void updateLastAccessTime() {
435             lastAccessTime = System.currentTimeMillis();
436         }
437
438         long getLastAccessTime() {
439             return lastAccessTime;
440         }
441
442         String getTransactionID() {
443             return transactionID;
444         }
445
446         ShardDataTreeCohort getCohort() {
447             return cohort;
448         }
449
450         void applyModifications(Iterable<Modification> modifications) {
451             for (Modification modification : modifications) {
452                 modification.apply(transaction.getSnapshot());
453             }
454         }
455
456         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
457             Preconditions.checkState(cohort == null, "cohort was already set");
458
459             setDoImmediateCommit(doImmediateCommit);
460
461             cohort = transaction.ready();
462
463             if(cohortDecorator != null) {
464                 // Call the hook for unit tests.
465                 cohort = cohortDecorator.decorate(transactionID, cohort);
466             }
467         }
468
469         boolean isDoImmediateCommit() {
470             return doImmediateCommit;
471         }
472
473         void setDoImmediateCommit(boolean doImmediateCommit) {
474             this.doImmediateCommit = doImmediateCommit;
475         }
476
477         ActorRef getReplySender() {
478             return replySender;
479         }
480
481         void setReplySender(ActorRef replySender) {
482             this.replySender = replySender;
483         }
484
485         Shard getShard() {
486             return shard;
487         }
488
489         void setShard(Shard shard) {
490             this.shard = shard;
491         }
492     }
493 }