2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
21 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
22 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29 import scala.concurrent.duration.FiniteDuration;
32 * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
33 * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
34 * retry task after a short delay.
36 * The end result from a completed CreateTransaction message is a TransactionContext that is
37 * used to perform transaction operations. Transaction operations that occur before the
38 * CreateTransaction completes are cache and executed once the CreateTransaction completes,
39 * successfully or not.
41 final class TransactionFutureCallback extends OnComplete<Object> {
42 private static final Logger LOG = LoggerFactory.getLogger(TransactionFutureCallback.class);
45 * Time interval in between transaction create retries.
47 private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS);
50 * The list of transaction operations to execute once the CreateTransaction completes.
52 @GuardedBy("txOperationsOnComplete")
53 private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
54 private final TransactionProxy proxy;
55 private final String shardName;
58 * The TransactionContext resulting from the CreateTransaction reply.
60 private volatile TransactionContext transactionContext;
63 * The target primary shard.
65 private volatile ActorSelection primaryShard;
66 private volatile int createTxTries;
68 TransactionFutureCallback(final TransactionProxy proxy, final String shardName) {
69 this.proxy = Preconditions.checkNotNull(proxy);
70 this.shardName = shardName;
71 createTxTries = (int) (proxy.getActorContext().getDatastoreContext().
72 getShardLeaderElectionTimeout().duration().toMillis() /
73 CREATE_TX_TRY_INTERVAL.toMillis());
76 String getShardName() {
80 TransactionContext getTransactionContext() {
81 return transactionContext;
84 private TransactionType getTransactionType() {
85 return proxy.getTransactionType();
88 private TransactionIdentifier getIdentifier() {
89 return proxy.getIdentifier();
92 private ActorContext getActorContext() {
93 return proxy.getActorContext();
96 private Semaphore getOperationLimiter() {
97 return proxy.getOperationLimiter();
101 * Sets the target primary shard and initiates a CreateTransaction try.
103 void setPrimaryShard(ActorSelection primaryShard) {
104 this.primaryShard = primaryShard;
106 if (getTransactionType() == TransactionType.WRITE_ONLY &&
107 getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
108 LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
109 getIdentifier(), primaryShard);
111 // For write-only Tx's we prepare the transaction modifications directly on the shard actor
112 // to avoid the overhead of creating a separate transaction actor.
113 // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
114 executeTxOperatonsOnComplete(proxy.createValidTransactionContext(this.primaryShard,
115 this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
117 tryCreateTransaction();
122 * Adds a TransactionOperation to be executed after the CreateTransaction completes.
124 private void addTxOperationOnComplete(TransactionOperation operation) {
125 boolean invokeOperation = true;
126 synchronized(txOperationsOnComplete) {
127 if(transactionContext == null) {
128 LOG.debug("Tx {} Adding operation on complete", getIdentifier());
130 invokeOperation = false;
131 txOperationsOnComplete.add(operation);
135 if(invokeOperation) {
136 operation.invoke(transactionContext);
140 void enqueueTransactionOperation(final TransactionOperation op) {
142 if (transactionContext != null) {
143 op.invoke(transactionContext);
145 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
146 // callback to be executed after the Tx is created.
147 addTxOperationOnComplete(op);
152 * Performs a CreateTransaction try async.
154 private void tryCreateTransaction() {
155 if(LOG.isDebugEnabled()) {
156 LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
159 Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
160 getTransactionType().ordinal(), proxy.getTransactionChainId()).toSerializable();
162 Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage);
164 createTxFuture.onComplete(this, getActorContext().getClientDispatcher());
168 public void onComplete(Throwable failure, Object response) {
169 if(failure instanceof NoShardLeaderException) {
170 // There's no leader for the shard yet - schedule and try again, unless we're out
171 // of retries. Note: createTxTries is volatile as it may be written by different
172 // threads however not concurrently, therefore decrementing it non-atomically here
174 if(--createTxTries > 0) {
175 LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
176 getIdentifier(), shardName);
178 getActorContext().getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
182 tryCreateTransaction();
184 }, getActorContext().getClientDispatcher());
189 createTransactionContext(failure, response);
192 void createTransactionContext(Throwable failure, Object response) {
193 // Mainly checking for state violation here to perform a volatile read of "initialized" to
194 // ensure updates to operationLimter et al are visible to this thread (ie we're doing
195 // "piggy-back" synchronization here).
196 proxy.ensureInitializied();
198 // Create the TransactionContext from the response or failure. Store the new
199 // TransactionContext locally until we've completed invoking the
200 // TransactionOperations. This avoids thread timing issues which could cause
201 // out-of-order TransactionOperations. Eg, on a modification operation, if the
202 // TransactionContext is non-null, then we directly call the TransactionContext.
203 // However, at the same time, the code may be executing the cached
204 // TransactionOperations. So to avoid thus timing, we don't publish the
205 // TransactionContext until after we've executed all cached TransactionOperations.
206 TransactionContext localTransactionContext;
207 if(failure != null) {
208 LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
210 localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter());
211 } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
212 localTransactionContext = createValidTransactionContext(
213 CreateTransactionReply.fromSerializable(response));
215 IllegalArgumentException exception = new IllegalArgumentException(String.format(
216 "Invalid reply type %s for CreateTransaction", response.getClass()));
218 localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
221 executeTxOperatonsOnComplete(localTransactionContext);
224 private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
226 // Access to txOperationsOnComplete and transactionContext must be protected and atomic
227 // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
228 // issues and ensure no TransactionOperation is missed and that they are processed
229 // in the order they occurred.
231 // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
232 // in case a TransactionOperation results in another transaction operation being
233 // queued (eg a put operation from a client read Future callback that is notified
235 Collection<TransactionOperation> operationsBatch = null;
236 synchronized(txOperationsOnComplete) {
237 if(txOperationsOnComplete.isEmpty()) {
238 // We're done invoking the TransactionOperations so we can now publish the
239 // TransactionContext.
240 transactionContext = localTransactionContext;
244 operationsBatch = new ArrayList<>(txOperationsOnComplete);
245 txOperationsOnComplete.clear();
248 // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
249 // A slight down-side is that we need to re-acquire the lock below but this should
251 for(TransactionOperation oper: operationsBatch) {
252 oper.invoke(localTransactionContext);
257 private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
258 LOG.debug("Tx {} Received {}", getIdentifier(), reply);
260 return proxy.createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
261 reply.getTransactionPath(), reply.getVersion());