2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Preconditions;
13 import java.util.AbstractMap.SimpleImmutableEntry;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.Map.Entry;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24 import scala.concurrent.Future;
25 import scala.concurrent.Promise;
28 * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
29 * TransactionContext instance are cached until the TransactionContext instance becomes available at which
30 * time they are executed.
32 * @author Thomas Pantelis
34 class TransactionContextWrapper {
35 private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
38 * The list of transaction operations to execute once the TransactionContext becomes available.
40 @GuardedBy("queuedTxOperations")
41 private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
42 private final TransactionIdentifier identifier;
43 private final OperationLimiter limiter;
44 private final String shardName;
47 * The resulting TransactionContext.
49 private volatile TransactionContext transactionContext;
50 @GuardedBy("queuedTxOperations")
51 private TransactionContext deferredTransactionContext;
52 @GuardedBy("queuedTxOperations")
53 private boolean pendingEnqueue;
55 TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
56 final String shardName) {
57 this.identifier = Preconditions.checkNotNull(identifier);
58 this.limiter = new OperationLimiter(identifier,
59 // 1 extra permit for the ready operation
60 actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
61 TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
62 this.shardName = Preconditions.checkNotNull(shardName);
65 TransactionContext getTransactionContext() {
66 return transactionContext;
69 TransactionIdentifier getIdentifier() {
74 * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
75 * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
76 * context is not available.
78 private void enqueueTransactionOperation(final TransactionOperation operation) {
79 // We have three things to do here:
80 // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
81 // - acquire a permit for the operation if we still need to enqueue it
82 // - enqueue the operation
84 // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
85 // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
87 // - this method may be called from the thread invoking executePriorTransactionOperations()
88 // - user may be violating API contract of using the transaction from a single thread
90 // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
91 // the lock, we will assert that we will be enqueing another operation.
92 final TransactionContext contextOnEntry;
93 synchronized (queuedTxOperations) {
94 contextOnEntry = transactionContext;
95 if (contextOnEntry == null) {
96 Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected",
98 pendingEnqueue = true;
102 // Short-circuit if there is a context
103 if (contextOnEntry != null) {
104 operation.invoke(transactionContext, null);
108 boolean cleanupEnqueue = true;
109 TransactionContext finishHandoff = null;
111 // Acquire the permit,
112 final boolean havePermit = limiter.acquire();
114 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
118 // Ready to enqueue, take the lock again and append the operation
119 synchronized (queuedTxOperations) {
120 LOG.debug("Tx {} Queuing TransactionOperation", identifier);
121 queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
122 pendingEnqueue = false;
123 cleanupEnqueue = false;
124 finishHandoff = deferredTransactionContext;
125 deferredTransactionContext = null;
128 if (cleanupEnqueue) {
129 synchronized (queuedTxOperations) {
130 pendingEnqueue = false;
131 finishHandoff = deferredTransactionContext;
132 deferredTransactionContext = null;
135 if (finishHandoff != null) {
136 executePriorTransactionOperations(finishHandoff);
141 void maybeExecuteTransactionOperation(final TransactionOperation op) {
142 final TransactionContext localContext = transactionContext;
143 if (localContext != null) {
144 op.invoke(localContext, null);
146 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
147 // callback to be executed after the Tx is created.
148 enqueueTransactionOperation(op);
152 void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
154 // Access to queuedTxOperations and transactionContext must be protected and atomic
155 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
156 // issues and ensure no TransactionOperation is missed and that they are processed
157 // in the order they occurred.
159 // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
160 // in case a TransactionOperation results in another transaction operation being
161 // queued (eg a put operation from a client read Future callback that is notified
163 final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
164 synchronized (queuedTxOperations) {
165 if (queuedTxOperations.isEmpty()) {
166 if (!pendingEnqueue) {
167 // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
168 localTransactionContext.operationHandOffComplete();
170 // This is null-to-non-null transition after which we are releasing the lock and not doing
171 // any further processing.
172 transactionContext = localTransactionContext;
174 deferredTransactionContext = localTransactionContext;
179 operationsBatch = new ArrayList<>(queuedTxOperations);
180 queuedTxOperations.clear();
183 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is
184 // that we need to re-acquire the lock below but this should be negligible.
185 for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
186 final Boolean permit = oper.getValue();
187 if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
188 // If the context is not using limiting we need to release operations as we are queueing them, so
189 // user threads are not charged for them.
192 oper.getKey().invoke(localTransactionContext, permit);
197 Future<ActorSelection> readyTransaction() {
198 // avoid the creation of a promise and a TransactionOperation
199 final TransactionContext localContext = transactionContext;
200 if (localContext != null) {
201 return localContext.readyTransaction(null);
204 final Promise<ActorSelection> promise = Futures.promise();
205 enqueueTransactionOperation(new TransactionOperation() {
207 public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
208 promise.completeWith(newTransactionContext.readyTransaction(havePermit));
212 return promise.future();
215 OperationLimiter getLimiter() {