38f55f300f52334e15b718d9c2aa094b056199dc
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Preconditions;
13 import java.util.AbstractMap.SimpleImmutableEntry;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.Map.Entry;
18 import java.util.Optional;
19 import java.util.SortedSet;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.concurrent.GuardedBy;
22 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Future;
27 import scala.concurrent.Promise;
28
29 /**
30  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
31  * TransactionContext instance are cached until the TransactionContext instance becomes available at which
32  * time they are executed.
33  *
34  * @author Thomas Pantelis
35  */
36 class TransactionContextWrapper {
37     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
38
39     /**
40      * The list of transaction operations to execute once the TransactionContext becomes available.
41      */
42     @GuardedBy("queuedTxOperations")
43     private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
44     private final TransactionIdentifier identifier;
45     private final OperationLimiter limiter;
46     private final String shardName;
47
48     /**
49      * The resulting TransactionContext.
50      */
51     private volatile TransactionContext transactionContext;
52     @GuardedBy("queuedTxOperations")
53     private TransactionContext deferredTransactionContext;
54     @GuardedBy("queuedTxOperations")
55     private boolean pendingEnqueue;
56
57     TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
58             final String shardName) {
59         this.identifier = Preconditions.checkNotNull(identifier);
60         this.limiter = new OperationLimiter(identifier,
61                 // 1 extra permit for the ready operation
62                 actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
63                 TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
64         this.shardName = Preconditions.checkNotNull(shardName);
65     }
66
67     TransactionContext getTransactionContext() {
68         return transactionContext;
69     }
70
71     TransactionIdentifier getIdentifier() {
72         return identifier;
73     }
74
75     /**
76      * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
77      * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
78      * context is not available.
79      */
80     private void enqueueTransactionOperation(final TransactionOperation operation) {
81         // We have three things to do here:
82         // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
83         // - acquire a permit for the operation if we still need to enqueue it
84         // - enqueue the operation
85         //
86         // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
87         // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
88         // complications are:
89         // - this method may be called from the thread invoking executePriorTransactionOperations()
90         // - user may be violating API contract of using the transaction from a single thread
91
92         // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
93         // the lock, we will assert that we will be enqueing another operation.
94         final TransactionContext contextOnEntry;
95         synchronized (queuedTxOperations) {
96             contextOnEntry = transactionContext;
97             if (contextOnEntry == null) {
98                 Preconditions.checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected",
99                         identifier);
100                 pendingEnqueue = true;
101             }
102         }
103
104         // Short-circuit if there is a context
105         if (contextOnEntry != null) {
106             operation.invoke(transactionContext, null);
107             return;
108         }
109
110         boolean cleanupEnqueue = true;
111         TransactionContext finishHandoff = null;
112         try {
113             // Acquire the permit,
114             final boolean havePermit = limiter.acquire();
115             if (!havePermit) {
116                 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
117                     shardName);
118             }
119
120             // Ready to enqueue, take the lock again and append the operation
121             synchronized (queuedTxOperations) {
122                 LOG.debug("Tx {} Queuing TransactionOperation", identifier);
123                 queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
124                 pendingEnqueue = false;
125                 cleanupEnqueue = false;
126                 finishHandoff = deferredTransactionContext;
127                 deferredTransactionContext = null;
128             }
129         } finally {
130             if (cleanupEnqueue) {
131                 synchronized (queuedTxOperations) {
132                     pendingEnqueue = false;
133                     finishHandoff = deferredTransactionContext;
134                     deferredTransactionContext = null;
135                 }
136             }
137             if (finishHandoff != null) {
138                 executePriorTransactionOperations(finishHandoff);
139             }
140         }
141     }
142
143     void maybeExecuteTransactionOperation(final TransactionOperation op) {
144         final TransactionContext localContext = transactionContext;
145         if (localContext != null) {
146             op.invoke(localContext, null);
147         } else {
148             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
149             // callback to be executed after the Tx is created.
150             enqueueTransactionOperation(op);
151         }
152     }
153
154     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
155         while (true) {
156             // Access to queuedTxOperations and transactionContext must be protected and atomic
157             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
158             // issues and ensure no TransactionOperation is missed and that they are processed
159             // in the order they occurred.
160
161             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
162             // in case a TransactionOperation results in another transaction operation being
163             // queued (eg a put operation from a client read Future callback that is notified
164             // synchronously).
165             final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
166             synchronized (queuedTxOperations) {
167                 if (queuedTxOperations.isEmpty()) {
168                     if (!pendingEnqueue) {
169                         // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
170                         localTransactionContext.operationHandOffComplete();
171
172                         // This is null-to-non-null transition after which we are releasing the lock and not doing
173                         // any further processing.
174                         transactionContext = localTransactionContext;
175                     } else {
176                         deferredTransactionContext = localTransactionContext;
177                     }
178                     return;
179                 }
180
181                 operationsBatch = new ArrayList<>(queuedTxOperations);
182                 queuedTxOperations.clear();
183             }
184
185             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is
186             // that we need to re-acquire the lock below but this should be negligible.
187             for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
188                 final Boolean permit = oper.getValue();
189                 if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
190                     // If the context is not using limiting we need to release operations as we are queueing them, so
191                     // user threads are not charged for them.
192                     limiter.release();
193                 }
194                 oper.getKey().invoke(localTransactionContext, permit);
195             }
196         }
197     }
198
199     Future<ActorSelection> readyTransaction(Optional<SortedSet<String>> participatingShardNames) {
200         // avoid the creation of a promise and a TransactionOperation
201         final TransactionContext localContext = transactionContext;
202         if (localContext != null) {
203             return localContext.readyTransaction(null, participatingShardNames);
204         }
205
206         final Promise<ActorSelection> promise = Futures.promise();
207         enqueueTransactionOperation(new TransactionOperation() {
208             @Override
209             public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
210                 promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
211             }
212         });
213
214         return promise.future();
215     }
216
217     OperationLimiter getLimiter() {
218         return limiter;
219     }
220 }