2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import javax.annotation.concurrent.GuardedBy;
18 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21 import scala.concurrent.Future;
22 import scala.concurrent.Promise;
25 * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
26 * TransactionContext instance are cached until the TransactionContext instance becomes available at which
27 * time they are executed.
29 * @author Thomas Pantelis
31 class TransactionContextWrapper {
32 private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
35 * The list of transaction operations to execute once the TransactionContext becomes available.
37 @GuardedBy("queuedTxOperations")
38 private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
41 * The resulting TransactionContext.
43 private volatile TransactionContext transactionContext;
45 private final OperationLimiter limiter;
47 TransactionContextWrapper(final OperationLimiter limiter) {
48 this.limiter = Preconditions.checkNotNull(limiter);
51 TransactionContext getTransactionContext() {
52 return transactionContext;
55 TransactionIdentifier getIdentifier() {
56 return limiter.getIdentifier();
60 * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
62 private void enqueueTransactionOperation(final TransactionOperation operation) {
63 final boolean invokeOperation;
64 synchronized (queuedTxOperations) {
65 if (transactionContext == null) {
66 LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
68 queuedTxOperations.add(operation);
69 invokeOperation = false;
71 invokeOperation = true;
75 if (invokeOperation) {
76 operation.invoke(transactionContext);
82 void maybeExecuteTransactionOperation(final TransactionOperation op) {
84 if (transactionContext != null) {
85 op.invoke(transactionContext);
87 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
88 // callback to be executed after the Tx is created.
89 enqueueTransactionOperation(op);
93 void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
95 // Access to queuedTxOperations and transactionContext must be protected and atomic
96 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
97 // issues and ensure no TransactionOperation is missed and that they are processed
98 // in the order they occurred.
100 // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
101 // in case a TransactionOperation results in another transaction operation being
102 // queued (eg a put operation from a client read Future callback that is notified
104 Collection<TransactionOperation> operationsBatch = null;
105 synchronized (queuedTxOperations) {
106 if (queuedTxOperations.isEmpty()) {
107 // We're done invoking the TransactionOperations so we can now publish the
108 // TransactionContext.
109 localTransactionContext.operationHandoffComplete();
110 transactionContext = localTransactionContext;
114 operationsBatch = new ArrayList<>(queuedTxOperations);
115 queuedTxOperations.clear();
118 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
119 // A slight down-side is that we need to re-acquire the lock below but this should
121 for (TransactionOperation oper : operationsBatch) {
122 oper.invoke(localTransactionContext);
127 Future<ActorSelection> readyTransaction() {
128 // avoid the creation of a promise and a TransactionOperation
129 if (transactionContext != null) {
130 return transactionContext.readyTransaction();
133 final Promise<ActorSelection> promise = Futures.promise();
134 enqueueTransactionOperation(new TransactionOperation() {
136 public void invoke(TransactionContext transactionContext) {
137 promise.completeWith(transactionContext.readyTransaction());
141 return promise.future();