Merge "Do not duplicate OSGi dependencyManagement"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.cache.Cache;
13 import com.google.common.cache.CacheBuilder;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.modification.Modification;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.slf4j.Logger;
23
24 /**
25  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
26  *
27  * @author Thomas Pantelis
28  */
29 public class ShardCommitCoordinator {
30
31     private final Cache<String, CohortEntry> cohortCache;
32
33     private CohortEntry currentCohortEntry;
34
35     private final Queue<CohortEntry> queuedCohortEntries;
36
37     private final int queueCapacity;
38
39     private final Logger log;
40
41     private final String name;
42
43     public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
44             String name) {
45         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
46                 cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
47
48         this.queueCapacity = queueCapacity;
49         this.log = log;
50         this.name = name;
51
52         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
53         // since this should only be accessed on the shard's dispatcher.
54         queuedCohortEntries = new LinkedList<>();
55     }
56
57     /**
58      * This method caches a cohort entry for the given transactions ID in preparation for the
59      * subsequent 3-phase commit.
60      *
61      * @param transactionID the ID of the transaction
62      * @param cohort the cohort to participate in the transaction commit
63      * @param modification the modification made by the transaction
64      */
65     public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
66             Modification modification) {
67
68         cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
69     }
70
71     /**
72      * This method handles the canCommit phase for a transaction.
73      *
74      * @param canCommit the CanCommitTransaction message
75      * @param sender the actor that sent the message
76      * @param shard the transaction's shard actor
77      */
78     public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
79             final ActorRef shard) {
80         String transactionID = canCommit.getTransactionID();
81         if(log.isDebugEnabled()) {
82             log.debug("{}: Processing canCommit for transaction {} for shard {}",
83                     name, transactionID, shard.path());
84         }
85
86         // Lookup the cohort entry that was cached previously (or should have been) by
87         // transactionReady (via the ForwardedReadyTransaction message).
88         final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
89         if(cohortEntry == null) {
90             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
91             // between canCommit and ready and the entry was expired from the cache.
92             IllegalStateException ex = new IllegalStateException(
93                     String.format("%s: No cohort entry found for transaction %s", name, transactionID));
94             log.error(ex.getMessage());
95             sender.tell(new Status.Failure(ex), shard);
96             return;
97         }
98
99         cohortEntry.setCanCommitSender(sender);
100         cohortEntry.setShard(shard);
101
102         if(currentCohortEntry != null) {
103             // There's already a Tx commit in progress - attempt to queue this entry to be
104             // committed after the current Tx completes.
105             log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
106                     name, currentCohortEntry.getTransactionID(), transactionID);
107
108             if(queuedCohortEntries.size() < queueCapacity) {
109                 queuedCohortEntries.offer(cohortEntry);
110             } else {
111                 removeCohortEntry(transactionID);
112
113                 RuntimeException ex = new RuntimeException(
114                         String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
115                                       " capacity %d has been reached.",
116                                       name, transactionID, queueCapacity));
117                 log.error(ex.getMessage());
118                 sender.tell(new Status.Failure(ex), shard);
119             }
120         } else {
121             // No Tx commit currently in progress - make this the current entry and proceed with
122             // canCommit.
123             cohortEntry.updateLastAccessTime();
124             currentCohortEntry = cohortEntry;
125
126             doCanCommit(cohortEntry);
127         }
128     }
129
130     private void doCanCommit(final CohortEntry cohortEntry) {
131
132         try {
133             // We block on the future here so we don't have to worry about possibly accessing our
134             // state on a different thread outside of our dispatcher. Also, the data store
135             // currently uses a same thread executor anyway.
136             Boolean canCommit = cohortEntry.getCohort().canCommit().get();
137
138             cohortEntry.getCanCommitSender().tell(
139                     canCommit ? CanCommitTransactionReply.YES.toSerializable() :
140                         CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
141
142             if(!canCommit) {
143                 // Remove the entry from the cache now since the Tx will be aborted.
144                 removeCohortEntry(cohortEntry.getTransactionID());
145             }
146         } catch (InterruptedException | ExecutionException e) {
147             log.debug("{}: An exception occurred during canCommit: {}", name, e);
148
149             // Remove the entry from the cache now since the Tx will be aborted.
150             removeCohortEntry(cohortEntry.getTransactionID());
151             cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
152         }
153     }
154
155     /**
156      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
157      * matches the current entry.
158      *
159      * @param transactionID the ID of the transaction
160      * @return the current CohortEntry or null if the given transaction ID does not match the
161      *         current entry.
162      */
163     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
164         if(isCurrentTransaction(transactionID)) {
165             return currentCohortEntry;
166         }
167
168         return null;
169     }
170
171     public CohortEntry getCurrentCohortEntry() {
172         return currentCohortEntry;
173     }
174
175     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
176         CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
177         cohortCache.invalidate(transactionID);
178         return cohortEntry;
179     }
180
181     public void removeCohortEntry(String transactionID) {
182         cohortCache.invalidate(transactionID);
183     }
184
185     public boolean isCurrentTransaction(String transactionID) {
186         return currentCohortEntry != null &&
187                 currentCohortEntry.getTransactionID().equals(transactionID);
188     }
189
190     /**
191      * This method is called when a transaction is complete, successful or not. If the given
192      * given transaction ID matches the current in-progress transaction, the next cohort entry,
193      * if any, is dequeued and processed.
194      *
195      * @param transactionID the ID of the completed transaction
196      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
197      *        the cache.
198      */
199     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
200         if(removeCohortEntry) {
201             removeCohortEntry(transactionID);
202         }
203
204         if(isCurrentTransaction(transactionID)) {
205             // Dequeue the next cohort entry waiting in the queue.
206             currentCohortEntry = queuedCohortEntries.poll();
207             if(currentCohortEntry != null) {
208                 currentCohortEntry.updateLastAccessTime();
209                 doCanCommit(currentCohortEntry);
210             }
211         }
212     }
213
214     static class CohortEntry {
215         private final String transactionID;
216         private final DOMStoreThreePhaseCommitCohort cohort;
217         private final Modification modification;
218         private ActorRef canCommitSender;
219         private ActorRef shard;
220         private long lastAccessTime;
221
222         CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
223                 Modification modification) {
224             this.transactionID = transactionID;
225             this.cohort = cohort;
226             this.modification = modification;
227         }
228
229         void updateLastAccessTime() {
230             lastAccessTime = System.currentTimeMillis();
231         }
232
233         long getLastAccessTime() {
234             return lastAccessTime;
235         }
236
237         String getTransactionID() {
238             return transactionID;
239         }
240
241         DOMStoreThreePhaseCommitCohort getCohort() {
242             return cohort;
243         }
244
245         Modification getModification() {
246             return modification;
247         }
248
249         ActorRef getCanCommitSender() {
250             return canCommitSender;
251         }
252
253         void setCanCommitSender(ActorRef canCommitSender) {
254             this.canCommitSender = canCommitSender;
255         }
256
257         ActorRef getShard() {
258             return shard;
259         }
260
261         void setShard(ActorRef shard) {
262             this.shard = shard;
263         }
264     }
265 }