/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
13 import akka.actor.ActorSelection;
14 import akka.dispatch.Futures;
15 import java.util.AbstractMap.SimpleImmutableEntry;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Map.Entry;
20 import java.util.Optional;
21 import java.util.SortedSet;
22 import java.util.concurrent.TimeUnit;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29 import scala.concurrent.Promise;
/**
 * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
 * TransactionContext instance are cached until the TransactionContext instance becomes available, at which
 * time they are executed.
 *
 * @author Thomas Pantelis
 */
38 class TransactionContextWrapper {
39 private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
42 * The list of transaction operations to execute once the TransactionContext becomes available.
44 @GuardedBy("queuedTxOperations")
45 private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
46 private final TransactionIdentifier identifier;
47 private final OperationLimiter limiter;
48 private final String shardName;
51 * The resulting TransactionContext.
53 private volatile TransactionContext transactionContext;
54 @GuardedBy("queuedTxOperations")
55 private TransactionContext deferredTransactionContext;
56 @GuardedBy("queuedTxOperations")
57 private boolean pendingEnqueue;
/**
 * Creates a wrapper for a transaction whose TransactionContext is not yet available.
 *
 * @param identifier the transaction identifier, must not be null
 * @param actorUtils source of the datastore configuration used to size and time-limit the operation limiter
 * @param shardName name of the shard this wrapper targets, must not be null
 * @throws NullPointerException if {@code identifier} or {@code shardName} is null
 */
TransactionContextWrapper(final TransactionIdentifier identifier, final ActorUtils actorUtils,
        final String shardName) {
    this.identifier = requireNonNull(identifier);
    this.limiter = new OperationLimiter(identifier,
            // 1 extra permit for the ready operation
            actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
            // The configured timeout is in milliseconds; the limiter is given whole seconds (truncated).
            TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
    this.shardName = requireNonNull(shardName);
}
69 TransactionContext getTransactionContext() {
70 return transactionContext;
73 TransactionIdentifier getIdentifier() {
78 * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
79 * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
80 * context is not available.
82 private void enqueueTransactionOperation(final TransactionOperation operation) {
83 // We have three things to do here:
84 // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
85 // - acquire a permit for the operation if we still need to enqueue it
86 // - enqueue the operation
88 // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
89 // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
91 // - this method may be called from the thread invoking executePriorTransactionOperations()
92 // - user may be violating API contract of using the transaction from a single thread
94 // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
95 // the lock, we will assert that we will be enqueing another operation.
96 final TransactionContext contextOnEntry;
97 synchronized (queuedTxOperations) {
98 contextOnEntry = transactionContext;
99 if (contextOnEntry == null) {
100 checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", identifier);
101 pendingEnqueue = true;
105 // Short-circuit if there is a context
106 if (contextOnEntry != null) {
107 operation.invoke(transactionContext, null);
111 boolean cleanupEnqueue = true;
112 TransactionContext finishHandoff = null;
114 // Acquire the permit,
115 final boolean havePermit = limiter.acquire();
117 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
121 // Ready to enqueue, take the lock again and append the operation
122 synchronized (queuedTxOperations) {
123 LOG.debug("Tx {} Queuing TransactionOperation", identifier);
124 queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
125 pendingEnqueue = false;
126 cleanupEnqueue = false;
127 finishHandoff = deferredTransactionContext;
128 deferredTransactionContext = null;
131 if (cleanupEnqueue) {
132 synchronized (queuedTxOperations) {
133 pendingEnqueue = false;
134 finishHandoff = deferredTransactionContext;
135 deferredTransactionContext = null;
138 if (finishHandoff != null) {
139 executePriorTransactionOperations(finishHandoff);
144 void maybeExecuteTransactionOperation(final TransactionOperation op) {
145 final TransactionContext localContext = transactionContext;
146 if (localContext != null) {
147 op.invoke(localContext, null);
149 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
150 // callback to be executed after the Tx is created.
151 enqueueTransactionOperation(op);
155 void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
157 // Access to queuedTxOperations and transactionContext must be protected and atomic
158 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
159 // issues and ensure no TransactionOperation is missed and that they are processed
160 // in the order they occurred.
162 // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
163 // in case a TransactionOperation results in another transaction operation being
164 // queued (eg a put operation from a client read Future callback that is notified
166 final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
167 synchronized (queuedTxOperations) {
168 if (queuedTxOperations.isEmpty()) {
169 if (!pendingEnqueue) {
170 // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
171 localTransactionContext.operationHandOffComplete();
173 // This is null-to-non-null transition after which we are releasing the lock and not doing
174 // any further processing.
175 transactionContext = localTransactionContext;
177 deferredTransactionContext = localTransactionContext;
182 operationsBatch = new ArrayList<>(queuedTxOperations);
183 queuedTxOperations.clear();
186 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is
187 // that we need to re-acquire the lock below but this should be negligible.
188 for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
189 final Boolean permit = oper.getValue();
190 if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
191 // If the context is not using limiting we need to release operations as we are queueing them, so
192 // user threads are not charged for them.
195 oper.getKey().invoke(localTransactionContext, permit);
200 Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames) {
201 // avoid the creation of a promise and a TransactionOperation
202 final TransactionContext localContext = transactionContext;
203 if (localContext != null) {
204 return localContext.readyTransaction(null, participatingShardNames);
207 final Promise<ActorSelection> promise = Futures.promise();
208 enqueueTransactionOperation(new TransactionOperation() {
210 public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
211 promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
215 return promise.future();
218 OperationLimiter getLimiter() {