2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.cache.Cache;
13 import com.google.common.cache.CacheBuilder;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.modification.Modification;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.slf4j.Logger;
25 * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
27 * @author Thomas Pantelis
29 public class ShardCommitCoordinator {
31 private final Cache<String, CohortEntry> cohortCache;
33 private CohortEntry currentCohortEntry;
35 private final Queue<CohortEntry> queuedCohortEntries;
37 private int queueCapacity;
39 private final Logger log;
41 private final String name;
/**
 * Creates a coordinator with an empty commit queue and a cohort cache whose entries expire
 * a fixed time after they were last accessed.
 *
 * @param cacheExpiryTimeoutInSec the number of seconds after the last access at which a
 *        cached cohort entry expires
 * @param queueCapacity the maximum number of cohort entries that may wait in the commit queue
 * @param log the logger this coordinator writes to
 * @param name the identifier prefixed to this coordinator's log and error messages
 */
public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
        String name) {
    this.log = log;
    this.name = name;
    this.queueCapacity = queueCapacity;

    // A plain LinkedList is used on purpose: the queue should only ever be touched on the
    // shard's dispatcher, so the synchronization cost of a concurrent queue impl is avoided.
    this.queuedCohortEntries = new LinkedList<>();

    this.cohortCache = CacheBuilder.newBuilder()
            .expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS)
            .build();
}
57 public void setQueueCapacity(int queueCapacity) {
58 this.queueCapacity = queueCapacity;
62 * This method caches a cohort entry for the given transactions ID in preparation for the
63 * subsequent 3-phase commit.
65 * @param transactionID the ID of the transaction
66 * @param cohort the cohort to participate in the transaction commit
67 * @param modification the modification made by the transaction
69 public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
70 Modification modification) {
72 cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
76 * This method handles the canCommit phase for a transaction.
78 * @param canCommit the CanCommitTransaction message
79 * @param sender the actor that sent the message
80 * @param shard the transaction's shard actor
82 public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
83 final ActorRef shard) {
84 String transactionID = canCommit.getTransactionID();
85 if(log.isDebugEnabled()) {
86 log.debug("{}: Processing canCommit for transaction {} for shard {}",
87 name, transactionID, shard.path());
90 // Lookup the cohort entry that was cached previously (or should have been) by
91 // transactionReady (via the ForwardedReadyTransaction message).
92 final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
93 if(cohortEntry == null) {
94 // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
95 // between canCommit and ready and the entry was expired from the cache.
96 IllegalStateException ex = new IllegalStateException(
97 String.format("%s: No cohort entry found for transaction %s", name, transactionID));
98 log.error(ex.getMessage());
99 sender.tell(new Status.Failure(ex), shard);
103 cohortEntry.setCanCommitSender(sender);
104 cohortEntry.setShard(shard);
106 if(currentCohortEntry != null) {
107 // There's already a Tx commit in progress - attempt to queue this entry to be
108 // committed after the current Tx completes.
109 log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
110 name, currentCohortEntry.getTransactionID(), transactionID);
112 if(queuedCohortEntries.size() < queueCapacity) {
113 queuedCohortEntries.offer(cohortEntry);
115 removeCohortEntry(transactionID);
117 RuntimeException ex = new RuntimeException(
118 String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
119 " capacity %d has been reached.",
120 name, transactionID, queueCapacity));
121 log.error(ex.getMessage());
122 sender.tell(new Status.Failure(ex), shard);
125 // No Tx commit currently in progress - make this the current entry and proceed with
127 cohortEntry.updateLastAccessTime();
128 currentCohortEntry = cohortEntry;
130 doCanCommit(cohortEntry);
134 private void doCanCommit(final CohortEntry cohortEntry) {
137 // We block on the future here so we don't have to worry about possibly accessing our
138 // state on a different thread outside of our dispatcher. Also, the data store
139 // currently uses a same thread executor anyway.
140 Boolean canCommit = cohortEntry.getCohort().canCommit().get();
142 cohortEntry.getCanCommitSender().tell(
143 canCommit ? CanCommitTransactionReply.YES.toSerializable() :
144 CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
147 // Remove the entry from the cache now since the Tx will be aborted.
148 removeCohortEntry(cohortEntry.getTransactionID());
150 } catch (InterruptedException | ExecutionException e) {
151 log.debug("{}: An exception occurred during canCommit: {}", name, e);
153 // Remove the entry from the cache now since the Tx will be aborted.
154 removeCohortEntry(cohortEntry.getTransactionID());
155 cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
160 * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
161 * matches the current entry.
163 * @param transactionID the ID of the transaction
164 * @return the current CohortEntry or null if the given transaction ID does not match the
167 public CohortEntry getCohortEntryIfCurrent(String transactionID) {
168 if(isCurrentTransaction(transactionID)) {
169 return currentCohortEntry;
175 public CohortEntry getCurrentCohortEntry() {
176 return currentCohortEntry;
179 public CohortEntry getAndRemoveCohortEntry(String transactionID) {
180 CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
181 cohortCache.invalidate(transactionID);
185 public void removeCohortEntry(String transactionID) {
186 cohortCache.invalidate(transactionID);
189 public boolean isCurrentTransaction(String transactionID) {
190 return currentCohortEntry != null &&
191 currentCohortEntry.getTransactionID().equals(transactionID);
195 * This method is called when a transaction is complete, successful or not. If the given
196 * given transaction ID matches the current in-progress transaction, the next cohort entry,
197 * if any, is dequeued and processed.
199 * @param transactionID the ID of the completed transaction
200 * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
203 public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
204 if(removeCohortEntry) {
205 removeCohortEntry(transactionID);
208 if(isCurrentTransaction(transactionID)) {
209 // Dequeue the next cohort entry waiting in the queue.
210 currentCohortEntry = queuedCohortEntries.poll();
211 if(currentCohortEntry != null) {
212 currentCohortEntry.updateLastAccessTime();
213 doCanCommit(currentCohortEntry);
218 static class CohortEntry {
219 private final String transactionID;
220 private final DOMStoreThreePhaseCommitCohort cohort;
221 private final Modification modification;
222 private ActorRef canCommitSender;
223 private ActorRef shard;
224 private long lastAccessTime;
/**
 * Creates an entry tying a transaction ID to its commit cohort and modification.
 *
 * @param transactionID the ID of the transaction
 * @param cohort the cohort participating in the transaction commit
 * @param modification the modification made by the transaction
 */
CohortEntry(final String transactionID, final DOMStoreThreePhaseCommitCohort cohort,
        final Modification modification) {
    this.modification = modification;
    this.cohort = cohort;
    this.transactionID = transactionID;
}
233 void updateLastAccessTime() {
234 lastAccessTime = System.currentTimeMillis();
237 long getLastAccessTime() {
238 return lastAccessTime;
241 String getTransactionID() {
242 return transactionID;
245 DOMStoreThreePhaseCommitCohort getCohort() {
249 Modification getModification() {
253 ActorRef getCanCommitSender() {
254 return canCommitSender;
257 void setCanCommitSender(ActorRef canCommitSender) {
258 this.canCommitSender = canCommitSender;
261 ActorRef getShard() {
265 void setShard(ActorRef shard) {