BUG 3019 : Fix Operation throttling for modification batching scenarios
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
19 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22 import scala.concurrent.Future;
23 import scala.concurrent.Promise;
24
25 /**
26  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
27  * TransactionContext instance are cached until the TransactionContext instance becomes available at which
28  * time they are executed.
29  *
30  * @author Thomas Pantelis
31  */
32 class TransactionContextWrapper {
33     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
34
35     /**
36      * The list of transaction operations to execute once the TransactionContext becomes available.
37      */
38     @GuardedBy("queuedTxOperations")
39     private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
40
41     private final TransactionIdentifier identifier;
42
43     /**
44      * The resulting TransactionContext.
45      */
46     private volatile TransactionContext transactionContext;
47
48     private final OperationLimiter limiter;
49
50     TransactionContextWrapper(TransactionIdentifier identifier, final ActorContext actorContext) {
51         this.identifier = Preconditions.checkNotNull(identifier);
52         this.limiter = new OperationLimiter(identifier,
53                 actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1, // 1 extra permit for the ready operation
54                 actorContext.getDatastoreContext().getOperationTimeoutInSeconds());
55     }
56
57     TransactionContext getTransactionContext() {
58         return transactionContext;
59     }
60
61     TransactionIdentifier getIdentifier() {
62         return identifier;
63     }
64
65     /**
66      * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
67      */
68     private void enqueueTransactionOperation(final TransactionOperation operation) {
69         final boolean invokeOperation;
70         synchronized (queuedTxOperations) {
71             if (transactionContext == null) {
72                 LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
73
74                 queuedTxOperations.add(operation);
75                 invokeOperation = false;
76             }  else {
77                 invokeOperation = true;
78             }
79         }
80
81         if (invokeOperation) {
82             operation.invoke(transactionContext);
83         } else {
84             limiter.acquire();
85         }
86     }
87
88     void maybeExecuteTransactionOperation(final TransactionOperation op) {
89
90         if (transactionContext != null) {
91             op.invoke(transactionContext);
92         } else {
93             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
94             // callback to be executed after the Tx is created.
95             enqueueTransactionOperation(op);
96         }
97     }
98
99     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
100         while(true) {
101             // Access to queuedTxOperations and transactionContext must be protected and atomic
102             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
103             // issues and ensure no TransactionOperation is missed and that they are processed
104             // in the order they occurred.
105
106             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
107             // in case a TransactionOperation results in another transaction operation being
108             // queued (eg a put operation from a client read Future callback that is notified
109             // synchronously).
110             Collection<TransactionOperation> operationsBatch = null;
111             synchronized (queuedTxOperations) {
112                 if (queuedTxOperations.isEmpty()) {
113                     // We're done invoking the TransactionOperations so we can now publish the
114                     // TransactionContext.
115                     localTransactionContext.operationHandOffComplete();
116                     if(!localTransactionContext.usesOperationLimiting()){
117                         limiter.releaseAll();
118                     }
119                     transactionContext = localTransactionContext;
120                     break;
121                 }
122
123                 operationsBatch = new ArrayList<>(queuedTxOperations);
124                 queuedTxOperations.clear();
125             }
126
127             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
128             // A slight down-side is that we need to re-acquire the lock below but this should
129             // be negligible.
130             for (TransactionOperation oper : operationsBatch) {
131                 oper.invoke(localTransactionContext);
132             }
133         }
134     }
135
136     Future<ActorSelection> readyTransaction() {
137         // avoid the creation of a promise and a TransactionOperation
138         if (transactionContext != null) {
139             return transactionContext.readyTransaction();
140         }
141
142         final Promise<ActorSelection> promise = Futures.promise();
143         enqueueTransactionOperation(new TransactionOperation() {
144             @Override
145             public void invoke(TransactionContext transactionContext) {
146                 promise.completeWith(transactionContext.readyTransaction());
147             }
148         });
149
150         return promise.future();
151     }
152
153     public OperationLimiter getLimiter() {
154         return limiter;
155     }
156
157
158 }