a126ce95971bae232c2da0b1f9fb9aa3c550cfe6
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.concurrent.TimeUnit;
18 import javax.annotation.concurrent.GuardedBy;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23 import scala.concurrent.Future;
24 import scala.concurrent.Promise;
25
26 /**
27  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
28  * TransactionContext instance are cached until the TransactionContext instance becomes available at which
29  * time they are executed.
30  *
31  * @author Thomas Pantelis
32  */
33 class TransactionContextWrapper {
34     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
35
36     /**
37      * The list of transaction operations to execute once the TransactionContext becomes available.
38      */
39     @GuardedBy("queuedTxOperations")
40     private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
41     private final TransactionIdentifier identifier;
42     private final String shardName;
43
44     /**
45      * The resulting TransactionContext.
46      */
47     private volatile TransactionContext transactionContext;
48
49     private final OperationLimiter limiter;
50
51     TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
52             final String shardName) {
53         this.identifier = Preconditions.checkNotNull(identifier);
54         this.limiter = new OperationLimiter(identifier,
55                 // 1 extra permit for the ready operation
56                 actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
57                 TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
58         this.shardName = Preconditions.checkNotNull(shardName);
59     }
60
61     TransactionContext getTransactionContext() {
62         return transactionContext;
63     }
64
65     TransactionIdentifier getIdentifier() {
66         return identifier;
67     }
68
69     /**
70      * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
71      */
72     private void enqueueTransactionOperation(final TransactionOperation operation) {
73         final boolean invokeOperation;
74         synchronized (queuedTxOperations) {
75             if (transactionContext == null) {
76                 LOG.debug("Tx {} Queuing TransactionOperation", identifier);
77
78                 queuedTxOperations.add(operation);
79                 invokeOperation = false;
80             }  else {
81                 invokeOperation = true;
82             }
83         }
84
85         if (invokeOperation) {
86             operation.invoke(transactionContext);
87         } else {
88             if (!limiter.acquire()) {
89                 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
90                     shardName);
91             }
92         }
93     }
94
95     void maybeExecuteTransactionOperation(final TransactionOperation op) {
96
97         if (transactionContext != null) {
98             op.invoke(transactionContext);
99         } else {
100             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
101             // callback to be executed after the Tx is created.
102             enqueueTransactionOperation(op);
103         }
104     }
105
106     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
107         while (true) {
108             // Access to queuedTxOperations and transactionContext must be protected and atomic
109             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
110             // issues and ensure no TransactionOperation is missed and that they are processed
111             // in the order they occurred.
112
113             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
114             // in case a TransactionOperation results in another transaction operation being
115             // queued (eg a put operation from a client read Future callback that is notified
116             // synchronously).
117             final Collection<TransactionOperation> operationsBatch;
118             synchronized (queuedTxOperations) {
119                 if (queuedTxOperations.isEmpty()) {
120                     // We're done invoking the TransactionOperations so we can now publish the
121                     // TransactionContext.
122                     localTransactionContext.operationHandOffComplete();
123                     if (!localTransactionContext.usesOperationLimiting()) {
124                         limiter.releaseAll();
125                     }
126                     transactionContext = localTransactionContext;
127                     break;
128                 }
129
130                 operationsBatch = new ArrayList<>(queuedTxOperations);
131                 queuedTxOperations.clear();
132             }
133
134             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
135             // A slight down-side is that we need to re-acquire the lock below but this should
136             // be negligible.
137             for (TransactionOperation oper : operationsBatch) {
138                 oper.invoke(localTransactionContext);
139             }
140         }
141     }
142
143     Future<ActorSelection> readyTransaction() {
144         // avoid the creation of a promise and a TransactionOperation
145         if (transactionContext != null) {
146             return transactionContext.readyTransaction();
147         }
148
149         final Promise<ActorSelection> promise = Futures.promise();
150         enqueueTransactionOperation(new TransactionOperation() {
151             @Override
152             public void invoke(TransactionContext newTransactionContext) {
153                 promise.completeWith(newTransactionContext.readyTransaction());
154             }
155         });
156
157         return promise.future();
158     }
159
160     public OperationLimiter getLimiter() {
161         return limiter;
162     }
163
164
165 }