2 * Copyright (c) 2014 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorRef;
11 import akka.actor.Status;
12 import akka.serialization.Serialization;
13 import com.google.common.annotations.VisibleForTesting;
14 import com.google.common.base.Preconditions;
15 import com.google.common.cache.Cache;
16 import com.google.common.cache.CacheBuilder;
17 import com.google.common.cache.RemovalCause;
18 import com.google.common.cache.RemovalListener;
19 import com.google.common.cache.RemovalNotification;
20 import java.util.LinkedList;
21 import java.util.Queue;
22 import java.util.concurrent.ExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
25 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
28 import org.opendaylight.controller.cluster.datastore.modification.Modification;
29 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.slf4j.Logger;
/**
 * Coordinates commits for a shard, ensuring that only one 3-phase commit is in progress at a time.
 *
 * @author Thomas Pantelis
 */
39 public class ShardCommitCoordinator {
41 // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
42 public interface CohortDecorator {
43 DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
46 private final Cache<String, CohortEntry> cohortCache;
48 private CohortEntry currentCohortEntry;
50 private final DOMTransactionFactory transactionFactory;
52 private final Queue<CohortEntry> queuedCohortEntries;
54 private int queueCapacity;
56 private final Logger log;
58 private final String name;
60 private final String shardActorPath;
62 private final RemovalListener<String, CohortEntry> cacheRemovalListener =
63 new RemovalListener<String, CohortEntry>() {
65 public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
66 if(notification.getCause() == RemovalCause.EXPIRED) {
67 log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
72 // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
73 private CohortDecorator cohortDecorator;
/**
 * Creates a coordinator for a single shard.
 *
 * @param transactionFactory factory used to create write transactions for batched modifications
 * @param cacheExpiryTimeoutInSec number of seconds after the last access at which a cached
 *            cohort entry expires
 * @param queueCapacity the maximum number of cohort entries that may wait in the commit queue
 * @param shardActor the shard actor; its serialized path is kept for use as the cohort path
 * @param log the logger to write to
 * @param name the name used as a prefix in log and error messages
 */
public ShardCommitCoordinator(DOMTransactionFactory transactionFactory,
        long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
    this.queueCapacity = queueCapacity;
    this.log = log;
    this.name = name;
    this.transactionFactory = transactionFactory;

    this.shardActorPath = Serialization.serializedActorPath(shardActor);

    this.cohortCache = CacheBuilder.newBuilder()
            .expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS)
            .removalListener(cacheRemovalListener)
            .build();

    // A plain LinkedList avoids the synchronization overhead of the concurrent queue
    // implementations; the queue should only be accessed on the shard's dispatcher.
    this.queuedCohortEntries = new LinkedList<>();
}
93 public void setQueueCapacity(int queueCapacity) {
94 this.queueCapacity = queueCapacity;
98 * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
99 * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
101 * @param transactionID the ID of the transaction
102 * @param cohort the cohort to participate in the transaction commit
103 * @param modification the modifications made by the transaction
105 public void transactionReady(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
106 MutableCompositeModification modification) {
108 cohortCache.put(transactionID, new CohortEntry(transactionID, cohort, modification));
112 * This method handles a BatchedModifications message for a transaction being prepared directly on the
113 * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
114 * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
115 * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
117 * @param batched the BatchedModifications
118 * @param shardActor the transaction's shard actor
120 * @throws ExecutionException if an error occurs loading the cache
122 public BatchedModificationsReply handleTransactionModifications(BatchedModifications batched)
123 throws ExecutionException {
124 CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
125 if(cohortEntry == null) {
126 cohortEntry = new CohortEntry(batched.getTransactionID(),
127 transactionFactory.<DOMStoreWriteTransaction>newTransaction(
128 TransactionProxy.TransactionType.WRITE_ONLY, batched.getTransactionID(),
129 batched.getTransactionChainID()));
130 cohortCache.put(batched.getTransactionID(), cohortEntry);
133 if(log.isDebugEnabled()) {
134 log.debug("{}: Applying {} batched modifications for Tx {}", name,
135 batched.getModifications().size(), batched.getTransactionID());
138 cohortEntry.applyModifications(batched.getModifications());
140 String cohortPath = null;
141 if(batched.isReady()) {
142 if(log.isDebugEnabled()) {
143 log.debug("{}: Readying Tx {}, client version {}", name,
144 batched.getTransactionID(), batched.getVersion());
147 cohortEntry.ready(cohortDecorator);
148 cohortPath = shardActorPath;
151 return new BatchedModificationsReply(batched.getModifications().size(), cohortPath);
155 * This method handles the canCommit phase for a transaction.
157 * @param canCommit the CanCommitTransaction message
158 * @param sender the actor that sent the message
159 * @param shard the transaction's shard actor
161 public void handleCanCommit(CanCommitTransaction canCommit, final ActorRef sender,
162 final ActorRef shard) {
163 String transactionID = canCommit.getTransactionID();
164 if(log.isDebugEnabled()) {
165 log.debug("{}: Processing canCommit for transaction {} for shard {}",
166 name, transactionID, shard.path());
169 // Lookup the cohort entry that was cached previously (or should have been) by
170 // transactionReady (via the ForwardedReadyTransaction message).
171 final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
172 if(cohortEntry == null) {
173 // Either canCommit was invoked before ready(shouldn't happen) or a long time passed
174 // between canCommit and ready and the entry was expired from the cache.
175 IllegalStateException ex = new IllegalStateException(
176 String.format("%s: No cohort entry found for transaction %s", name, transactionID));
177 log.error(ex.getMessage());
178 sender.tell(new Status.Failure(ex), shard);
182 cohortEntry.setCanCommitSender(sender);
183 cohortEntry.setShard(shard);
185 if(currentCohortEntry != null) {
186 // There's already a Tx commit in progress - attempt to queue this entry to be
187 // committed after the current Tx completes.
188 log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
189 name, currentCohortEntry.getTransactionID(), transactionID);
191 if(queuedCohortEntries.size() < queueCapacity) {
192 queuedCohortEntries.offer(cohortEntry);
194 removeCohortEntry(transactionID);
196 RuntimeException ex = new RuntimeException(
197 String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
198 " capacity %d has been reached.",
199 name, transactionID, queueCapacity));
200 log.error(ex.getMessage());
201 sender.tell(new Status.Failure(ex), shard);
204 // No Tx commit currently in progress - make this the current entry and proceed with
206 cohortEntry.updateLastAccessTime();
207 currentCohortEntry = cohortEntry;
209 doCanCommit(cohortEntry);
213 private void doCanCommit(final CohortEntry cohortEntry) {
216 // We block on the future here so we don't have to worry about possibly accessing our
217 // state on a different thread outside of our dispatcher. Also, the data store
218 // currently uses a same thread executor anyway.
219 Boolean canCommit = cohortEntry.getCohort().canCommit().get();
221 cohortEntry.getCanCommitSender().tell(
222 canCommit ? CanCommitTransactionReply.YES.toSerializable() :
223 CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard());
226 // Remove the entry from the cache now since the Tx will be aborted.
227 removeCohortEntry(cohortEntry.getTransactionID());
229 } catch (InterruptedException | ExecutionException e) {
230 log.debug("{}: An exception occurred during canCommit: {}", name, e);
232 // Remove the entry from the cache now since the Tx will be aborted.
233 removeCohortEntry(cohortEntry.getTransactionID());
234 cohortEntry.getCanCommitSender().tell(new Status.Failure(e), cohortEntry.getShard());
239 * Returns the cohort entry for the Tx commit currently in progress if the given transaction ID
240 * matches the current entry.
242 * @param transactionID the ID of the transaction
243 * @return the current CohortEntry or null if the given transaction ID does not match the
246 public CohortEntry getCohortEntryIfCurrent(String transactionID) {
247 if(isCurrentTransaction(transactionID)) {
248 return currentCohortEntry;
254 public CohortEntry getCurrentCohortEntry() {
255 return currentCohortEntry;
258 public CohortEntry getAndRemoveCohortEntry(String transactionID) {
259 CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
260 cohortCache.invalidate(transactionID);
264 public void removeCohortEntry(String transactionID) {
265 cohortCache.invalidate(transactionID);
268 public boolean isCurrentTransaction(String transactionID) {
269 return currentCohortEntry != null &&
270 currentCohortEntry.getTransactionID().equals(transactionID);
274 * This method is called when a transaction is complete, successful or not. If the given
275 * given transaction ID matches the current in-progress transaction, the next cohort entry,
276 * if any, is dequeued and processed.
278 * @param transactionID the ID of the completed transaction
279 * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
282 public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
283 if(removeCohortEntry) {
284 removeCohortEntry(transactionID);
287 if(isCurrentTransaction(transactionID)) {
288 // Dequeue the next cohort entry waiting in the queue.
289 currentCohortEntry = queuedCohortEntries.poll();
290 if(currentCohortEntry != null) {
291 currentCohortEntry.updateLastAccessTime();
292 doCanCommit(currentCohortEntry);
298 void setCohortDecorator(CohortDecorator cohortDecorator) {
299 this.cohortDecorator = cohortDecorator;
303 static class CohortEntry {
304 private final String transactionID;
305 private DOMStoreThreePhaseCommitCohort cohort;
306 private final MutableCompositeModification compositeModification;
307 private final DOMStoreWriteTransaction transaction;
308 private ActorRef canCommitSender;
309 private ActorRef shard;
310 private long lastAccessTime;
312 CohortEntry(String transactionID, DOMStoreWriteTransaction transaction) {
313 this.compositeModification = new MutableCompositeModification();
314 this.transaction = transaction;
315 this.transactionID = transactionID;
318 CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
319 MutableCompositeModification compositeModification) {
320 this.transactionID = transactionID;
321 this.cohort = cohort;
322 this.compositeModification = compositeModification;
323 this.transaction = null;
326 void updateLastAccessTime() {
327 lastAccessTime = System.currentTimeMillis();
330 long getLastAccessTime() {
331 return lastAccessTime;
334 String getTransactionID() {
335 return transactionID;
338 DOMStoreThreePhaseCommitCohort getCohort() {
342 MutableCompositeModification getModification() {
343 return compositeModification;
346 void applyModifications(Iterable<Modification> modifications) {
347 for(Modification modification: modifications) {
348 compositeModification.addModification(modification);
349 modification.apply(transaction);
353 void ready(CohortDecorator cohortDecorator) {
354 Preconditions.checkState(cohort == null, "cohort was already set");
356 cohort = transaction.ready();
358 if(cohortDecorator != null) {
359 // Call the hook for unit tests.
360 cohort = cohortDecorator.decorate(transactionID, cohort);
364 ActorRef getCanCommitSender() {
365 return canCommitSender;
368 void setCanCommitSender(ActorRef canCommitSender) {
369 this.canCommitSender = canCommitSender;
372 ActorRef getShard() {
376 void setShard(ActorRef shard) {
380 boolean hasModifications(){
381 return compositeModification.getModifications().size() > 0;