Refactor Register*ListenerReply classes
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.concurrent.TimeUnit;
18 import javax.annotation.concurrent.GuardedBy;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23 import scala.concurrent.Future;
24 import scala.concurrent.Promise;
25
26 /**
27  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
28  * TransactionContext instance are cached until the TransactionContext instance becomes available at which
29  * time they are executed.
30  *
31  * @author Thomas Pantelis
32  */
33 class TransactionContextWrapper {
34     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
35
36     /**
37      * The list of transaction operations to execute once the TransactionContext becomes available.
38      */
39     @GuardedBy("queuedTxOperations")
40     private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
41
42     private final TransactionIdentifier identifier;
43
44     /**
45      * The resulting TransactionContext.
46      */
47     private volatile TransactionContext transactionContext;
48
49     private final OperationLimiter limiter;
50
51     TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext) {
52         this.identifier = Preconditions.checkNotNull(identifier);
53         this.limiter = new OperationLimiter(identifier,
54                 // 1 extra permit for the ready operation
55                 actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
56                 TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
57     }
58
59     TransactionContext getTransactionContext() {
60         return transactionContext;
61     }
62
63     TransactionIdentifier getIdentifier() {
64         return identifier;
65     }
66
67     /**
68      * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
69      */
70     private void enqueueTransactionOperation(final TransactionOperation operation) {
71         final boolean invokeOperation;
72         synchronized (queuedTxOperations) {
73             if (transactionContext == null) {
74                 LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
75
76                 queuedTxOperations.add(operation);
77                 invokeOperation = false;
78             }  else {
79                 invokeOperation = true;
80             }
81         }
82
83         if (invokeOperation) {
84             operation.invoke(transactionContext);
85         } else {
86             limiter.acquire();
87         }
88     }
89
90     void maybeExecuteTransactionOperation(final TransactionOperation op) {
91
92         if (transactionContext != null) {
93             op.invoke(transactionContext);
94         } else {
95             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
96             // callback to be executed after the Tx is created.
97             enqueueTransactionOperation(op);
98         }
99     }
100
101     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
102         while (true) {
103             // Access to queuedTxOperations and transactionContext must be protected and atomic
104             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
105             // issues and ensure no TransactionOperation is missed and that they are processed
106             // in the order they occurred.
107
108             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
109             // in case a TransactionOperation results in another transaction operation being
110             // queued (eg a put operation from a client read Future callback that is notified
111             // synchronously).
112             final Collection<TransactionOperation> operationsBatch;
113             synchronized (queuedTxOperations) {
114                 if (queuedTxOperations.isEmpty()) {
115                     // We're done invoking the TransactionOperations so we can now publish the
116                     // TransactionContext.
117                     localTransactionContext.operationHandOffComplete();
118                     if (!localTransactionContext.usesOperationLimiting()) {
119                         limiter.releaseAll();
120                     }
121                     transactionContext = localTransactionContext;
122                     break;
123                 }
124
125                 operationsBatch = new ArrayList<>(queuedTxOperations);
126                 queuedTxOperations.clear();
127             }
128
129             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
130             // A slight down-side is that we need to re-acquire the lock below but this should
131             // be negligible.
132             for (TransactionOperation oper : operationsBatch) {
133                 oper.invoke(localTransactionContext);
134             }
135         }
136     }
137
138     Future<ActorSelection> readyTransaction() {
139         // avoid the creation of a promise and a TransactionOperation
140         if (transactionContext != null) {
141             return transactionContext.readyTransaction();
142         }
143
144         final Promise<ActorSelection> promise = Futures.promise();
145         enqueueTransactionOperation(new TransactionOperation() {
146             @Override
147             public void invoke(TransactionContext newTransactionContext) {
148                 promise.completeWith(newTransactionContext.readyTransaction());
149             }
150         });
151
152         return promise.future();
153     }
154
155     public OperationLimiter getLimiter() {
156         return limiter;
157     }
158
159
160 }