Be mindful of non-existent actors
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / DelayedTransactionContextWrapper.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkState;
11
12 import akka.actor.ActorSelection;
13 import akka.dispatch.Futures;
14 import java.util.AbstractMap.SimpleImmutableEntry;
15 import java.util.ArrayList;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.Map.Entry;
19 import java.util.Optional;
20 import java.util.SortedSet;
21 import org.checkerframework.checker.lock.qual.GuardedBy;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import scala.concurrent.Future;
28 import scala.concurrent.Promise;
29
30 /**
31  * Delayed implementation of TransactionContextWrapper. Operations destined for the target
32  * TransactionContext instance are cached until the TransactionContext instance becomes
33  * available at which time they are executed.
34  *
35  * @author Thomas Pantelis
36  */
37 final class DelayedTransactionContextWrapper extends AbstractTransactionContextWrapper {
38     private static final Logger LOG = LoggerFactory.getLogger(DelayedTransactionContextWrapper.class);
39
40     /**
41      * The list of transaction operations to execute once the TransactionContext becomes available.
42      */
43     @GuardedBy("queuedTxOperations")
44     private final List<Entry<TransactionOperation, Boolean>> queuedTxOperations = new ArrayList<>();
45
46     /**
47      * The resulting TransactionContext.
48      */
49     private volatile TransactionContext transactionContext;
50     @GuardedBy("queuedTxOperations")
51     private TransactionContext deferredTransactionContext;
52     @GuardedBy("queuedTxOperations")
53     private boolean pendingEnqueue;
54
55     DelayedTransactionContextWrapper(@NonNull final TransactionIdentifier identifier,
56                                      @NonNull final ActorUtils actorUtils, @NonNull final String shardName) {
57         super(identifier, actorUtils, shardName);
58     }
59
60     @Override
61     TransactionContext getTransactionContext() {
62         return transactionContext;
63     }
64
65     @Override
66     void maybeExecuteTransactionOperation(final TransactionOperation op) {
67         final TransactionContext localContext = transactionContext;
68         if (localContext != null) {
69             op.invoke(localContext, null);
70         } else {
71             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
72             // callback to be executed after the Tx is created.
73             enqueueTransactionOperation(op);
74         }
75     }
76
77     @Override
78     Future<ActorSelection> readyTransaction(final Optional<SortedSet<String>> participatingShardNames) {
79         // avoid the creation of a promise and a TransactionOperation
80         final TransactionContext localContext = transactionContext;
81         if (localContext != null) {
82             return localContext.readyTransaction(null, participatingShardNames);
83         }
84
85         final Promise<ActorSelection> promise = Futures.promise();
86         enqueueTransactionOperation(new TransactionOperation() {
87             @Override
88             public void invoke(final TransactionContext newTransactionContext, final Boolean havePermit) {
89                 promise.completeWith(newTransactionContext.readyTransaction(havePermit, participatingShardNames));
90             }
91         });
92
93         return promise.future();
94     }
95
96     /**
97      * Adds a TransactionOperation to be executed once the TransactionContext becomes available. This method is called
98      * only after the caller has checked (without synchronizing with executePriorTransactionOperations()) that the
99      * context is not available.
100      */
101     private void enqueueTransactionOperation(final TransactionOperation operation) {
102         // We have three things to do here:
103         // - synchronize with executePriorTransactionOperations() so that logical operation ordering is maintained
104         // - acquire a permit for the operation if we still need to enqueue it
105         // - enqueue the operation
106         //
107         // Since each operation needs to acquire a permit exactly once and the limiter is shared between us and the
108         // TransactionContext, we need to know whether an operation has a permit before we enqueue it. Further
109         // complications are:
110         // - this method may be called from the thread invoking executePriorTransactionOperations()
111         // - user may be violating API contract of using the transaction from a single thread
112
113         // As a first step, we will synchronize on the queue and check if the handoff has completed. While we have
114         // the lock, we will assert that we will be enqueing another operation.
115         final TransactionContext contextOnEntry;
116         synchronized (queuedTxOperations) {
117             contextOnEntry = transactionContext;
118             if (contextOnEntry == null) {
119                 checkState(pendingEnqueue == false, "Concurrent access to transaction %s detected", getIdentifier());
120                 pendingEnqueue = true;
121             }
122         }
123
124         // Short-circuit if there is a context
125         if (contextOnEntry != null) {
126             operation.invoke(transactionContext, null);
127             return;
128         }
129
130         boolean cleanupEnqueue = true;
131         TransactionContext finishHandoff = null;
132         try {
133             // Acquire the permit,
134             final boolean havePermit = getLimiter().acquire();
135             if (!havePermit) {
136                 LOG.warn("Failed to acquire enqueue operation permit for transaction {} on shard {}", getIdentifier(),
137                         getShardName());
138             }
139
140             // Ready to enqueue, take the lock again and append the operation
141             synchronized (queuedTxOperations) {
142                 LOG.debug("Tx {} Queuing TransactionOperation", getIdentifier());
143                 queuedTxOperations.add(new SimpleImmutableEntry<>(operation, havePermit));
144                 pendingEnqueue = false;
145                 cleanupEnqueue = false;
146                 finishHandoff = deferredTransactionContext;
147                 deferredTransactionContext = null;
148             }
149         } finally {
150             if (cleanupEnqueue) {
151                 synchronized (queuedTxOperations) {
152                     pendingEnqueue = false;
153                     finishHandoff = deferredTransactionContext;
154                     deferredTransactionContext = null;
155                 }
156             }
157             if (finishHandoff != null) {
158                 executePriorTransactionOperations(finishHandoff);
159             }
160         }
161     }
162
163     void executePriorTransactionOperations(final TransactionContext localTransactionContext) {
164         while (true) {
165             // Access to queuedTxOperations and transactionContext must be protected and atomic
166             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
167             // issues and ensure no TransactionOperation is missed and that they are processed
168             // in the order they occurred.
169
170             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
171             // in case a TransactionOperation results in another transaction operation being
172             // queued (eg a put operation from a client read Future callback that is notified
173             // synchronously).
174             final Collection<Entry<TransactionOperation, Boolean>> operationsBatch;
175             synchronized (queuedTxOperations) {
176                 if (queuedTxOperations.isEmpty()) {
177                     if (!pendingEnqueue) {
178                         // We're done invoking the TransactionOperations so we can now publish the TransactionContext.
179                         localTransactionContext.operationHandOffComplete();
180
181                         // This is null-to-non-null transition after which we are releasing the lock and not doing
182                         // any further processing.
183                         transactionContext = localTransactionContext;
184                     } else {
185                         deferredTransactionContext = localTransactionContext;
186                     }
187                     return;
188                 }
189
190                 operationsBatch = new ArrayList<>(queuedTxOperations);
191                 queuedTxOperations.clear();
192             }
193
194             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking. A slight down-side is
195             // that we need to re-acquire the lock below but this should be negligible.
196             for (Entry<TransactionOperation, Boolean> oper : operationsBatch) {
197                 final Boolean permit = oper.getValue();
198                 if (permit.booleanValue() && !localTransactionContext.usesOperationLimiting()) {
199                     // If the context is not using limiting we need to release operations as we are queueing them, so
200                     // user threads are not charged for them.
201                     getLimiter().release();
202                 }
203                 oper.getKey().invoke(localTransactionContext, permit);
204             }
205         }
206     }
207
208 }