26d7ff8b02bf7beef1e8637fd513af26318777b4
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionContextWrapper.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import com.google.common.collect.Lists;
11 import java.util.ArrayList;
12 import java.util.Collection;
13 import java.util.List;
14 import javax.annotation.concurrent.GuardedBy;
15 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
16 import org.slf4j.Logger;
17 import org.slf4j.LoggerFactory;
18
19 /**
20  * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
21  * TransactionContext instance are cached until the TransactionContext instance becomes available at which
22  * time they are executed.
23  *
24  * @author Thomas Pantelis
25  */
26 class TransactionContextWrapper {
27     private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
28
29     /**
30      * The list of transaction operations to execute once the TransactionContext becomes available.
31      */
32     @GuardedBy("queuedTxOperations")
33     private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
34
35     /**
36      * The resulting TransactionContext.
37      */
38     private volatile TransactionContext transactionContext;
39
40     private final TransactionIdentifier identifier;
41
42     TransactionContextWrapper(final TransactionIdentifier identifier) {
43         this.identifier = identifier;
44     }
45
46     TransactionContext getTransactionContext() {
47         return transactionContext;
48     }
49
50     TransactionIdentifier getIdentifier() {
51         return identifier;
52     }
53
54     /**
55      * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
56      */
57     private void enqueueTransactionOperation(final TransactionOperation operation) {
58         final boolean invokeOperation;
59         synchronized (queuedTxOperations) {
60             if (transactionContext == null) {
61                 LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
62
63                 queuedTxOperations.add(operation);
64                 invokeOperation = false;
65             }  else {
66                 invokeOperation = true;
67             }
68         }
69
70         if (invokeOperation) {
71             operation.invoke(transactionContext);
72         }
73     }
74
75     void maybeExecuteTransactionOperation(final TransactionOperation op) {
76
77         if (transactionContext != null) {
78             op.invoke(transactionContext);
79         } else {
80             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
81             // callback to be executed after the Tx is created.
82             enqueueTransactionOperation(op);
83         }
84     }
85
86     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
87         while(true) {
88             // Access to queuedTxOperations and transactionContext must be protected and atomic
89             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
90             // issues and ensure no TransactionOperation is missed and that they are processed
91             // in the order they occurred.
92
93             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
94             // in case a TransactionOperation results in another transaction operation being
95             // queued (eg a put operation from a client read Future callback that is notified
96             // synchronously).
97             Collection<TransactionOperation> operationsBatch = null;
98             synchronized(queuedTxOperations) {
99                 if(queuedTxOperations.isEmpty()) {
100                     // We're done invoking the TransactionOperations so we can now publish the
101                     // TransactionContext.
102                     transactionContext = localTransactionContext;
103                     break;
104                 }
105
106                 operationsBatch = new ArrayList<>(queuedTxOperations);
107                 queuedTxOperations.clear();
108             }
109
110             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
111             // A slight down-side is that we need to re-acquire the lock below but this should
112             // be negligible.
113             for(TransactionOperation oper: operationsBatch) {
114                 oper.invoke(localTransactionContext);
115             }
116         }
117     }
118 }