2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.cache.Cache;
13 import com.google.common.cache.CacheBuilder;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
21 import org.opendaylight.controller.cluster.datastore.modification.Modification;
22 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
23 import org.slf4j.Logger;
26 * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
28 * @author Thomas Pantelis
30 public class ShardCommitCoordinator {
32 private final Cache<String, CohortEntry> cohortCache;
34 private CohortEntry currentCohortEntry;
36 private final Queue<CohortEntry> queuedCohortEntries;
38 private int queueCapacity;
40 private final Logger log;
42 private final String name;
/**
 * Creates a commit coordinator.
 *
 * @param cacheExpiryTimeoutInSec number of seconds after its last access at which a cached
 *            cohort entry expires
 * @param queueCapacity maximum number of cohort entries that may wait behind the
 *            in-progress commit
 * @param log the logger to write messages to
 * @param name identifier prepended to log and error messages
 */
public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
        String name) {
    cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
            cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();

    this.queueCapacity = queueCapacity;
    this.log = log;
    this.name = name;

    // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
    // since this should only be accessed on the shard's dispatcher.
    queuedCohortEntries = new LinkedList<>();
}
58 public void setQueueCapacity(int queueCapacity) {
59 this.queueCapacity = queueCapacity;
63 * This method caches a cohort entry for the given transactions ID in preparation for the
64 * subsequent 3-phase commit.
66 * @param transactionID the ID of the transaction
67 * @param cohort the cohort to participate in the transaction commit
68 * @param modification the modification made by the transaction
70 public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
71 Modification modification) {
73 cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
77 * This method handles the canCommit phase for a transaction.
79 * @param canCommit the CanCommitTransaction message
80 * @param sender the actor that sent the message
81 * @param shard the transaction's shard actor
83 public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
84 final ActorRef shard) {
85 String transactionID = canCommit.getTransactionID();
86 if(log.isDebugEnabled()) {
87 log.debug("{}: Processing canCommit for transaction {} for shard {}",
88 name, transactionID, shard.path());
91 // Lookup the cohort entry that was cached previously (or should have been) by
92 // transactionReady (via the ForwardedReadyTransaction message).
93 final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
94 if(cohortEntry == null) {
95 // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
96 // between canCommit and ready and the entry was expired from the cache.
97 IllegalStateException ex = new IllegalStateException(
98 String.format("%s: No cohort entry found for transaction %s", name, transactionID));
99 log.error(ex.getMessage());
100 sender.tell(new Status.Failure(ex), shard);
104 cohortEntry.setCanCommitSender(sender);
105 cohortEntry.setShard(shard);
107 if(currentCohortEntry != null) {
108 // There's already a Tx commit in progress - attempt to queue this entry to be
109 // committed after the current Tx completes.
110 log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
111 name, currentCohortEntry.getTransactionID(), transactionID);
113 if(queuedCohortEntries.size() < queueCapacity) {
114 queuedCohortEntries.offer(cohortEntry);
116 removeCohortEntry(transactionID);
118 RuntimeException ex = new RuntimeException(
119 String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
120 " capacity %d has been reached.",
121 name, transactionID, queueCapacity));
122 log.error(ex.getMessage());
123 sender.tell(new Status.Failure(ex), shard);
126 // No Tx commit currently in progress - make this the current entry and proceed with
128 cohortEntry.updateLastAccessTime();
129 currentCohortEntry = cohortEntry;
131 doCanCommit(cohortEntry);
135 private void doCanCommit(final CohortEntry cohortEntry) {
138 // We block on the future here so we don't have to worry about possibly accessing our
139 // state on a different thread outside of our dispatcher. Also, the data store
140 // currently uses a same thread executor anyway.
141 Boolean canCommit = cohortEntry.getCohort().canCommit().get();
143 cohortEntry.getCanCommitSender().tell(
144 canCommit ? CanCommitTransactionReply.YES.toSerializable() :
145 CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
148 // Remove the entry from the cache now since the Tx will be aborted.
149 removeCohortEntry(cohortEntry.getTransactionID());
151 } catch (InterruptedException | ExecutionException e) {
152 log.debug("{}: An exception occurred during canCommit: {}", name, e);
154 // Remove the entry from the cache now since the Tx will be aborted.
155 removeCohortEntry(cohortEntry.getTransactionID());
156 cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
161 * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
162 * matches the current entry.
164 * @param transactionID the ID of the transaction
165 * @return the current CohortEntry or null if the given transaction ID does not match the
168 public CohortEntry getCohortEntryIfCurrent(String transactionID) {
169 if(isCurrentTransaction(transactionID)) {
170 return currentCohortEntry;
176 public CohortEntry getCurrentCohortEntry() {
177 return currentCohortEntry;
180 public CohortEntry getAndRemoveCohortEntry(String transactionID) {
181 CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
182 cohortCache.invalidate(transactionID);
186 public void removeCohortEntry(String transactionID) {
187 cohortCache.invalidate(transactionID);
190 public boolean isCurrentTransaction(String transactionID) {
191 return currentCohortEntry != null &&
192 currentCohortEntry.getTransactionID().equals(transactionID);
196 * This method is called when a transaction is complete, successful or not. If the given
197 * given transaction ID matches the current in-progress transaction, the next cohort entry,
198 * if any, is dequeued and processed.
200 * @param transactionID the ID of the completed transaction
201 * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
204 public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
205 if(removeCohortEntry) {
206 removeCohortEntry(transactionID);
209 if(isCurrentTransaction(transactionID)) {
210 // Dequeue the next cohort entry waiting in the queue.
211 currentCohortEntry = queuedCohortEntries.poll();
212 if(currentCohortEntry != null) {
213 currentCohortEntry.updateLastAccessTime();
214 doCanCommit(currentCohortEntry);
219 static class CohortEntry {
220 private final String transactionID;
221 private final DOMStoreThreePhaseCommitCohort cohort;
222 private final Modification modification;
223 private ActorRef canCommitSender;
224 private ActorRef shard;
225 private long lastAccessTime;
/**
 * Creates an entry tying a transaction to its commit cohort and modification.
 *
 * @param transactionID the ID of the transaction
 * @param cohort the cohort to participate in the transaction commit
 * @param modification the modification made by the transaction
 */
CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
        Modification modification) {
    this.modification = modification;
    this.cohort = cohort;
    this.transactionID = transactionID;
}
234 void updateLastAccessTime() {
235 lastAccessTime = System.currentTimeMillis();
238 long getLastAccessTime() {
239 return lastAccessTime;
242 String getTransactionID() {
243 return transactionID;
246 DOMStoreThreePhaseCommitCohort getCohort() {
250 Modification getModification() {
254 ActorRef getCanCommitSender() {
255 return canCommitSender;
258 void setCanCommitSender(ActorRef canCommitSender) {
259 this.canCommitSender = canCommitSender;
262 ActorRef getShard() {
266 void setShard(ActorRef shard) {
270 boolean hasModifications(){
271 if(modification instanceof CompositeModification){
272 return ((CompositeModification) modification).getModifications().size() > 0;