Do not use ActorSystem.actorFor as it is deprecated
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.cache.Cache;
13 import com.google.common.cache.CacheBuilder;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.modification.Modification;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.slf4j.Logger;
23
24 /**
25  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
26  *
27  * @author Thomas Pantelis
28  */
29 public class ShardCommitCoordinator {
30
31     private final Cache<String, CohortEntry> cohortCache;
32
33     private CohortEntry currentCohortEntry;
34
35     private final Queue<CohortEntry> queuedCohortEntries;
36
37     private int queueCapacity;
38
39     private final Logger log;
40
41     private final String name;
42
43     public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
44             String name) {
45         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
46                 cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
47
48         this.queueCapacity = queueCapacity;
49         this.log = log;
50         this.name = name;
51
52         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
53         // since this should only be accessed on the shard's dispatcher.
54         queuedCohortEntries = new LinkedList<>();
55     }
56
57     public void setQueueCapacity(int queueCapacity) {
58         this.queueCapacity = queueCapacity;
59     }
60
61     /**
62      * This method caches a cohort entry for the given transactions ID in preparation for the
63      * subsequent 3-phase commit.
64      *
65      * @param transactionID the ID of the transaction
66      * @param cohort the cohort to participate in the transaction commit
67      * @param modification the modification made by the transaction
68      */
69     public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
70             Modification modification) {
71
72         cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
73     }
74
75     /**
76      * This method handles the canCommit phase for a transaction.
77      *
78      * @param canCommit the CanCommitTransaction message
79      * @param sender the actor that sent the message
80      * @param shard the transaction's shard actor
81      */
82     public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
83             final ActorRef shard) {
84         String transactionID = canCommit.getTransactionID();
85         if(log.isDebugEnabled()) {
86             log.debug("{}: Processing canCommit for transaction {} for shard {}",
87                     name, transactionID, shard.path());
88         }
89
90         // Lookup the cohort entry that was cached previously (or should have been) by
91         // transactionReady (via the ForwardedReadyTransaction message).
92         final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
93         if(cohortEntry == null) {
94             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
95             // between canCommit and ready and the entry was expired from the cache.
96             IllegalStateException ex = new IllegalStateException(
97                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
98             log.error(ex.getMessage());
99             sender.tell(new Status.Failure(ex), shard);
100             return;
101         }
102
103         cohortEntry.setCanCommitSender(sender);
104         cohortEntry.setShard(shard);
105
106         if(currentCohortEntry != null) {
107             // There's already a Tx commit in progress - attempt to queue this entry to be
108             // committed after the current Tx completes.
109             log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
110                     name, currentCohortEntry.getTransactionID(), transactionID);
111
112             if(queuedCohortEntries.size() < queueCapacity) {
113                 queuedCohortEntries.offer(cohortEntry);
114             } else {
115                 removeCohortEntry(transactionID);
116
117                 RuntimeException ex = new RuntimeException(
118                         String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
119                                       " capacity %d has been reached.",
120                                       name, transactionID, queueCapacity));
121                 log.error(ex.getMessage());
122                 sender.tell(new Status.Failure(ex), shard);
123             }
124         } else {
125             // No Tx commit currently in progress - make this the current entry and proceed with
126             // canCommit.
127             cohortEntry.updateLastAccessTime();
128             currentCohortEntry = cohortEntry;
129
130             doCanCommit(cohortEntry);
131         }
132     }
133
134     private void doCanCommit(final CohortEntry cohortEntry) {
135
136         try {
137             // We block on the future here so we don't have to worry about possibly accessing our
138             // state on a different thread outside of our dispatcher. Also, the data store
139             // currently uses a same thread executor anyway.
140             Boolean canCommit = cohortEntry.getCohort().canCommit().get();
141
142             cohortEntry.getCanCommitSender().tell(
143                     canCommit ? CanCommitTransactionReply.YES.toSerializable() :
144                         CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
145
146             if(!canCommit) {
147                 // Remove the entry from the cache now since the Tx will be aborted.
148                 removeCohortEntry(cohortEntry.getTransactionID());
149             }
150         } catch (InterruptedException | ExecutionException e) {
151             log.debug("{}: An exception occurred during canCommit: {}", name, e);
152
153             // Remove the entry from the cache now since the Tx will be aborted.
154             removeCohortEntry(cohortEntry.getTransactionID());
155             cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
156         }
157     }
158
159     /**
160      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
161      * matches the current entry.
162      *
163      * @param transactionID the ID of the transaction
164      * @return the current CohortEntry or null if the given transaction ID does not match the
165      *         current entry.
166      */
167     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
168         if(isCurrentTransaction(transactionID)) {
169             return currentCohortEntry;
170         }
171
172         return null;
173     }
174
175     public CohortEntry getCurrentCohortEntry() {
176         return currentCohortEntry;
177     }
178
179     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
180         CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
181         cohortCache.invalidate(transactionID);
182         return cohortEntry;
183     }
184
185     public void removeCohortEntry(String transactionID) {
186         cohortCache.invalidate(transactionID);
187     }
188
189     public boolean isCurrentTransaction(String transactionID) {
190         return currentCohortEntry != null &&
191                 currentCohortEntry.getTransactionID().equals(transactionID);
192     }
193
194     /**
195      * This method is called when a transaction is complete, successful or not. If the given
196      * given transaction ID matches the current in-progress transaction, the next cohort entry,
197      * if any, is dequeued and processed.
198      *
199      * @param transactionID the ID of the completed transaction
200      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
201      *        the cache.
202      */
203     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
204         if(removeCohortEntry) {
205             removeCohortEntry(transactionID);
206         }
207
208         if(isCurrentTransaction(transactionID)) {
209             // Dequeue the next cohort entry waiting in the queue.
210             currentCohortEntry = queuedCohortEntries.poll();
211             if(currentCohortEntry != null) {
212                 currentCohortEntry.updateLastAccessTime();
213                 doCanCommit(currentCohortEntry);
214             }
215         }
216     }
217
218     static class CohortEntry {
219         private final String transactionID;
220         private final DOMStoreThreePhaseCommitCohort cohort;
221         private final Modification modification;
222         private ActorRef canCommitSender;
223         private ActorRef shard;
224         private long lastAccessTime;
225
226         CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
227                 Modification modification) {
228             this.transactionID = transactionID;
229             this.cohort = cohort;
230             this.modification = modification;
231         }
232
233         void updateLastAccessTime() {
234             lastAccessTime = System.currentTimeMillis();
235         }
236
237         long getLastAccessTime() {
238             return lastAccessTime;
239         }
240
241         String getTransactionID() {
242             return transactionID;
243         }
244
245         DOMStoreThreePhaseCommitCohort getCohort() {
246             return cohort;
247         }
248
249         Modification getModification() {
250             return modification;
251         }
252
253         ActorRef getCanCommitSender() {
254             return canCommitSender;
255         }
256
257         void setCanCommitSender(ActorRef canCommitSender) {
258             this.canCommitSender = canCommitSender;
259         }
260
261         ActorRef getShard() {
262             return shard;
263         }
264
265         void setShard(ActorRef shard) {
266             this.shard = shard;
267         }
268     }
269 }