Merge "Do not use ActorSystem.actorFor as it is deprecated"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.cache.Cache;
16 import com.google.common.cache.CacheBuilder;
17 import com.google.common.cache.RemovalCause;
18 import com.google.common.cache.RemovalListener;
19 import com.google.common.cache.RemovalNotification;
20 import java.util.LinkedList;
21 import java.util.Queue;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.modification.Modification;
29 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.slf4j.Logger;
33
34 /**
35  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
36  *
37  * @author Thomas Pantelis
38  */
39 public class ShardCommitCoordinator {
40
41     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
42     public interface CohortDecorator {
43         DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
44     }
45
46     private final Cache<String, CohortEntry> cohortCache;
47
48     private CohortEntry currentCohortEntry;
49
50     private final DOMTransactionFactory transactionFactory;
51
52     private final Queue<CohortEntry> queuedCohortEntries;
53
54     private int queueCapacity;
55
56     private final Logger log;
57
58     private final String name;
59
60     private final String shardActorPath;
61
62     private final RemovalListener<String, CohortEntry> cacheRemovalListener =
63             new RemovalListener<String, CohortEntry>() {
64                 @Override
65                 public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
66                     if(notification.getCause() == RemovalCause.EXPIRED) {
67                         log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
68                     }
69                 }
70             };
71
72     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
73     private CohortDecorator cohortDecorator;
74
75     public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
76             long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
77
78         this.queueCapacity = queueCapacity;
79         this.log = log;
80         this.name = name;
81         this.transactionFactory = transactionFactory;
82
83         shardActorPath = Serialization.serializedActorPath(shardActor);
84
85         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
86                 removalListener(cacheRemovalListener).build();
87
88         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
89         // since this should only be accessed on the shard's dispatcher.
90         queuedCohortEntries = new LinkedList<>();
91     }
92
93     public void setQueueCapacity(int queueCapacity) {
94         this.queueCapacity = queueCapacity;
95     }
96
97     /**
98      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
99      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
100      *
101      * @param transactionID the ID of the transaction
102      * @param cohort the cohort to participate in the transaction commit
103      * @param modification the modifications made by the transaction
104      */
105     public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
106             MutableCompositeModification modification) {
107
108         cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
109     }
110
111     /**
112      * This method handles a BatchedModifications message for a transaction being prepared directly on the
113      * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
114      * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
115      * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
116      *
117      * @param batched the BatchedModifications
118      * @param shardActor the transaction's shard actor
119      *
120      * @throws ExecutionException if an error occurs loading the cache
121      */
122     public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched)
123             throws ExecutionException {
124         CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
125         if(cohortEntry == null) {
126             cohortEntry = new CohortEntry(batched.getTransactionID(),
127                     transactionFactory.<DOMStoreWriteTransaction>newTransaction(
128                         TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
129                         batched.getTransactionChainID()));
130             cohortCache.put(batched.getTransactionID(), cohortEntry);
131         }
132
133         if(log.isDebugEnabled()) {
134             log.debug("{}: Applying {} batched modifications for Tx {}", name,
135                     batched.getModifications().size(), batched.getTransactionID());
136         }
137
138         cohortEntry.applyModifications(batched.getModifications());
139
140         String cohortPath = null;
141         if(batched.isReady()) {
142             if(log.isDebugEnabled()) {
143                 log.debug("{}: Readying Tx {}, client version {}", name,
144                         batched.getTransactionID(), batched.getVersion());
145             }
146
147             cohortEntry.ready(cohortDecorator);
148             cohortPath = shardActorPath;
149         }
150
151         return new BatchedModificationsReply(batched.getModifications().size(), cohortPath);
152     }
153
154     /**
155      * This method handles the canCommit phase for a transaction.
156      *
157      * @param canCommit the CanCommitTransaction message
158      * @param sender the actor that sent the message
159      * @param shard the transaction's shard actor
160      */
161     public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
162             final ActorRef shard) {
163         String transactionID = canCommit.getTransactionID();
164         if(log.isDebugEnabled()) {
165             log.debug("{}: Processing canCommit for transaction {} for shard {}",
166                     name, transactionID, shard.path());
167         }
168
169         // Lookup the cohort entry that was cached previously (or should have been) by
170         // transactionReady (via the ForwardedReadyTransaction message).
171         final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
172         if(cohortEntry == null) {
173             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
174             // between canCommit and ready and the entry was expired from the cache.
175             IllegalStateException ex = new IllegalStateException(
176                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
177             log.error(ex.getMessage());
178             sender.tell(new Status.Failure(ex), shard);
179             return;
180         }
181
182         cohortEntry.setCanCommitSender(sender);
183         cohortEntry.setShard(shard);
184
185         if(currentCohortEntry != null) {
186             // There's already a Tx commit in progress - attempt to queue this entry to be
187             // committed after the current Tx completes.
188             log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
189                     name, currentCohortEntry.getTransactionID(), transactionID);
190
191             if(queuedCohortEntries.size() < queueCapacity) {
192                 queuedCohortEntries.offer(cohortEntry);
193             } else {
194                 removeCohortEntry(transactionID);
195
196                 RuntimeException ex = new RuntimeException(
197                         String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
198                                       " capacity %d has been reached.",
199                                       name, transactionID, queueCapacity));
200                 log.error(ex.getMessage());
201                 sender.tell(new Status.Failure(ex), shard);
202             }
203         } else {
204             // No Tx commit currently in progress - make this the current entry and proceed with
205             // canCommit.
206             cohortEntry.updateLastAccessTime();
207             currentCohortEntry = cohortEntry;
208
209             doCanCommit(cohortEntry);
210         }
211     }
212
213     private void doCanCommit(final CohortEntry cohortEntry) {
214
215         try {
216             // We block on the future here so we don't have to worry about possibly accessing our
217             // state on a different thread outside of our dispatcher. Also, the data store
218             // currently uses a same thread executor anyway.
219             Boolean canCommit = cohortEntry.getCohort().canCommit().get();
220
221             cohortEntry.getCanCommitSender().tell(
222                     canCommit ? CanCommitTransactionReply.YES.toSerializable() :
223                         CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
224
225             if(!canCommit) {
226                 // Remove the entry from the cache now since the Tx will be aborted.
227                 removeCohortEntry(cohortEntry.getTransactionID());
228             }
229         } catch (InterruptedException | ExecutionException e) {
230             log.debug("{}: An exception occurred during canCommit: {}", name, e);
231
232             // Remove the entry from the cache now since the Tx will be aborted.
233             removeCohortEntry(cohortEntry.getTransactionID());
234             cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
235         }
236     }
237
238     /**
239      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
240      * matches the current entry.
241      *
242      * @param transactionID the ID of the transaction
243      * @return the current CohortEntry or null if the given transaction ID does not match the
244      *         current entry.
245      */
246     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
247         if(isCurrentTransaction(transactionID)) {
248             return currentCohortEntry;
249         }
250
251         return null;
252     }
253
254     public CohortEntry getCurrentCohortEntry() {
255         return currentCohortEntry;
256     }
257
258     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
259         CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
260         cohortCache.invalidate(transactionID);
261         return cohortEntry;
262     }
263
264     public void removeCohortEntry(String transactionID) {
265         cohortCache.invalidate(transactionID);
266     }
267
268     public boolean isCurrentTransaction(String transactionID) {
269         return currentCohortEntry != null &&
270                 currentCohortEntry.getTransactionID().equals(transactionID);
271     }
272
273     /**
274      * This method is called when a transaction is complete, successful or not. If the given
275      * given transaction ID matches the current in-progress transaction, the next cohort entry,
276      * if any, is dequeued and processed.
277      *
278      * @param transactionID the ID of the completed transaction
279      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
280      *        the cache.
281      */
282     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
283         if(removeCohortEntry) {
284             removeCohortEntry(transactionID);
285         }
286
287         if(isCurrentTransaction(transactionID)) {
288             // Dequeue the next cohort entry waiting in the queue.
289             currentCohortEntry = queuedCohortEntries.poll();
290             if(currentCohortEntry != null) {
291                 currentCohortEntry.updateLastAccessTime();
292                 doCanCommit(currentCohortEntry);
293             }
294         }
295     }
296
297     @VisibleForTesting
298     void setCohortDecorator(CohortDecorator cohortDecorator) {
299         this.cohortDecorator = cohortDecorator;
300     }
301
302
303     static class CohortEntry {
304         private final String transactionID;
305         private DOMStoreThreePhaseCommitCohort cohort;
306         private final MutableCompositeModification compositeModification;
307         private final DOMStoreWriteTransaction transaction;
308         private ActorRef canCommitSender;
309         private ActorRef shard;
310         private long lastAccessTime;
311
312         CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
313             this.compositeModification = new MutableCompositeModification();
314             this.transaction = transaction;
315             this.transactionID = transactionID;
316         }
317
318         CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
319                 MutableCompositeModification compositeModification) {
320             this.transactionID = transactionID;
321             this.cohort = cohort;
322             this.compositeModification = compositeModification;
323             this.transaction = null;
324         }
325
326         void updateLastAccessTime() {
327             lastAccessTime = System.currentTimeMillis();
328         }
329
330         long getLastAccessTime() {
331             return lastAccessTime;
332         }
333
334         String getTransactionID() {
335             return transactionID;
336         }
337
338         DOMStoreThreePhaseCommitCohort getCohort() {
339             return cohort;
340         }
341
342         MutableCompositeModification getModification() {
343             return compositeModification;
344         }
345
346         void applyModifications(Iterable<Modification> modifications) {
347             for(Modification modification: modifications) {
348                 compositeModification.addModification(modification);
349                 modification.apply(transaction);
350             }
351         }
352
353         void ready(CohortDecorator cohortDecorator) {
354             Preconditions.checkState(cohort == null, "cohort was already set");
355
356             cohort = transaction.ready();
357
358             if(cohortDecorator != null) {
359                 // Call the hook for unit tests.
360                 cohort = cohortDecorator.decorate(transactionID, cohort);
361             }
362         }
363
364         ActorRef getCanCommitSender() {
365             return canCommitSender;
366         }
367
368         void setCanCommitSender(ActorRef canCommitSender) {
369             this.canCommitSender = canCommitSender;
370         }
371
372         ActorRef getShard() {
373             return shard;
374         }
375
376         void setShard(ActorRef shard) {
377             this.shard = shard;
378         }
379
380         boolean hasModifications(){
381             return compositeModification.getModifications().size() > 0;
382         }
383     }
384 }