Merge "Remove deprecation suppression"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.cache.Cache;
16 import com.google.common.cache.CacheBuilder;
17 import com.google.common.cache.RemovalCause;
18 import com.google.common.cache.RemovalListener;
19 import com.google.common.cache.RemovalNotification;
20 import java.util.LinkedList;
21 import java.util.Queue;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
31 import org.slf4j.Logger;
32
33 /**
34  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
35  *
36  * @author Thomas Pantelis
37  */
38 public class ShardCommitCoordinator {
39
40     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
41     public interface CohortDecorator {
42         DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
43     }
44
45     private final Cache<String, CohortEntry> cohortCache;
46
47     private CohortEntry currentCohortEntry;
48
49     private final DOMTransactionFactory transactionFactory;
50
51     private final Queue<CohortEntry> queuedCohortEntries;
52
53     private int queueCapacity;
54
55     private final Logger log;
56
57     private final String name;
58
59     private final String shardActorPath;
60
61     private final RemovalListener<String, CohortEntry> cacheRemovalListener =
62             new RemovalListener<String, CohortEntry>() {
63                 @Override
64                 public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
65                     if(notification.getCause() == RemovalCause.EXPIRED) {
66                         log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
67                     }
68                 }
69             };
70
71     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
72     private CohortDecorator cohortDecorator;
73
74     public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
75             long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
76
77         this.queueCapacity = queueCapacity;
78         this.log = log;
79         this.name = name;
80         this.transactionFactory = transactionFactory;
81
82         shardActorPath = Serialization.serializedActorPath(shardActor);
83
84         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
85                 removalListener(cacheRemovalListener).build();
86
87         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
88         // since this should only be accessed on the shard's dispatcher.
89         queuedCohortEntries = new LinkedList<>();
90     }
91
92     public void setQueueCapacity(int queueCapacity) {
93         this.queueCapacity = queueCapacity;
94     }
95
96     /**
97      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
98      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
99      *
100      * @param transactionID the ID of the transaction
101      * @param cohort the cohort to participate in the transaction commit
102      * @param modification the modifications made by the transaction
103      */
104     public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
105             MutableCompositeModification modification) {
106
107         cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
108     }
109
110     /**
111      * This method handles a BatchedModifications message for a transaction being prepared directly on the
112      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
113      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
114      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
115      *
116      * @param batched the BatchedModifications
117      * @param shardActor the transaction's shard actor
118      *
119      * @throws ExecutionException if an error occurs loading the cache
120      */
121     public boolean handleTransactionModifications(BatchedModifications batched)
122             throws ExecutionException {
123         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
124         if(cohortEntry == null) {
125             cohortEntry = new CohortEntry(batched.getTransactionID(),
126                     transactionFactory.<DOMStoreWriteTransaction>newTransaction(
127                         TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
128                         batched.getTransactionChainID()));
129             cohortCache.put(batched.getTransactionID(), cohortEntry);
130         }
131
132         if(log.isDebugEnabled()) {
133             log.debug("{}: Applying {} batched modifications for Tx {}", name,
134                     batched.getModifications().size(), batched.getTransactionID());
135         }
136
137         cohortEntry.applyModifications(batched.getModifications());
138
139         if(batched.isReady()) {
140             if(log.isDebugEnabled()) {
141                 log.debug("{}: Readying Tx {}, client version {}", name,
142                         batched.getTransactionID(), batched.getVersion());
143             }
144
145             cohortEntry.ready(cohortDecorator);
146         }
147
148         return batched.isReady();
149     }
150
151     /**
152      * This method handles the canCommit phase for a transaction.
153      *
154      * @param canCommit the CanCommitTransaction message
155      * @param sender the actor that sent the message
156      * @param shard the transaction's shard actor
157      */
158     public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
159             final ActorRef shard) {
160         String transactionID = canCommit.getTransactionID();
161         if(log.isDebugEnabled()) {
162             log.debug("{}: Processing canCommit for transaction {} for shard {}",
163                     name, transactionID, shard.path());
164         }
165
166         // Lookup the cohort entry that was cached previously (or should have been) by
167         // transactionReady (via the ForwardedReadyTransaction message).
168         final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
169         if(cohortEntry == null) {
170             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
171             // between canCommit and ready and the entry was expired from the cache.
172             IllegalStateException ex = new IllegalStateException(
173                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
174             log.error(ex.getMessage());
175             sender.tell(new Status.Failure(ex), shard);
176             return;
177         }
178
179         cohortEntry.setCanCommitSender(sender);
180         cohortEntry.setShard(shard);
181
182         if(currentCohortEntry != null) {
183             // There's already a Tx commit in progress - attempt to queue this entry to be
184             // committed after the current Tx completes.
185             log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
186                     name, currentCohortEntry.getTransactionID(), transactionID);
187
188             if(queuedCohortEntries.size() < queueCapacity) {
189                 queuedCohortEntries.offer(cohortEntry);
190             } else {
191                 removeCohortEntry(transactionID);
192
193                 RuntimeException ex = new RuntimeException(
194                         String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
195                                       " capacity %d has been reached.",
196                                       name, transactionID, queueCapacity));
197                 log.error(ex.getMessage());
198                 sender.tell(new Status.Failure(ex), shard);
199             }
200         } else {
201             // No Tx commit currently in progress - make this the current entry and proceed with
202             // canCommit.
203             cohortEntry.updateLastAccessTime();
204             currentCohortEntry = cohortEntry;
205
206             doCanCommit(cohortEntry);
207         }
208     }
209
210     private void doCanCommit(final CohortEntry cohortEntry) {
211
212         try {
213             // We block on the future here so we don't have to worry about possibly accessing our
214             // state on a different thread outside of our dispatcher. Also, the data store
215             // currently uses a same thread executor anyway.
216             Boolean canCommit = cohortEntry.getCohort().canCommit().get();
217
218             cohortEntry.getCanCommitSender().tell(
219                     canCommit ? CanCommitTransactionReply.YES.toSerializable() :
220                         CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
221
222             if(!canCommit) {
223                 // Remove the entry from the cache now since the Tx will be aborted.
224                 removeCohortEntry(cohortEntry.getTransactionID());
225             }
226         } catch (InterruptedException | ExecutionException e) {
227             log.debug("{}: An exception occurred during canCommit: {}", name, e);
228
229             // Remove the entry from the cache now since the Tx will be aborted.
230             removeCohortEntry(cohortEntry.getTransactionID());
231             cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
232         }
233     }
234
235     /**
236      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
237      * matches the current entry.
238      *
239      * @param transactionID the ID of the transaction
240      * @return the current CohortEntry or null if the given transaction ID does not match the
241      *         current entry.
242      */
243     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
244         if(isCurrentTransaction(transactionID)) {
245             return currentCohortEntry;
246         }
247
248         return null;
249     }
250
251     public CohortEntry getCurrentCohortEntry() {
252         return currentCohortEntry;
253     }
254
255     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
256         CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
257         cohortCache.invalidate(transactionID);
258         return cohortEntry;
259     }
260
261     public void removeCohortEntry(String transactionID) {
262         cohortCache.invalidate(transactionID);
263     }
264
265     public boolean isCurrentTransaction(String transactionID) {
266         return currentCohortEntry != null &&
267                 currentCohortEntry.getTransactionID().equals(transactionID);
268     }
269
270     /**
271      * This method is called when a transaction is complete, successful or not. If the given
272      * given transaction ID matches the current in-progress transaction, the next cohort entry,
273      * if any, is dequeued and processed.
274      *
275      * @param transactionID the ID of the completed transaction
276      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
277      *        the cache.
278      */
279     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
280         if(removeCohortEntry) {
281             removeCohortEntry(transactionID);
282         }
283
284         if(isCurrentTransaction(transactionID)) {
285             // Dequeue the next cohort entry waiting in the queue.
286             currentCohortEntry = queuedCohortEntries.poll();
287             if(currentCohortEntry != null) {
288                 currentCohortEntry.updateLastAccessTime();
289                 doCanCommit(currentCohortEntry);
290             }
291         }
292     }
293
294     @VisibleForTesting
295     void setCohortDecorator(CohortDecorator cohortDecorator) {
296         this.cohortDecorator = cohortDecorator;
297     }
298
299
300     static class CohortEntry {
301         private final String transactionID;
302         private DOMStoreThreePhaseCommitCohort cohort;
303         private final MutableCompositeModification compositeModification;
304         private final DOMStoreWriteTransaction transaction;
305         private ActorRef canCommitSender;
306         private ActorRef shard;
307         private long lastAccessTime;
308
309         CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
310             this.compositeModification = new MutableCompositeModification();
311             this.transaction = transaction;
312             this.transactionID = transactionID;
313         }
314
315         CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
316                 MutableCompositeModification compositeModification) {
317             this.transactionID = transactionID;
318             this.cohort = cohort;
319             this.compositeModification = compositeModification;
320             this.transaction = null;
321         }
322
323         void updateLastAccessTime() {
324             lastAccessTime = System.currentTimeMillis();
325         }
326
327         long getLastAccessTime() {
328             return lastAccessTime;
329         }
330
331         String getTransactionID() {
332             return transactionID;
333         }
334
335         DOMStoreThreePhaseCommitCohort getCohort() {
336             return cohort;
337         }
338
339         MutableCompositeModification getModification() {
340             return compositeModification;
341         }
342
343         void applyModifications(Iterable<Modification> modifications) {
344             for(Modification modification: modifications) {
345                 compositeModification.addModification(modification);
346                 modification.apply(transaction);
347             }
348         }
349
350         void ready(CohortDecorator cohortDecorator) {
351             Preconditions.checkState(cohort == null, "cohort was already set");
352
353             cohort = transaction.ready();
354
355             if(cohortDecorator != null) {
356                 // Call the hook for unit tests.
357                 cohort = cohortDecorator.decorate(transactionID, cohort);
358             }
359         }
360
361         ActorRef getCanCommitSender() {
362             return canCommitSender;
363         }
364
365         void setCanCommitSender(ActorRef canCommitSender) {
366             this.canCommitSender = canCommitSender;
367         }
368
369         ActorRef getShard() {
370             return shard;
371         }
372
373         void setShard(ActorRef shard) {
374             this.shard = shard;
375         }
376
377         boolean hasModifications(){
378             return compositeModification.getModifications().size() > 0;
379         }
380     }
381 }