2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static com.google.common.base.Preconditions.checkState;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.Futures;
14 import java.util.AbstractMap.SimpleImmutableEntry;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Map.Entry;
19 import java.util.Optional;
20 import java.util.SortedSet;
21 import org.checkerframework.checker.lock.qual.GuardedBy;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import scala.concurrent.Future;
28 import scala.concurrent.Promise;
/**
 * Delayed implementation of TransactionContextWrapper. Operations destined for the target
 * TransactionContext instance are cached until the TransactionContext instance becomes
 * available, at which time they are executed.
 *
 * @author Thomas Pantelis
 */
37 final class DelayedTransactionContextWrapper extends AbstractTransactionContextWrapper {
38 private static final Logger LOG = LoggerFactory.getLogger(DelayedTransactionContextWrapper.class);
41 * The list of transaction operations to execute once the TransactionContext becomes available.
43 @GuardedBy("queuedTxOperations")
44 private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
47 * The resulting TransactionContext.
49 private volatile TransactionContext transactionContext;
50 @GuardedBy("queuedTxOperations")
51 private TransactionContext deferredTransactionContext;
52 @GuardedBy("queuedTxOperations")
53 private boolean pendingEnqueue;
/**
 * Creates a wrapper that starts out without a TransactionContext. All arguments are passed unchanged to the
 * superclass constructor.
 *
 * @param identifier the transaction identifier
 * @param actorUtils the ActorUtils instance handed to the superclass
 * @param shardName the shard name handed to the superclass
 */
DelayedTransactionContextWrapper(
        @NonNull final TransactionIdentifier identifier,
        @NonNull final ActorUtils actorUtils,
        @NonNull final String shardName) {
    super(identifier, actorUtils, shardName);
}
61 TransactionContext getTransactionContext() {
62 return transactionContext;
66 void maybeExecuteTransactionOperation(final TransactionOperation op) {
67 final TransactionContext localContext = transactionContext;
68 if (localContext != null) {
69 op.invoke(localContext, null);
71 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
72 // callback to be executed after the Tx is created.
73 enqueueTransactionOperation(op);
78 Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
79 // avoid the creation of a promise and a TransactionOperation
80 final TransactionContext localContext = transactionContext;
81 if (localContext != null) {
82 return localContext.readyTransaction(null, participatingShardNames);
85 final Promise<ActorSelection> promise = Futures.promise();
86 enqueueTransactionOperation(new TransactionOperation() {
88 public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
89 promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
93 return promise.future();
97 * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
98 * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
99 * context is not available.
101 private void enqueueTransactionOperation(final TransactionOperation operation) {
102 // We have three things to do here:
103 // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
104 // - acquire a permit for the operation if we still need to enqueue it
105 // - enqueue the operation
107 // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
108 // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
109 // complications are:
110 // - this method may be called from the thread invoking executePriorTransactionOperations()
111 // - user may be violating API contract of using the transaction from a single thread
113 // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
114 // the lock, we will assert that we will be enqueing another operation.
115 final TransactionContext contextOnEntry;
116 synchronized (queuedTxOperations) {
117 contextOnEntry = transactionContext;
118 if (contextOnEntry == null) {
119 checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", getIdentifier());
120 pendingEnqueue = true;
124 // Short-circuit if there is a context
125 if (contextOnEntry != null) {
126 operation.invoke(transactionContext, null);
130 boolean cleanupEnqueue = true;
131 TransactionContext finishHandoff = null;
133 // Acquire the permit,
134 final boolean havePermit = getLimiter().acquire();
136 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", getIdentifier(),
140 // Ready to enqueue, take the lock again and append the operation
141 synchronized (queuedTxOperations) {
142 LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
143 queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
144 pendingEnqueue = false;
145 cleanupEnqueue = false;
146 finishHandoff = deferredTransactionContext;
147 deferredTransactionContext = null;
150 if (cleanupEnqueue) {
151 synchronized (queuedTxOperations) {
152 pendingEnqueue = false;
153 finishHandoff = deferredTransactionContext;
154 deferredTransactionContext = null;
157 if (finishHandoff != null) {
158 executePriorTransactionOperations(finishHandoff);
163 void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
165 // Access to queuedTxOperations and transactionContext must be protected and atomic
166 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
167 // issues and ensure no TransactionOperation is missed and that they are processed
168 // in the order they occurred.
170 // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
171 // in case a TransactionOperation results in another transaction operation being
172 // queued (eg a put operation from a client read Future callback that is notified
174 final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
175 synchronized (queuedTxOperations) {
176 if (queuedTxOperations.isEmpty()) {
177 if (!pendingEnqueue) {
178 // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
179 localTransactionContext.operationHandOffComplete();
181 // This is null-to-non-null transition after which we are releasing the lock and not doing
182 // any further processing.
183 transactionContext = localTransactionContext;
185 deferredTransactionContext = localTransactionContext;
190 operationsBatch = new ArrayList<>(queuedTxOperations);
191 queuedTxOperations.clear();
194 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is
195 // that we need to re-acquire the lock below but this should be negligible.
196 for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
197 final Boolean permit = oper.getValue();
198 if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
199 // If the context is not using limiting we need to release operations as we are queueing them, so
200 // user threads are not charged for them.
201 getLimiter().release();
203 oper.getKey().invoke(localTransactionContext, permit);