Fix TransactionContextWrapper limiter accounting
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Preconditions;
13 import java.util.AbstractMap.SimpleImmutableEntry;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.Map.Entry;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
21 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24 import scala.concurrent.Future;
25 import scala.concurrent.Promise;
26
27 /**
28  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
29  * TransactionContext instance are cached until the TransactionContext instance becomes available at which
30  * time they are executed.
31  *
32  * @author Thomas Pantelis
33  */
34 class TransactionContextWrapper {
35     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
36
37     /**
38      * The list of transaction operations to execute once the TransactionContext becomes available.
39      */
40     @GuardedBy("queuedTxOperations")
41     private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
42     private final TransactionIdentifier identifier;
43     private final OperationLimiter limiter;
44     private final String shardName;
45
46     /**
47      * The resulting TransactionContext.
48      */
49     private volatile TransactionContext transactionContext;
50     @GuardedBy("queuedTxOperations")
51     private TransactionContext deferredTransactionContext;
52     @GuardedBy("queuedTxOperations")
53     private boolean pendingEnqueue;
54
55     TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
56             final String shardName) {
57         this.identifier = Preconditions.checkNotNull(identifier);
58         this.limiter = new OperationLimiter(identifier,
59                 // 1 extra permit for the ready operation
60                 actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
61                 TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
62         this.shardName = Preconditions.checkNotNull(shardName);
63     }
64
65     TransactionContext getTransactionContext() {
66         return transactionContext;
67     }
68
69     TransactionIdentifier getIdentifier() {
70         return identifier;
71     }
72
73     /**
74      * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
75      * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
76      * context is not available.
77      */
78     private void enqueueTransactionOperation(final TransactionOperation operation) {
79         // We have three things to do here:
80         // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
81         // - acquire a permit for the operation if we still need to enqueue it
82         // - enqueue the operation
83         //
84         // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
85         // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
86         // complications are:
87         // - this method may be called from the thread invoking executePriorTransactionOperations()
88         // - user may be violating API contract of using the transaction from a single thread
89
90         // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
91         // the lock, we will assert that we will be enqueing another operation.
92         final TransactionContext contextOnEntry;
93         synchronized (queuedTxOperations) {
94             contextOnEntry = transactionContext;
95             if (contextOnEntry == null) {
96                 Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected",
97                         identifier);
98                 pendingEnqueue = true;
99             }
100         }
101
102         // Short-circuit if there is a context
103         if (contextOnEntry != null) {
104             operation.invoke(transactionContext, null);
105             return;
106         }
107
108         boolean cleanupEnqueue = true;
109         TransactionContext finishHandoff = null;
110         try {
111             // Acquire the permit,
112             final boolean havePermit = limiter.acquire();
113             if (!havePermit) {
114                 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
115                     shardName);
116             }
117
118             // Ready to enqueue, take the lock again and append the operation
119             synchronized (queuedTxOperations) {
120                 LOG.debug("Tx {} Queuing TransactionOperation", identifier);
121                 queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
122                 pendingEnqueue = false;
123                 cleanupEnqueue = false;
124                 finishHandoff = deferredTransactionContext;
125                 deferredTransactionContext = null;
126             }
127         } finally {
128             if (cleanupEnqueue) {
129                 synchronized (queuedTxOperations) {
130                     pendingEnqueue = false;
131                     finishHandoff = deferredTransactionContext;
132                     deferredTransactionContext = null;
133                 }
134             }
135             if (finishHandoff != null) {
136                 executePriorTransactionOperations(finishHandoff);
137             }
138         }
139     }
140
141     void maybeExecuteTransactionOperation(final TransactionOperation op) {
142         final TransactionContext localContext = transactionContext;
143         if (localContext != null) {
144             op.invoke(localContext, null);
145         } else {
146             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
147             // callback to be executed after the Tx is created.
148             enqueueTransactionOperation(op);
149         }
150     }
151
152     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
153         while (true) {
154             // Access to queuedTxOperations and transactionContext must be protected and atomic
155             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
156             // issues and ensure no TransactionOperation is missed and that they are processed
157             // in the order they occurred.
158
159             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
160             // in case a TransactionOperation results in another transaction operation being
161             // queued (eg a put operation from a client read Future callback that is notified
162             // synchronously).
163             final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
164             synchronized (queuedTxOperations) {
165                 if (queuedTxOperations.isEmpty()) {
166                     if (!pendingEnqueue) {
167                         // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
168                         localTransactionContext.operationHandOffComplete();
169                         if (!localTransactionContext.usesOperationLimiting()) {
170                             limiter.releaseAll();
171                         }
172
173                         // This is null-to-non-null transition after which we are releasing the lock and not doing
174                         // any further processing.
175                         transactionContext = localTransactionContext;
176                     } else {
177                         deferredTransactionContext = localTransactionContext;
178                     }
179                     return;
180                 }
181
182                 operationsBatch = new ArrayList<>(queuedTxOperations);
183                 queuedTxOperations.clear();
184             }
185
186             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
187             // A slight down-side is that we need to re-acquire the lock below but this should
188             // be negligible.
189             for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
190                 oper.getKey().invoke(localTransactionContext, oper.getValue());
191             }
192         }
193     }
194
195     Future<ActorSelection> readyTransaction() {
196         // avoid the creation of a promise and a TransactionOperation
197         final TransactionContext localContext = transactionContext;
198         if (localContext != null) {
199             return localContext.readyTransaction(null);
200         }
201
202         final Promise<ActorSelection> promise = Futures.promise();
203         enqueueTransactionOperation(new TransactionOperation() {
204             @Override
205             public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
206                 promise.completeWith(newTransactionContext.readyTransaction(havePermit));
207             }
208         });
209
210         return promise.future();
211     }
212
213     OperationLimiter getLimiter() {
214         return limiter;
215     }
216 }