/*
 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.cache.Cache;
16 import com.google.common.cache.CacheBuilder;
17 import com.google.common.cache.RemovalCause;
18 import com.google.common.cache.RemovalListener;
19 import com.google.common.cache.RemovalNotification;
20 import java.util.LinkedList;
21 import java.util.Queue;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
31 import org.slf4j.Logger;
/**
 * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
 *
 * @author Thomas Pantelis
 */
38 public class ShardCommitCoordinator {
40 // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
41 public interface CohortDecorator {
42 DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
45 private final Cache<String, CohortEntry> cohortCache;
47 private CohortEntry currentCohortEntry;
49 private final DOMTransactionFactory transactionFactory;
51 private final Queue<CohortEntry> queuedCohortEntries;
53 private int queueCapacity;
55 private final Logger log;
57 private final String name;
59 private final String shardActorPath;
61 private final RemovalListener<String, CohortEntry> cacheRemovalListener =
62 new RemovalListener<String, CohortEntry>() {
64 public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
65 if(notification.getCause() == RemovalCause.EXPIRED) {
66 log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
71 // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
72 private CohortDecorator cohortDecorator;
/**
 * Creates a commit coordinator.
 *
 * @param transactionFactory factory used to create write transactions for batched modifications
 * @param cacheExpiryTimeoutInSec seconds without access after which a cached cohort entry expires
 * @param queueCapacity maximum number of cohort entries that may be queued behind the current commit
 * @param shardActor the shard actor, whose serialized actor path is recorded
 * @param log the logger to write to
 * @param name the name used as a prefix in log and error messages
 */
public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
        long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {

    this.queueCapacity = queueCapacity;
    this.log = log;
    this.name = name;
    this.transactionFactory = transactionFactory;

    shardActorPath = Serialization.serializedActorPath(shardActor);

    cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
            removalListener(cacheRemovalListener).build();

    // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
    // since this should only be accessed on the shard's dispatcher.
    queuedCohortEntries = new LinkedList<>();
}
92 public void setQueueCapacity(int queueCapacity) {
93 this.queueCapacity = queueCapacity;
97 * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
98 * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
100 * @param transactionID the ID of the transaction
101 * @param cohort the cohort to participate in the transaction commit
102 * @param modification the modifications made by the transaction
104 public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
105 MutableCompositeModification modification) {
107 cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
111 * This method handles a BatchedModifications message for a transaction being prepared directly on the
112 * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
113 * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
114 * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
116 * @param batched the BatchedModifications
117 * @param shardActor the transaction's shard actor
119 * @throws ExecutionException if an error occurs loading the cache
121 public boolean handleTransactionModifications(BatchedModifications batched)
122 throws ExecutionException {
123 CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
124 if(cohortEntry == null) {
125 cohortEntry = new CohortEntry(batched.getTransactionID(),
126 transactionFactory.<DOMStoreWriteTransaction>newTransaction(
127 TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
128 batched.getTransactionChainID()));
129 cohortCache.put(batched.getTransactionID(), cohortEntry);
132 if(log.isDebugEnabled()) {
133 log.debug("{}: Applying {} batched modifications for Tx {}", name,
134 batched.getModifications().size(), batched.getTransactionID());
137 cohortEntry.applyModifications(batched.getModifications());
139 if(batched.isReady()) {
140 if(log.isDebugEnabled()) {
141 log.debug("{}: Readying Tx {}, client version {}", name,
142 batched.getTransactionID(), batched.getVersion());
145 cohortEntry.ready(cohortDecorator);
148 return batched.isReady();
152 * This method handles the canCommit phase for a transaction.
154 * @param canCommit the CanCommitTransaction message
155 * @param sender the actor that sent the message
156 * @param shard the transaction's shard actor
158 public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
159 final ActorRef shard) {
160 String transactionID = canCommit.getTransactionID();
161 if(log.isDebugEnabled()) {
162 log.debug("{}: Processing canCommit for transaction {} for shard {}",
163 name, transactionID, shard.path());
166 // Lookup the cohort entry that was cached previously (or should have been) by
167 // transactionReady (via the ForwardedReadyTransaction message).
168 final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
169 if(cohortEntry == null) {
170 // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
171 // between canCommit and ready and the entry was expired from the cache.
172 IllegalStateException ex = new IllegalStateException(
173 String.format("%s: No cohort entry found for transaction %s", name, transactionID));
174 log.error(ex.getMessage());
175 sender.tell(new Status.Failure(ex), shard);
179 cohortEntry.setCanCommitSender(sender);
180 cohortEntry.setShard(shard);
182 if(currentCohortEntry != null) {
183 // There's already a Tx commit in progress - attempt to queue this entry to be
184 // committed after the current Tx completes.
185 log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
186 name, currentCohortEntry.getTransactionID(), transactionID);
188 if(queuedCohortEntries.size() < queueCapacity) {
189 queuedCohortEntries.offer(cohortEntry);
191 removeCohortEntry(transactionID);
193 RuntimeException ex = new RuntimeException(
194 String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
195 " capacity %d has been reached.",
196 name, transactionID, queueCapacity));
197 log.error(ex.getMessage());
198 sender.tell(new Status.Failure(ex), shard);
201 // No Tx commit currently in progress - make this the current entry and proceed with
203 cohortEntry.updateLastAccessTime();
204 currentCohortEntry = cohortEntry;
206 doCanCommit(cohortEntry);
210 private void doCanCommit(final CohortEntry cohortEntry) {
213 // We block on the future here so we don't have to worry about possibly accessing our
214 // state on a different thread outside of our dispatcher. Also, the data store
215 // currently uses a same thread executor anyway.
216 Boolean canCommit = cohortEntry.getCohort().canCommit().get();
218 cohortEntry.getCanCommitSender().tell(
219 canCommit ? CanCommitTransactionReply.YES.toSerializable() :
220 CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
223 // Remove the entry from the cache now since the Tx will be aborted.
224 removeCohortEntry(cohortEntry.getTransactionID());
226 } catch (InterruptedException | ExecutionException e) {
227 log.debug("{}: An exception occurred during canCommit: {}", name, e);
229 // Remove the entry from the cache now since the Tx will be aborted.
230 removeCohortEntry(cohortEntry.getTransactionID());
231 cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
236 * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
237 * matches the current entry.
239 * @param transactionID the ID of the transaction
240 * @return the current CohortEntry or null if the given transaction ID does not match the
243 public CohortEntry getCohortEntryIfCurrent(String transactionID) {
244 if(isCurrentTransaction(transactionID)) {
245 return currentCohortEntry;
251 public CohortEntry getCurrentCohortEntry() {
252 return currentCohortEntry;
255 public CohortEntry getAndRemoveCohortEntry(String transactionID) {
256 CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
257 cohortCache.invalidate(transactionID);
261 public void removeCohortEntry(String transactionID) {
262 cohortCache.invalidate(transactionID);
265 public boolean isCurrentTransaction(String transactionID) {
266 return currentCohortEntry != null &&
267 currentCohortEntry.getTransactionID().equals(transactionID);
271 * This method is called when a transaction is complete, successful or not. If the given
272 * given transaction ID matches the current in-progress transaction, the next cohort entry,
273 * if any, is dequeued and processed.
275 * @param transactionID the ID of the completed transaction
276 * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
279 public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
280 if(removeCohortEntry) {
281 removeCohortEntry(transactionID);
284 if(isCurrentTransaction(transactionID)) {
285 // Dequeue the next cohort entry waiting in the queue.
286 currentCohortEntry = queuedCohortEntries.poll();
287 if(currentCohortEntry != null) {
288 currentCohortEntry.updateLastAccessTime();
289 doCanCommit(currentCohortEntry);
295 void setCohortDecorator(CohortDecorator cohortDecorator) {
296 this.cohortDecorator = cohortDecorator;
300 static class CohortEntry {
301 private final String transactionID;
302 private DOMStoreThreePhaseCommitCohort cohort;
303 private final MutableCompositeModification compositeModification;
304 private final DOMStoreWriteTransaction transaction;
305 private ActorRef canCommitSender;
306 private ActorRef shard;
307 private long lastAccessTime;
309 CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
310 this.compositeModification = new MutableCompositeModification();
311 this.transaction = transaction;
312 this.transactionID = transactionID;
315 CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
316 MutableCompositeModification compositeModification) {
317 this.transactionID = transactionID;
318 this.cohort = cohort;
319 this.compositeModification = compositeModification;
320 this.transaction = null;
323 void updateLastAccessTime() {
324 lastAccessTime = System.currentTimeMillis();
327 long getLastAccessTime() {
328 return lastAccessTime;
331 String getTransactionID() {
332 return transactionID;
335 DOMStoreThreePhaseCommitCohort getCohort() {
339 MutableCompositeModification getModification() {
340 return compositeModification;
343 void applyModifications(Iterable<Modification> modifications) {
344 for(Modification modification: modifications) {
345 compositeModification.addModification(modification);
346 modification.apply(transaction);
350 void ready(CohortDecorator cohortDecorator) {
351 Preconditions.checkState(cohort == null, "cohort was already set");
353 cohort = transaction.ready();
355 if(cohortDecorator != null) {
356 // Call the hook for unit tests.
357 cohort = cohortDecorator.decorate(transactionID, cohort);
361 ActorRef getCanCommitSender() {
362 return canCommitSender;
365 void setCanCommitSender(ActorRef canCommitSender) {
366 this.canCommitSender = canCommitSender;
369 ActorRef getShard() {
373 void setShard(ActorRef shard) {
377 boolean hasModifications(){
378 return compositeModification.getModifications().size() > 0;