b08d4192b48c1ddd7b3cf3162782b68ef129c855
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 import scala.concurrent.Future;
22 import scala.concurrent.Promise;
23
24 /**
25  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
26  * TransactionContext instance are cached until the TransactionContext instance becomes available at which
27  * time they are executed.
28  *
29  * @author Thomas Pantelis
30  */
31 class TransactionContextWrapper {
32     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
33
34     /**
35      * The list of transaction operations to execute once the TransactionContext becomes available.
36      */
37     @GuardedBy("queuedTxOperations")
38     private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
39
40     /**
41      * The resulting TransactionContext.
42      */
43     private volatile TransactionContext transactionContext;
44
45     private final OperationLimiter limiter;
46
47     TransactionContextWrapper(final OperationLimiter limiter) {
48         this.limiter = Preconditions.checkNotNull(limiter);
49     }
50
51     TransactionContext getTransactionContext() {
52         return transactionContext;
53     }
54
55     TransactionIdentifier getIdentifier() {
56         return limiter.getIdentifier();
57     }
58
59     /**
60      * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
61      */
62     private void enqueueTransactionOperation(final TransactionOperation operation) {
63         final boolean invokeOperation;
64         synchronized (queuedTxOperations) {
65             if (transactionContext == null) {
66                 LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
67
68                 queuedTxOperations.add(operation);
69                 invokeOperation = false;
70             }  else {
71                 invokeOperation = true;
72             }
73         }
74
75         if (invokeOperation) {
76             operation.invoke(transactionContext);
77         } else {
78             limiter.acquire();
79         }
80     }
81
82     void maybeExecuteTransactionOperation(final TransactionOperation op) {
83
84         if (transactionContext != null) {
85             op.invoke(transactionContext);
86         } else {
87             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
88             // callback to be executed after the Tx is created.
89             enqueueTransactionOperation(op);
90         }
91     }
92
93     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
94         while(true) {
95             // Access to queuedTxOperations and transactionContext must be protected and atomic
96             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
97             // issues and ensure no TransactionOperation is missed and that they are processed
98             // in the order they occurred.
99
100             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
101             // in case a TransactionOperation results in another transaction operation being
102             // queued (eg a put operation from a client read Future callback that is notified
103             // synchronously).
104             Collection<TransactionOperation> operationsBatch = null;
105             synchronized (queuedTxOperations) {
106                 if (queuedTxOperations.isEmpty()) {
107                     // We're done invoking the TransactionOperations so we can now publish the
108                     // TransactionContext.
109                     localTransactionContext.operationHandoffComplete();
110                     transactionContext = localTransactionContext;
111                     break;
112                 }
113
114                 operationsBatch = new ArrayList<>(queuedTxOperations);
115                 queuedTxOperations.clear();
116             }
117
118             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
119             // A slight down-side is that we need to re-acquire the lock below but this should
120             // be negligible.
121             for (TransactionOperation oper : operationsBatch) {
122                 oper.invoke(localTransactionContext);
123             }
124         }
125     }
126
127     Future<ActorSelection> readyTransaction() {
128         // avoid the creation of a promise and a TransactionOperation
129         if (transactionContext != null) {
130             return transactionContext.readyTransaction();
131         }
132
133         final Promise<ActorSelection> promise = Futures.promise();
134         enqueueTransactionOperation(new TransactionOperation() {
135             @Override
136             public void invoke(TransactionContext transactionContext) {
137                 promise.completeWith(transactionContext.readyTransaction());
138             }
139         });
140
141         return promise.future();
142     }
143 }