Raise EOS unsuccessful request reporting to error
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorSelection;
14 import akka.dispatch.Futures;
15 import java.util.AbstractMap.SimpleImmutableEntry;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Map.Entry;
20 import java.util.Optional;
21 import java.util.SortedSet;
22 import java.util.concurrent.TimeUnit;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
25 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29 import scala.concurrent.Promise;
30
31 /**
32  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
33  * TransactionContext instance are cached until the TransactionContext instance becomes available at which
34  * time they are executed.
35  *
36  * @author Thomas Pantelis
37  */
38 class TransactionContextWrapper {
39     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
40
41     /**
42      * The list of transaction operations to execute once the TransactionContext becomes available.
43      */
44     @GuardedBy("queuedTxOperations")
45     private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
46     private final TransactionIdentifier identifier;
47     private final OperationLimiter limiter;
48     private final String shardName;
49
50     /**
51      * The resulting TransactionContext.
52      */
53     private volatile TransactionContext transactionContext;
54     @GuardedBy("queuedTxOperations")
55     private TransactionContext deferredTransactionContext;
56     @GuardedBy("queuedTxOperations")
57     private boolean pendingEnqueue;
58
59     TransactionContextWrapper(final TransactionIdentifier identifier, final ActorUtils actorUtils,
60             final String shardName) {
61         this.identifier = requireNonNull(identifier);
62         this.limiter = new OperationLimiter(identifier,
63                 // 1 extra permit for the ready operation
64                 actorUtils.getDatastoreContext().getShardBatchedModificationCount() + 1,
65                 TimeUnit.MILLISECONDS.toSeconds(actorUtils.getDatastoreContext().getOperationTimeoutInMillis()));
66         this.shardName = requireNonNull(shardName);
67     }
68
69     TransactionContext getTransactionContext() {
70         return transactionContext;
71     }
72
73     TransactionIdentifier getIdentifier() {
74         return identifier;
75     }
76
77     /**
78      * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
79      * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
80      * context is not available.
81      */
82     private void enqueueTransactionOperation(final TransactionOperation operation) {
83         // We have three things to do here:
84         // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
85         // - acquire a permit for the operation if we still need to enqueue it
86         // - enqueue the operation
87         //
88         // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
89         // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
90         // complications are:
91         // - this method may be called from the thread invoking executePriorTransactionOperations()
92         // - user may be violating API contract of using the transaction from a single thread
93
94         // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
95         // the lock, we will assert that we will be enqueing another operation.
96         final TransactionContext contextOnEntry;
97         synchronized (queuedTxOperations) {
98             contextOnEntry = transactionContext;
99             if (contextOnEntry == null) {
100                 checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", identifier);
101                 pendingEnqueue = true;
102             }
103         }
104
105         // Short-circuit if there is a context
106         if (contextOnEntry != null) {
107             operation.invoke(transactionContext, null);
108             return;
109         }
110
111         boolean cleanupEnqueue = true;
112         TransactionContext finishHandoff = null;
113         try {
114             // Acquire the permit,
115             final boolean havePermit = limiter.acquire();
116             if (!havePermit) {
117                 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
118                     shardName);
119             }
120
121             // Ready to enqueue, take the lock again and append the operation
122             synchronized (queuedTxOperations) {
123                 LOG.debug("Tx {} Queuing TransactionOperation", identifier);
124                 queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
125                 pendingEnqueue = false;
126                 cleanupEnqueue = false;
127                 finishHandoff = deferredTransactionContext;
128                 deferredTransactionContext = null;
129             }
130         } finally {
131             if (cleanupEnqueue) {
132                 synchronized (queuedTxOperations) {
133                     pendingEnqueue = false;
134                     finishHandoff = deferredTransactionContext;
135                     deferredTransactionContext = null;
136                 }
137             }
138             if (finishHandoff != null) {
139                 executePriorTransactionOperations(finishHandoff);
140             }
141         }
142     }
143
144     void maybeExecuteTransactionOperation(final TransactionOperation op) {
145         final TransactionContext localContext = transactionContext;
146         if (localContext != null) {
147             op.invoke(localContext, null);
148         } else {
149             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
150             // callback to be executed after the Tx is created.
151             enqueueTransactionOperation(op);
152         }
153     }
154
155     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
156         while (true) {
157             // Access to queuedTxOperations and transactionContext must be protected and atomic
158             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
159             // issues and ensure no TransactionOperation is missed and that they are processed
160             // in the order they occurred.
161
162             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
163             // in case a TransactionOperation results in another transaction operation being
164             // queued (eg a put operation from a client read Future callback that is notified
165             // synchronously).
166             final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
167             synchronized (queuedTxOperations) {
168                 if (queuedTxOperations.isEmpty()) {
169                     if (!pendingEnqueue) {
170                         // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
171                         localTransactionContext.operationHandOffComplete();
172
173                         // This is null-to-non-null transition after which we are releasing the lock and not doing
174                         // any further processing.
175                         transactionContext = localTransactionContext;
176                     } else {
177                         deferredTransactionContext = localTransactionContext;
178                     }
179                     return;
180                 }
181
182                 operationsBatch = new ArrayList<>(queuedTxOperations);
183                 queuedTxOperations.clear();
184             }
185
186             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is
187             // that we need to re-acquire the lock below but this should be negligible.
188             for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
189                 final Boolean permit = oper.getValue();
190                 if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
191                     // If the context is not using limiting we need to release operations as we are queueing them, so
192                     // user threads are not charged for them.
193                     limiter.release();
194                 }
195                 oper.getKey().invoke(localTransactionContext, permit);
196             }
197         }
198     }
199
200     Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames) {
201         // avoid the creation of a promise and a TransactionOperation
202         final TransactionContext localContext = transactionContext;
203         if (localContext != null) {
204             return localContext.readyTransaction(null, participatingShardNames);
205         }
206
207         final Promise<ActorSelection> promise = Futures.promise();
208         enqueueTransactionOperation(new TransactionOperation() {
209             @Override
210             public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
211                 promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
212             }
213         });
214
215         return promise.future();
216     }
217
218     OperationLimiter getLimiter() {
219         return limiter;
220     }
221 }