Merge "Add support for metadata to the Match/Action classes"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
1 /*
2  * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import java.util.LinkedList;
11 import java.util.Queue;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeUnit;
14 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
15 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
16 import org.opendaylight.controller.cluster.datastore.modification.Modification;
17 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 import akka.actor.ActorRef;
21 import akka.actor.Status;
22 import com.google.common.cache.Cache;
23 import com.google.common.cache.CacheBuilder;
24
25 /**
26  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
27  *
28  * @author Thomas Pantelis
29  */
30 public class ShardCommitCoordinator {
31
32     private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
33
34     private static final Object CAN_COMMIT_REPLY_TRUE =
35             new CanCommitTransactionReply(Boolean.TRUE).toSerializable();
36
37     private static final Object CAN_COMMIT_REPLY_FALSE =
38             new CanCommitTransactionReply(Boolean.FALSE).toSerializable();
39
40     private final Cache<String, CohortEntry> cohortCache;
41
42     private CohortEntry currentCohortEntry;
43
44     private final Queue<CohortEntry> queuedCohortEntries;
45
46     private final int queueCapacity;
47
48     public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
49         cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
50                 cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
51
52         this.queueCapacity = queueCapacity;
53
54         // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
55         // since this should only be accessed on the shard's dispatcher.
56         queuedCohortEntries = new LinkedList<>();
57     }
58
59     /**
60      * This method caches a cohort entry for the given transactions ID in preparation for the
61      * subsequent 3-phase commit.
62      *
63      * @param transactionID the ID of the transaction
64      * @param cohort the cohort to participate in the transaction commit
65      * @param modification the modification made by the transaction
66      */
67     public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
68             Modification modification) {
69
70         cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
71     }
72
73     /**
74      * This method handles the canCommit phase for a transaction.
75      *
76      * @param canCommit the CanCommitTransaction message
77      * @param sender the actor that sent the message
78      * @param shard the transaction's shard actor
79      */
80     public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
81             final ActorRef shard) {
82         String transactionID = canCommit.getTransactionID();
83         if(LOG.isDebugEnabled()) {
84             LOG.debug("Processing canCommit for transaction {} for shard {}",
85                     transactionID, shard.path());
86         }
87
88         // Lookup the cohort entry that was cached previously (or should have been) by
89         // transactionReady (via the ForwardedReadyTransaction message).
90         final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
91         if(cohortEntry == null) {
92             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
93             // between canCommit and ready and the entry was expired from the cache.
94             IllegalStateException ex = new IllegalStateException(
95                     String.format("No cohort entry found for transaction %s", transactionID));
96             LOG.error(ex.getMessage());
97             sender.tell(new Status.Failure(ex), shard);
98             return;
99         }
100
101         cohortEntry.setCanCommitSender(sender);
102         cohortEntry.setShard(shard);
103
104         if(currentCohortEntry != null) {
105             // There's already a Tx commit in progress - attempt to queue this entry to be
106             // committed after the current Tx completes.
107             LOG.debug("Transaction {} is already in progress - queueing transaction {}",
108                     currentCohortEntry.getTransactionID(), transactionID);
109
110             if(queuedCohortEntries.size() < queueCapacity) {
111                 queuedCohortEntries.offer(cohortEntry);
112             } else {
113                 removeCohortEntry(transactionID);
114
115                 RuntimeException ex = new RuntimeException(
116                         String.format("Could not enqueue transaction %s - the maximum commit queue"+
117                                       " capacity %d has been reached.",
118                                 transactionID, queueCapacity));
119                 LOG.error(ex.getMessage());
120                 sender.tell(new Status.Failure(ex), shard);
121             }
122         } else {
123             // No Tx commit currently in progress - make this the current entry and proceed with
124             // canCommit.
125             cohortEntry.updateLastAccessTime();
126             currentCohortEntry = cohortEntry;
127
128             doCanCommit(cohortEntry);
129         }
130     }
131
132     private void doCanCommit(final CohortEntry cohortEntry) {
133
134         try {
135             // We block on the future here so we don't have to worry about possibly accessing our
136             // state on a different thread outside of our dispatcher. Also, the data store
137             // currently uses a same thread executor anyway.
138             Boolean canCommit = cohortEntry.getCohort().canCommit().get();
139
140             cohortEntry.getCanCommitSender().tell(
141                     canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard());
142
143             if(!canCommit) {
144                 // Remove the entry from the cache now since the Tx will be aborted.
145                 removeCohortEntry(cohortEntry.getTransactionID());
146             }
147         } catch (InterruptedException | ExecutionException e) {
148             LOG.debug("An exception occurred during canCommit", e);
149
150             // Remove the entry from the cache now since the Tx will be aborted.
151             removeCohortEntry(cohortEntry.getTransactionID());
152             cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
153         }
154     }
155
156     /**
157      * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
158      * matches the current entry.
159      *
160      * @param transactionID the ID of the transaction
161      * @return the current CohortEntry or null if the given transaction ID does not match the
162      *         current entry.
163      */
164     public CohortEntry getCohortEntryIfCurrent(String transactionID) {
165         if(isCurrentTransaction(transactionID)) {
166             return currentCohortEntry;
167         }
168
169         return null;
170     }
171
172     public CohortEntry getCurrentCohortEntry() {
173         return currentCohortEntry;
174     }
175
176     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
177         CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
178         cohortCache.invalidate(transactionID);
179         return cohortEntry;
180     }
181
182     public void removeCohortEntry(String transactionID) {
183         cohortCache.invalidate(transactionID);
184     }
185
186     public boolean isCurrentTransaction(String transactionID) {
187         return currentCohortEntry != null &&
188                 currentCohortEntry.getTransactionID().equals(transactionID);
189     }
190
191     /**
192      * This method is called when a transaction is complete, successful or not. If the given
193      * given transaction ID matches the current in-progress transaction, the next cohort entry,
194      * if any, is dequeued and processed.
195      *
196      * @param transactionID the ID of the completed transaction
197      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
198      *        the cache.
199      */
200     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
201         if(removeCohortEntry) {
202             removeCohortEntry(transactionID);
203         }
204
205         if(isCurrentTransaction(transactionID)) {
206             // Dequeue the next cohort entry waiting in the queue.
207             currentCohortEntry = queuedCohortEntries.poll();
208             if(currentCohortEntry != null) {
209                 doCanCommit(currentCohortEntry);
210             }
211         }
212     }
213
214     static class CohortEntry {
215         private final String transactionID;
216         private final DOMStoreThreePhaseCommitCohort cohort;
217         private final Modification modification;
218         private ActorRef canCommitSender;
219         private ActorRef shard;
220         private long lastAccessTime;
221
222         CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
223                 Modification modification) {
224             this.transactionID = transactionID;
225             this.cohort = cohort;
226             this.modification = modification;
227         }
228
229         void updateLastAccessTime() {
230             lastAccessTime = System.currentTimeMillis();
231         }
232
233         long getLastAccessTime() {
234             return lastAccessTime;
235         }
236
237         String getTransactionID() {
238             return transactionID;
239         }
240
241         DOMStoreThreePhaseCommitCohort getCohort() {
242             return cohort;
243         }
244
245         Modification getModification() {
246             return modification;
247         }
248
249         ActorRef getCanCommitSender() {
250             return canCommitSender;
251         }
252
253         void setCanCommitSender(ActorRef canCommitSender) {
254             this.canCommitSender = canCommitSender;
255         }
256
257         ActorRef getShard() {
258             return shard;
259         }
260
261         void setShard(ActorRef shard) {
262             this.shard = shard;
263         }
264     }
265 }