2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.cache.Cache;
13 import com.google.common.cache.CacheBuilder;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.modification.Modification;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
26 * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
28 * @author Thomas Pantelis
30 public class ShardCommitCoordinator {
32 private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
34 private final Cache<String, CohortEntry> cohortCache;
36 private CohortEntry currentCohortEntry;
38 private final Queue<CohortEntry> queuedCohortEntries;
40 private final int queueCapacity;
42 public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
43 cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
44 cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
46 this.queueCapacity = queueCapacity;
48 // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
49 // since this should only be accessed on the shard's dispatcher.
50 queuedCohortEntries = new LinkedList<>();
54 * This method caches a cohort entry for the given transactions ID in preparation for the
55 * subsequent 3-phase commit.
57 * @param transactionID the ID of the transaction
58 * @param cohort the cohort to participate in the transaction commit
59 * @param modification the modification made by the transaction
61 public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
62 Modification modification) {
64 cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
68 * This method handles the canCommit phase for a transaction.
70 * @param canCommit the CanCommitTransaction message
71 * @param sender the actor that sent the message
72 * @param shard the transaction's shard actor
74 public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
75 final ActorRef shard) {
76 String transactionID = canCommit.getTransactionID();
77 if(LOG.isDebugEnabled()) {
78 LOG.debug("Processing canCommit for transaction {} for shard {}",
79 transactionID, shard.path());
82 // Lookup the cohort entry that was cached previously (or should have been) by
83 // transactionReady (via the ForwardedReadyTransaction message).
84 final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
85 if(cohortEntry == null) {
86 // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
87 // between canCommit and ready and the entry was expired from the cache.
88 IllegalStateException ex = new IllegalStateException(
89 String.format("No cohort entry found for transaction %s", transactionID));
90 LOG.error(ex.getMessage());
91 sender.tell(new Status.Failure(ex), shard);
95 cohortEntry.setCanCommitSender(sender);
96 cohortEntry.setShard(shard);
98 if(currentCohortEntry != null) {
99 // There's already a Tx commit in progress - attempt to queue this entry to be
100 // committed after the current Tx completes.
101 LOG.debug("Transaction {} is already in progress - queueing transaction {}",
102 currentCohortEntry.getTransactionID(), transactionID);
104 if(queuedCohortEntries.size() < queueCapacity) {
105 queuedCohortEntries.offer(cohortEntry);
107 removeCohortEntry(transactionID);
109 RuntimeException ex = new RuntimeException(
110 String.format("Could not enqueue transaction %s - the maximum commit queue"+
111 " capacity %d has been reached.",
112 transactionID, queueCapacity));
113 LOG.error(ex.getMessage());
114 sender.tell(new Status.Failure(ex), shard);
117 // No Tx commit currently in progress - make this the current entry and proceed with
119 cohortEntry.updateLastAccessTime();
120 currentCohortEntry = cohortEntry;
122 doCanCommit(cohortEntry);
126 private void doCanCommit(final CohortEntry cohortEntry) {
129 // We block on the future here so we don't have to worry about possibly accessing our
130 // state on a different thread outside of our dispatcher. Also, the data store
131 // currently uses a same thread executor anyway.
132 Boolean canCommit = cohortEntry.getCohort().canCommit().get();
134 cohortEntry.getCanCommitSender().tell(
135 canCommit ? CanCommitTransactionReply.YES.toSerializable() :
136 CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
139 // Remove the entry from the cache now since the Tx will be aborted.
140 removeCohortEntry(cohortEntry.getTransactionID());
142 } catch (InterruptedException | ExecutionException e) {
143 LOG.debug("An exception occurred during canCommit", e);
145 // Remove the entry from the cache now since the Tx will be aborted.
146 removeCohortEntry(cohortEntry.getTransactionID());
147 cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
152 * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
153 * matches the current entry.
155 * @param transactionID the ID of the transaction
156 * @return the current CohortEntry or null if the given transaction ID does not match the
159 public CohortEntry getCohortEntryIfCurrent(String transactionID) {
160 if(isCurrentTransaction(transactionID)) {
161 return currentCohortEntry;
167 public CohortEntry getCurrentCohortEntry() {
168 return currentCohortEntry;
171 public CohortEntry getAndRemoveCohortEntry(String transactionID) {
172 CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
173 cohortCache.invalidate(transactionID);
177 public void removeCohortEntry(String transactionID) {
178 cohortCache.invalidate(transactionID);
181 public boolean isCurrentTransaction(String transactionID) {
182 return currentCohortEntry != null &&
183 currentCohortEntry.getTransactionID().equals(transactionID);
187 * This method is called when a transaction is complete, successful or not. If the given
188 * given transaction ID matches the current in-progress transaction, the next cohort entry,
189 * if any, is dequeued and processed.
191 * @param transactionID the ID of the completed transaction
192 * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
195 public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
196 if(removeCohortEntry) {
197 removeCohortEntry(transactionID);
200 if(isCurrentTransaction(transactionID)) {
201 // Dequeue the next cohort entry waiting in the queue.
202 currentCohortEntry = queuedCohortEntries.poll();
203 if(currentCohortEntry != null) {
204 doCanCommit(currentCohortEntry);
209 static class CohortEntry {
210 private final String transactionID;
211 private final DOMStoreThreePhaseCommitCohort cohort;
212 private final Modification modification;
213 private ActorRef canCommitSender;
214 private ActorRef shard;
215 private long lastAccessTime;
217 CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
218 Modification modification) {
219 this.transactionID = transactionID;
220 this.cohort = cohort;
221 this.modification = modification;
224 void updateLastAccessTime() {
225 lastAccessTime = System.currentTimeMillis();
228 long getLastAccessTime() {
229 return lastAccessTime;
232 String getTransactionID() {
233 return transactionID;
236 DOMStoreThreePhaseCommitCohort getCohort() {
240 Modification getModification() {
244 ActorRef getCanCommitSender() {
245 return canCommitSender;
248 void setCanCommitSender(ActorRef canCommitSender) {
249 this.canCommitSender = canCommitSender;
252 ActorRef getShard() {
256 void setShard(ActorRef shard) {