2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorSelection;
11 import akka.dispatch.Futures;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.concurrent.TimeUnit;
18 import javax.annotation.concurrent.GuardedBy;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23 import scala.concurrent.Future;
24 import scala.concurrent.Promise;
27 * A helper class that wraps an eventual TransactionContext instance. Operations destined for the target
28 * TransactionContext instance are cached until the TransactionContext instance becomes available at which
29 * time they are executed.
31 * @author Thomas Pantelis
33 class TransactionContextWrapper {
34 private static final Logger LOG = LoggerFactory.getLogger(TransactionContextWrapper.class);
37 * The list of transaction operations to execute once the TransactionContext becomes available.
39 @GuardedBy("queuedTxOperations")
40 private final List<TransactionOperation> queuedTxOperations = Lists.newArrayList();
41 private final TransactionIdentifier identifier;
42 private final String shardName;
45 * The resulting TransactionContext.
47 private volatile TransactionContext transactionContext;
49 private final OperationLimiter limiter;
51 TransactionContextWrapper(final TransactionIdentifier identifier, final ActorContext actorContext,
52 final String shardName) {
53 this.identifier = Preconditions.checkNotNull(identifier);
54 this.limiter = new OperationLimiter(identifier,
55 // 1 extra permit for the ready operation
56 actorContext.getDatastoreContext().getShardBatchedModificationCount() + 1,
57 TimeUnit.MILLISECONDS.toSeconds(actorContext.getDatastoreContext().getOperationTimeoutInMillis()));
58 this.shardName = Preconditions.checkNotNull(shardName);
61 TransactionContext getTransactionContext() {
62 return transactionContext;
65 TransactionIdentifier getIdentifier() {
70 * Adds a TransactionOperation to be executed once the TransactionContext becomes available.
72 private void enqueueTransactionOperation(final TransactionOperation operation) {
73 final boolean invokeOperation;
74 synchronized (queuedTxOperations) {
75 if (transactionContext == null) {
76 LOG.debug("Tx {} Queuing TransactionOperation", identifier);
78 queuedTxOperations.add(operation);
79 invokeOperation = false;
81 invokeOperation = true;
85 if (invokeOperation) {
86 operation.invoke(transactionContext);
88 if (!limiter.acquire()) {
89 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", identifier,
95 void maybeExecuteTransactionOperation(final TransactionOperation op) {
97 if (transactionContext != null) {
98 op.invoke(transactionContext);
100 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
101 // callback to be executed after the Tx is created.
102 enqueueTransactionOperation(op);
106 void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
108 // Access to queuedTxOperations and transactionContext must be protected and atomic
109 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
110 // issues and ensure no TransactionOperation is missed and that they are processed
111 // in the order they occurred.
113 // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
114 // in case a TransactionOperation results in another transaction operation being
115 // queued (eg a put operation from a client read Future callback that is notified
117 final Collection<TransactionOperation> operationsBatch;
118 synchronized (queuedTxOperations) {
119 if (queuedTxOperations.isEmpty()) {
120 // We're done invoking the TransactionOperations so we can now publish the
121 // TransactionContext.
122 localTransactionContext.operationHandOffComplete();
123 if (!localTransactionContext.usesOperationLimiting()) {
124 limiter.releaseAll();
126 transactionContext = localTransactionContext;
130 operationsBatch = new ArrayList<>(queuedTxOperations);
131 queuedTxOperations.clear();
134 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
135 // A slight down-side is that we need to re-acquire the lock below but this should
137 for (TransactionOperation oper : operationsBatch) {
138 oper.invoke(localTransactionContext);
143 Future<ActorSelection> readyTransaction() {
144 // avoid the creation of a promise and a TransactionOperation
145 if (transactionContext != null) {
146 return transactionContext.readyTransaction();
149 final Promise<ActorSelection> promise = Futures.promise();
150 enqueueTransactionOperation(new TransactionOperation() {
152 public void invoke(TransactionContext newTransactionContext) {
153 promise.completeWith(newTransactionContext.readyTransaction());
157 return promise.future();
160 public OperationLimiter getLimiter() {