Merge changes I3e404877,Ida2a5c32,I9e6ce426,I6a4b90f6,I79717533
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.cache.Cache;
13 import com.google.common.cache.CacheBuilder;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.modification.Modification;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 /**
26  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
27  *
28  * @author Thomas Pantelis
29  */
30 public class ShardCommitCoordinator {
31
32     private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
33
34     private final Cache<String, CohortEntry> cohortCache;
35
36     private CohortEntry currentCohortEntry;
37
38     private final Queue<CohortEntry> queuedCohortEntries;
39
40     private final int queueCapacity;
41
42     public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
43         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
44                 cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
45
46         this.queueCapacity = queueCapacity;
47
48         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
49         // since this should only be accessed on the shard's dispatcher.
50         queuedCohortEntries = new LinkedList<>();
51     }
52
53     /**
54      * This method caches a cohort entry for the given transactions ID in preparation for the
55      * subsequent 3-phase commit.
56      *
57      * @param transactionID the ID of the transaction
58      * @param cohort the cohort to participate in the transaction commit
59      * @param modification the modification made by the transaction
60      */
61     public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
62             Modification modification) {
63
64         cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
65     }
66
67     /**
68      * This method handles the canCommit phase for a transaction.
69      *
70      * @param canCommit the CanCommitTransaction message
71      * @param sender the actor that sent the message
72      * @param shard the transaction's shard actor
73      */
74     public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
75             final ActorRef shard) {
76         String transactionID = canCommit.getTransactionID();
77         if(LOG.isDebugEnabled()) {
78             LOG.debug("Processing canCommit for transaction {} for shard {}",
79                     transactionID, shard.path());
80         }
81
82         // Lookup the cohort entry that was cached previously (or should have been) by
83         // transactionReady (via the ForwardedReadyTransaction message).
84         final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
85         if(cohortEntry == null) {
86             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
87             // between canCommit and ready and the entry was expired from the cache.
88             IllegalStateException ex = new IllegalStateException(
89                     String.format("No cohort entry found for transaction %s", transactionID));
90             LOG.error(ex.getMessage());
91             sender.tell(new Status.Failure(ex), shard);
92             return;
93         }
94
95         cohortEntry.setCanCommitSender(sender);
96         cohortEntry.setShard(shard);
97
98         if(currentCohortEntry != null) {
99             // There's already a Tx commit in progress - attempt to queue this entry to be
100             // committed after the current Tx completes.
101             LOG.debug("Transaction {} is already in progress - queueing transaction {}",
102                     currentCohortEntry.getTransactionID(), transactionID);
103
104             if(queuedCohortEntries.size() < queueCapacity) {
105                 queuedCohortEntries.offer(cohortEntry);
106             } else {
107                 removeCohortEntry(transactionID);
108
109                 RuntimeException ex = new RuntimeException(
110                         String.format("Could not enqueue transaction %s - the maximum commit queue"+
111                                       " capacity %d has been reached.",
112                                 transactionID, queueCapacity));
113                 LOG.error(ex.getMessage());
114                 sender.tell(new Status.Failure(ex), shard);
115             }
116         } else {
117             // No Tx commit currently in progress - make this the current entry and proceed with
118             // canCommit.
119             cohortEntry.updateLastAccessTime();
120             currentCohortEntry = cohortEntry;
121
122             doCanCommit(cohortEntry);
123         }
124     }
125
126     private void doCanCommit(final CohortEntry cohortEntry) {
127
128         try {
129             // We block on the future here so we don't have to worry about possibly accessing our
130             // state on a different thread outside of our dispatcher. Also, the data store
131             // currently uses a same thread executor anyway.
132             Boolean canCommit = cohortEntry.getCohort().canCommit().get();
133
134             cohortEntry.getCanCommitSender().tell(
135                     canCommit ? CanCommitTransactionReply.YES.toSerializable() :
136                         CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
137
138             if(!canCommit) {
139                 // Remove the entry from the cache now since the Tx will be aborted.
140                 removeCohortEntry(cohortEntry.getTransactionID());
141             }
142         } catch (InterruptedException | ExecutionException e) {
143             LOG.debug("An exception occurred during canCommit", e);
144
145             // Remove the entry from the cache now since the Tx will be aborted.
146             removeCohortEntry(cohortEntry.getTransactionID());
147             cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
148         }
149     }
150
151     /**
152      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
153      * matches the current entry.
154      *
155      * @param transactionID the ID of the transaction
156      * @return the current CohortEntry or null if the given transaction ID does not match the
157      *         current entry.
158      */
159     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
160         if(isCurrentTransaction(transactionID)) {
161             return currentCohortEntry;
162         }
163
164         return null;
165     }
166
167     public CohortEntry getCurrentCohortEntry() {
168         return currentCohortEntry;
169     }
170
171     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
172         CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
173         cohortCache.invalidate(transactionID);
174         return cohortEntry;
175     }
176
177     public void removeCohortEntry(String transactionID) {
178         cohortCache.invalidate(transactionID);
179     }
180
181     public boolean isCurrentTransaction(String transactionID) {
182         return currentCohortEntry != null &&
183                 currentCohortEntry.getTransactionID().equals(transactionID);
184     }
185
186     /**
187      * This method is called when a transaction is complete, successful or not. If the given
188      * given transaction ID matches the current in-progress transaction, the next cohort entry,
189      * if any, is dequeued and processed.
190      *
191      * @param transactionID the ID of the completed transaction
192      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
193      *        the cache.
194      */
195     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
196         if(removeCohortEntry) {
197             removeCohortEntry(transactionID);
198         }
199
200         if(isCurrentTransaction(transactionID)) {
201             // Dequeue the next cohort entry waiting in the queue.
202             currentCohortEntry = queuedCohortEntries.poll();
203             if(currentCohortEntry != null) {
204                 doCanCommit(currentCohortEntry);
205             }
206         }
207     }
208
209     static class CohortEntry {
210         private final String transactionID;
211         private final DOMStoreThreePhaseCommitCohort cohort;
212         private final Modification modification;
213         private ActorRef canCommitSender;
214         private ActorRef shard;
215         private long lastAccessTime;
216
217         CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
218                 Modification modification) {
219             this.transactionID = transactionID;
220             this.cohort = cohort;
221             this.modification = modification;
222         }
223
224         void updateLastAccessTime() {
225             lastAccessTime = System.currentTimeMillis();
226         }
227
228         long getLastAccessTime() {
229             return lastAccessTime;
230         }
231
232         String getTransactionID() {
233             return transactionID;
234         }
235
236         DOMStoreThreePhaseCommitCohort getCohort() {
237             return cohort;
238         }
239
240         Modification getModification() {
241             return modification;
242         }
243
244         ActorRef getCanCommitSender() {
245             return canCommitSender;
246         }
247
248         void setCanCommitSender(ActorRef canCommitSender) {
249             this.canCommitSender = canCommitSender;
250         }
251
252         ActorRef getShard() {
253             return shard;
254         }
255
256         void setShard(ActorRef shard) {
257             this.shard = shard;
258         }
259     }
260 }