CDS: Implement front-end support for local transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import com.google.common.collect.Lists;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.List;
14 import javax.annotation.concurrent.GuardedBy;
15 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 /**
20  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
21  * TransactionContext instance are cached until the TransactionContext instance becomes available at which
22  * time they are executed.
23  *
24  * @author Thomas Pantelis
25  */
26 class TransactionContextWrapper {
27     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
28
29     /**
30      * The list of transaction operations to execute once the TransactionContext becomes available.
31      */
32     @GuardedBy("queuedTxOperations")
33     private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
34
35     /**
36      * The resulting TransactionContext.
37      */
38     private volatile TransactionContext transactionContext;
39
40     private final TransactionIdentifier identifier;
41
42     TransactionContextWrapper(TransactionIdentifier identifier) {
43         this.identifier = identifier;
44     }
45
46     TransactionContext getTransactionContext() {
47         return transactionContext;
48     }
49
50     TransactionIdentifier getIdentifier() {
51         return identifier;
52     }
53
54     /**
55      * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
56      */
57     private void enqueueTransactionOperation(TransactionOperation operation) {
58         boolean invokeOperation = true;
59         synchronized(queuedTxOperations) {
60             if(transactionContext == null) {
61                 LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
62
63                 invokeOperation = false;
64                 queuedTxOperations.add(operation);
65             }
66         }
67
68         if(invokeOperation) {
69             operation.invoke(transactionContext);
70         }
71     }
72
73     void maybeExecuteTransactionOperation(final TransactionOperation op) {
74
75         if (transactionContext != null) {
76             op.invoke(transactionContext);
77         } else {
78             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
79             // callback to be executed after the Tx is created.
80             enqueueTransactionOperation(op);
81         }
82     }
83
84     void executePriorTransactionOperations(TransactionContext localTransactionContext) {
85         while(true) {
86             // Access to queuedTxOperations and transactionContext must be protected and atomic
87             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
88             // issues and ensure no TransactionOperation is missed and that they are processed
89             // in the order they occurred.
90
91             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
92             // in case a TransactionOperation results in another transaction operation being
93             // queued (eg a put operation from a client read Future callback that is notified
94             // synchronously).
95             Collection<TransactionOperation> operationsBatch = null;
96             synchronized(queuedTxOperations) {
97                 if(queuedTxOperations.isEmpty()) {
98                     // We're done invoking the TransactionOperations so we can now publish the
99                     // TransactionContext.
100                     transactionContext = localTransactionContext;
101                     break;
102                 }
103
104                 operationsBatch = new ArrayList<>(queuedTxOperations);
105                 queuedTxOperations.clear();
106             }
107
108             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
109             // A slight down-side is that we need to re-acquire the lock below but this should
110             // be negligible.
111             for(TransactionOperation oper: operationsBatch) {
112                 oper.invoke(localTransactionContext);
113             }
114         }
115     }
116 }