/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import com.google.common.cache.Cache;
13 import com.google.common.cache.CacheBuilder;
14 import java.util.LinkedList;
15 import java.util.Queue;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
19 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
20 import org.opendaylight.controller.cluster.datastore.modification.Modification;
21 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
22 import org.slf4j.Logger;
25 * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
27 * @author Thomas Pantelis
29 public class ShardCommitCoordinator {
31 private final Cache<String, CohortEntry> cohortCache;
33 private CohortEntry currentCohortEntry;
35 private final Queue<CohortEntry> queuedCohortEntries;
37 private final int queueCapacity;
39 private final Logger log;
41 private final String name;
/**
 * Creates a coordinator with an empty cohort cache and an empty commit queue.
 *
 * @param cacheExpiryTimeoutInSec number of seconds after its last access at which a cached
 *        cohort entry expires
 * @param queueCapacity maximum number of cohort entries that may wait in the commit queue
 * @param log the logger used for diagnostics
 * @param name the name prefixed to log and error messages
 */
public ShardCommitCoordinator(long cacheExpiryTimeoutInSec, int queueCapacity, Logger log,
        String name) {
    this.name = name;
    this.log = log;
    this.queueCapacity = queueCapacity;

    this.cohortCache = CacheBuilder.newBuilder()
            .expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS)
            .build();

    // A plain LinkedList is used on purpose: the queue should only be accessed on the shard's
    // dispatcher, so the synchronization overhead of a concurrent queue impl is not needed.
    this.queuedCohortEntries = new LinkedList<>();
}
58 * This method caches a cohort entry for the given transactions ID in preparation for the
59 * subsequent 3-phase commit.
61 * @param transactionID the ID of the transaction
62 * @param cohort the cohort to participate in the transaction commit
63 * @param modification the modification made by the transaction
65 public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
66 Modification modification) {
68 cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
72 * This method handles the canCommit phase for a transaction.
74 * @param canCommit the CanCommitTransaction message
75 * @param sender the actor that sent the message
76 * @param shard the transaction's shard actor
78 public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
79 final ActorRef shard) {
80 String transactionID = canCommit.getTransactionID();
81 if(log.isDebugEnabled()) {
82 log.debug("{}: Processing canCommit for transaction {} for shard {}",
83 name, transactionID, shard.path());
86 // Lookup the cohort entry that was cached previously (or should have been) by
87 // transactionReady (via the ForwardedReadyTransaction message).
88 final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
89 if(cohortEntry == null) {
90 // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
91 // between canCommit and ready and the entry was expired from the cache.
92 IllegalStateException ex = new IllegalStateException(
93 String.format("%s: No cohort entry found for transaction %s", name, transactionID));
94 log.error(ex.getMessage());
95 sender.tell(new Status.Failure(ex), shard);
99 cohortEntry.setCanCommitSender(sender);
100 cohortEntry.setShard(shard);
102 if(currentCohortEntry != null) {
103 // There's already a Tx commit in progress - attempt to queue this entry to be
104 // committed after the current Tx completes.
105 log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
106 name, currentCohortEntry.getTransactionID(), transactionID);
108 if(queuedCohortEntries.size() < queueCapacity) {
109 queuedCohortEntries.offer(cohortEntry);
111 removeCohortEntry(transactionID);
113 RuntimeException ex = new RuntimeException(
114 String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
115 " capacity %d has been reached.",
116 name, transactionID, queueCapacity));
117 log.error(ex.getMessage());
118 sender.tell(new Status.Failure(ex), shard);
121 // No Tx commit currently in progress - make this the current entry and proceed with
123 cohortEntry.updateLastAccessTime();
124 currentCohortEntry = cohortEntry;
126 doCanCommit(cohortEntry);
130 private void doCanCommit(final CohortEntry cohortEntry) {
133 // We block on the future here so we don't have to worry about possibly accessing our
134 // state on a different thread outside of our dispatcher. Also, the data store
135 // currently uses a same thread executor anyway.
136 Boolean canCommit = cohortEntry.getCohort().canCommit().get();
138 cohortEntry.getCanCommitSender().tell(
139 canCommit ? CanCommitTransactionReply.YES.toSerializable() :
140 CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
143 // Remove the entry from the cache now since the Tx will be aborted.
144 removeCohortEntry(cohortEntry.getTransactionID());
146 } catch (InterruptedException | ExecutionException e) {
147 log.debug("{}: An exception occurred during canCommit: {}", name, e);
149 // Remove the entry from the cache now since the Tx will be aborted.
150 removeCohortEntry(cohortEntry.getTransactionID());
151 cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
156 * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
157 * matches the current entry.
159 * @param transactionID the ID of the transaction
160 * @return the current CohortEntry or null if the given transaction ID does not match the
163 public CohortEntry getCohortEntryIfCurrent(String transactionID) {
164 if(isCurrentTransaction(transactionID)) {
165 return currentCohortEntry;
171 public CohortEntry getCurrentCohortEntry() {
172 return currentCohortEntry;
175 public CohortEntry getAndRemoveCohortEntry(String transactionID) {
176 CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
177 cohortCache.invalidate(transactionID);
181 public void removeCohortEntry(String transactionID) {
182 cohortCache.invalidate(transactionID);
185 public boolean isCurrentTransaction(String transactionID) {
186 return currentCohortEntry != null &&
187 currentCohortEntry.getTransactionID().equals(transactionID);
191 * This method is called when a transaction is complete, successful or not. If the given
192 * given transaction ID matches the current in-progress transaction, the next cohort entry,
193 * if any, is dequeued and processed.
195 * @param transactionID the ID of the completed transaction
196 * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
199 public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
200 if(removeCohortEntry) {
201 removeCohortEntry(transactionID);
204 if(isCurrentTransaction(transactionID)) {
205 // Dequeue the next cohort entry waiting in the queue.
206 currentCohortEntry = queuedCohortEntries.poll();
207 if(currentCohortEntry != null) {
208 currentCohortEntry.updateLastAccessTime();
209 doCanCommit(currentCohortEntry);
214 static class CohortEntry {
215 private final String transactionID;
216 private final DOMStoreThreePhaseCommitCohort cohort;
217 private final Modification modification;
218 private ActorRef canCommitSender;
219 private ActorRef shard;
220 private long lastAccessTime;
222 CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
223 Modification modification) {
224 this.transactionID = transactionID;
225 this.cohort = cohort;
226 this.modification = modification;
229 void updateLastAccessTime() {
230 lastAccessTime = System.currentTimeMillis();
233 long getLastAccessTime() {
234 return lastAccessTime;
237 String getTransactionID() {
238 return transactionID;
241 DOMStoreThreePhaseCommitCohort getCohort() {
245 Modification getModification() {
249 ActorRef getCanCommitSender() {
250 return canCommitSender;
253 void setCanCommitSender(ActorRef canCommitSender) {
254 this.canCommitSender = canCommitSender;
257 ActorRef getShard() {
261 void setShard(ActorRef shard) {