Merge "BUG 2854 : Do not add empty read write transactions to the replicable journal"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.cache.Cache;
13 import com.google.common.cache.CacheBuilder;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
21 import org.opendaylight.controller.cluster.datastore.modification.Modification;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
23 import org.slf4j.Logger;
24
25 /**
26  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
27  *
28  * @author Thomas Pantelis
29  */
30 public class ShardCommitCoordinator {
31
32     private final Cache<String, CohortEntry> cohortCache;
33
34     private CohortEntry currentCohortEntry;
35
36     private final Queue<CohortEntry> queuedCohortEntries;
37
38     private int queueCapacity;
39
40     private final Logger log;
41
42     private final String name;
43
44     public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
45             String name) {
46         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
47                 cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
48
49         this.queueCapacity = queueCapacity;
50         this.log = log;
51         this.name = name;
52
53         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
54         // since this should only be accessed on the shard's dispatcher.
55         queuedCohortEntries = new LinkedList<>();
56     }
57
58     public void setQueueCapacity(int queueCapacity) {
59         this.queueCapacity = queueCapacity;
60     }
61
62     /**
63      * This method caches a cohort entry for the given transactions ID in preparation for the
64      * subsequent 3-phase commit.
65      *
66      * @param transactionID the ID of the transaction
67      * @param cohort the cohort to participate in the transaction commit
68      * @param modification the modification made by the transaction
69      */
70     public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
71             Modification modification) {
72
73         cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
74     }
75
76     /**
77      * This method handles the canCommit phase for a transaction.
78      *
79      * @param canCommit the CanCommitTransaction message
80      * @param sender the actor that sent the message
81      * @param shard the transaction's shard actor
82      */
83     public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
84             final ActorRef shard) {
85         String transactionID = canCommit.getTransactionID();
86         if(log.isDebugEnabled()) {
87             log.debug("{}: Processing canCommit for transaction {} for shard {}",
88                     name, transactionID, shard.path());
89         }
90
91         // Lookup the cohort entry that was cached previously (or should have been) by
92         // transactionReady (via the ForwardedReadyTransaction message).
93         final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
94         if(cohortEntry == null) {
95             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
96             // between canCommit and ready and the entry was expired from the cache.
97             IllegalStateException ex = new IllegalStateException(
98                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
99             log.error(ex.getMessage());
100             sender.tell(new Status.Failure(ex), shard);
101             return;
102         }
103
104         cohortEntry.setCanCommitSender(sender);
105         cohortEntry.setShard(shard);
106
107         if(currentCohortEntry != null) {
108             // There's already a Tx commit in progress - attempt to queue this entry to be
109             // committed after the current Tx completes.
110             log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
111                     name, currentCohortEntry.getTransactionID(), transactionID);
112
113             if(queuedCohortEntries.size() < queueCapacity) {
114                 queuedCohortEntries.offer(cohortEntry);
115             } else {
116                 removeCohortEntry(transactionID);
117
118                 RuntimeException ex = new RuntimeException(
119                         String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
120                                       " capacity %d has been reached.",
121                                       name, transactionID, queueCapacity));
122                 log.error(ex.getMessage());
123                 sender.tell(new Status.Failure(ex), shard);
124             }
125         } else {
126             // No Tx commit currently in progress - make this the current entry and proceed with
127             // canCommit.
128             cohortEntry.updateLastAccessTime();
129             currentCohortEntry = cohortEntry;
130
131             doCanCommit(cohortEntry);
132         }
133     }
134
135     private void doCanCommit(final CohortEntry cohortEntry) {
136
137         try {
138             // We block on the future here so we don't have to worry about possibly accessing our
139             // state on a different thread outside of our dispatcher. Also, the data store
140             // currently uses a same thread executor anyway.
141             Boolean canCommit = cohortEntry.getCohort().canCommit().get();
142
143             cohortEntry.getCanCommitSender().tell(
144                     canCommit ? CanCommitTransactionReply.YES.toSerializable() :
145                         CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
146
147             if(!canCommit) {
148                 // Remove the entry from the cache now since the Tx will be aborted.
149                 removeCohortEntry(cohortEntry.getTransactionID());
150             }
151         } catch (InterruptedException | ExecutionException e) {
152             log.debug("{}: An exception occurred during canCommit: {}", name, e);
153
154             // Remove the entry from the cache now since the Tx will be aborted.
155             removeCohortEntry(cohortEntry.getTransactionID());
156             cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
157         }
158     }
159
160     /**
161      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
162      * matches the current entry.
163      *
164      * @param transactionID the ID of the transaction
165      * @return the current CohortEntry or null if the given transaction ID does not match the
166      *         current entry.
167      */
168     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
169         if(isCurrentTransaction(transactionID)) {
170             return currentCohortEntry;
171         }
172
173         return null;
174     }
175
176     public CohortEntry getCurrentCohortEntry() {
177         return currentCohortEntry;
178     }
179
180     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
181         CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
182         cohortCache.invalidate(transactionID);
183         return cohortEntry;
184     }
185
186     public void removeCohortEntry(String transactionID) {
187         cohortCache.invalidate(transactionID);
188     }
189
190     public boolean isCurrentTransaction(String transactionID) {
191         return currentCohortEntry != null &&
192                 currentCohortEntry.getTransactionID().equals(transactionID);
193     }
194
195     /**
196      * This method is called when a transaction is complete, successful or not. If the given
197      * given transaction ID matches the current in-progress transaction, the next cohort entry,
198      * if any, is dequeued and processed.
199      *
200      * @param transactionID the ID of the completed transaction
201      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
202      *        the cache.
203      */
204     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
205         if(removeCohortEntry) {
206             removeCohortEntry(transactionID);
207         }
208
209         if(isCurrentTransaction(transactionID)) {
210             // Dequeue the next cohort entry waiting in the queue.
211             currentCohortEntry = queuedCohortEntries.poll();
212             if(currentCohortEntry != null) {
213                 currentCohortEntry.updateLastAccessTime();
214                 doCanCommit(currentCohortEntry);
215             }
216         }
217     }
218
219     static class CohortEntry {
220         private final String transactionID;
221         private final DOMStoreThreePhaseCommitCohort cohort;
222         private final Modification modification;
223         private ActorRef canCommitSender;
224         private ActorRef shard;
225         private long lastAccessTime;
226
227         CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
228                 Modification modification) {
229             this.transactionID = transactionID;
230             this.cohort = cohort;
231             this.modification = modification;
232         }
233
234         void updateLastAccessTime() {
235             lastAccessTime = System.currentTimeMillis();
236         }
237
238         long getLastAccessTime() {
239             return lastAccessTime;
240         }
241
242         String getTransactionID() {
243             return transactionID;
244         }
245
246         DOMStoreThreePhaseCommitCohort getCohort() {
247             return cohort;
248         }
249
250         Modification getModification() {
251             return modification;
252         }
253
254         ActorRef getCanCommitSender() {
255             return canCommitSender;
256         }
257
258         void setCanCommitSender(ActorRef canCommitSender) {
259             this.canCommitSender = canCommitSender;
260         }
261
262         ActorRef getShard() {
263             return shard;
264         }
265
266         void setShard(ActorRef shard) {
267             this.shard = shard;
268         }
269
270         boolean hasModifications(){
271             if(modification instanceof CompositeModification){
272                 return ((CompositeModification) modification).getModifications().size() > 0;
273             }
274             return true;
275         }
276     }
277 }