2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import java.util.LinkedList;
11 import java.util.Queue;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeUnit;
14 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
15 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
16 import org.opendaylight.controller.cluster.datastore.modification.Modification;
17 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 import akka.actor.ActorRef;
21 import akka.actor.Status;
22 import com.google.common.cache.Cache;
23 import com.google.common.cache.CacheBuilder;
26 * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
28 * @author Thomas Pantelis
30 public class ShardCommitCoordinator {
32 private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinator.class);
34 private static final Object CAN_COMMIT_REPLY_TRUE =
35 new CanCommitTransactionReply(Boolean.TRUE).toSerializable();
37 private static final Object CAN_COMMIT_REPLY_FALSE =
38 new CanCommitTransactionReply(Boolean.FALSE).toSerializable();
40 private final Cache<String, CohortEntry> cohortCache;
42 private CohortEntry currentCohortEntry;
44 private final Queue<CohortEntry> queuedCohortEntries;
46 private final int queueCapacity;
48 public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity) {
49 cohortCache = CacheBuilder.newBuilder().expireAfterAccess(
50 cacheExpiryTimeoutInSec, TimeUnit.SECONDS).build();
52 this.queueCapacity = queueCapacity;
54 // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
55 // since this should only be accessed on the shard's dispatcher.
56 queuedCohortEntries = new LinkedList<>();
60 * This method caches a cohort entry for the given transactions ID in preparation for the
61 * subsequent 3-phase commit.
63 * @param transactionID the ID of the transaction
64 * @param cohort the cohort to participate in the transaction commit
65 * @param modification the modification made by the transaction
67 public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
68 Modification modification) {
70 cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
74 * This method handles the canCommit phase for a transaction.
76 * @param canCommit the CanCommitTransaction message
77 * @param sender the actor that sent the message
78 * @param shard the transaction's shard actor
80 public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
81 final ActorRef shard) {
82 String transactionID = canCommit.getTransactionID();
83 if(LOG.isDebugEnabled()) {
84 LOG.debug("Processing canCommit for transaction {} for shard {}",
85 transactionID, shard.path());
88 // Lookup the cohort entry that was cached previously (or should have been) by
89 // transactionReady (via the ForwardedReadyTransaction message).
90 final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
91 if(cohortEntry == null) {
92 // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
93 // between canCommit and ready and the entry was expired from the cache.
94 IllegalStateException ex = new IllegalStateException(
95 String.format("No cohort entry found for transaction %s", transactionID));
96 LOG.error(ex.getMessage());
97 sender.tell(new Status.Failure(ex), shard);
101 cohortEntry.setCanCommitSender(sender);
102 cohortEntry.setShard(shard);
104 if(currentCohortEntry != null) {
105 // There's already a Tx commit in progress - attempt to queue this entry to be
106 // committed after the current Tx completes.
107 LOG.debug("Transaction {} is already in progress - queueing transaction {}",
108 currentCohortEntry.getTransactionID(), transactionID);
110 if(queuedCohortEntries.size() < queueCapacity) {
111 queuedCohortEntries.offer(cohortEntry);
113 removeCohortEntry(transactionID);
115 RuntimeException ex = new RuntimeException(
116 String.format("Could not enqueue transaction %s - the maximum commit queue"+
117 " capacity %d has been reached.",
118 transactionID, queueCapacity));
119 LOG.error(ex.getMessage());
120 sender.tell(new Status.Failure(ex), shard);
123 // No Tx commit currently in progress - make this the current entry and proceed with
125 cohortEntry.updateLastAccessTime();
126 currentCohortEntry = cohortEntry;
128 doCanCommit(cohortEntry);
132 private void doCanCommit(final CohortEntry cohortEntry) {
135 // We block on the future here so we don't have to worry about possibly accessing our
136 // state on a different thread outside of our dispatcher. Also, the data store
137 // currently uses a same thread executor anyway.
138 Boolean canCommit = cohortEntry.getCohort().canCommit().get();
140 cohortEntry.getCanCommitSender().tell(
141 canCommit ? CAN_COMMIT_REPLY_TRUE : CAN_COMMIT_REPLY_FALSE, cohortEntry.getShard());
144 // Remove the entry from the cache now since the Tx will be aborted.
145 removeCohortEntry(cohortEntry.getTransactionID());
147 } catch (InterruptedException | ExecutionException e) {
148 LOG.debug("An exception occurred during canCommit", e);
150 // Remove the entry from the cache now since the Tx will be aborted.
151 removeCohortEntry(cohortEntry.getTransactionID());
152 cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
157 * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
158 * matches the current entry.
160 * @param transactionID the ID of the transaction
161 * @return the current CohortEntry or null if the given transaction ID does not match the
164 public CohortEntry getCohortEntryIfCurrent(String transactionID) {
165 if(isCurrentTransaction(transactionID)) {
166 return currentCohortEntry;
172 public CohortEntry getCurrentCohortEntry() {
173 return currentCohortEntry;
176 public CohortEntry getAndRemoveCohortEntry(String transactionID) {
177 CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
178 cohortCache.invalidate(transactionID);
182 public void removeCohortEntry(String transactionID) {
183 cohortCache.invalidate(transactionID);
186 public boolean isCurrentTransaction(String transactionID) {
187 return currentCohortEntry != null &&
188 currentCohortEntry.getTransactionID().equals(transactionID);
192 * This method is called when a transaction is complete, successful or not. If the given
193 * given transaction ID matches the current in-progress transaction, the next cohort entry,
194 * if any, is dequeued and processed.
196 * @param transactionID the ID of the completed transaction
197 * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
200 public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
201 if(removeCohortEntry) {
202 removeCohortEntry(transactionID);
205 if(isCurrentTransaction(transactionID)) {
206 // Dequeue the next cohort entry waiting in the queue.
207 currentCohortEntry = queuedCohortEntries.poll();
208 if(currentCohortEntry != null) {
209 doCanCommit(currentCohortEntry);
214 static class CohortEntry {
215 private final String transactionID;
216 private final DOMStoreThreePhaseCommitCohort cohort;
217 private final Modification modification;
218 private ActorRef canCommitSender;
219 private ActorRef shard;
220 private long lastAccessTime;
222 CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
223 Modification modification) {
224 this.transactionID = transactionID;
225 this.cohort = cohort;
226 this.modification = modification;
229 void updateLastAccessTime() {
230 lastAccessTime = System.currentTimeMillis();
233 long getLastAccessTime() {
234 return lastAccessTime;
237 String getTransactionID() {
238 return transactionID;
241 DOMStoreThreePhaseCommitCohort getCohort() {
245 Modification getModification() {
249 ActorRef getCanCommitSender() {
250 return canCommitSender;
253 void setCanCommitSender(ActorRef canCommitSender) {
254 this.canCommitSender = canCommitSender;
257 ActorRef getShard() {
261 void setShard(ActorRef shard) {