CDS: split TransactionType from TransactionProxy
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionFutureCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
21 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
22 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 import scala.concurrent.Future;
28 import scala.concurrent.duration.FiniteDuration;
29
30 /**
31  * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
32  * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
33  * retry task after a short delay.
34  * <p>
35  * The end result from a completed CreateTransaction message is a TransactionContext that is
36  * used to perform transaction operations. Transaction operations that occur before the
37  * CreateTransaction completes are cache and executed once the CreateTransaction completes,
38  * successfully or not.
39  */
40 final class TransactionFutureCallback extends OnComplete<Object> {
41     private static final Logger LOG = LoggerFactory.getLogger(TransactionFutureCallback.class);
42
43     /**
44      * Time interval in between transaction create retries.
45      */
46     private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS);
47
48     /**
49      * The list of transaction operations to execute once the CreateTransaction completes.
50      */
51     @GuardedBy("txOperationsOnComplete")
52     private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
53     private final TransactionProxy proxy;
54     private final String shardName;
55
56     /**
57      * The TransactionContext resulting from the CreateTransaction reply.
58      */
59     private volatile TransactionContext transactionContext;
60
61     /**
62      * The target primary shard.
63      */
64     private volatile ActorSelection primaryShard;
65     private volatile int createTxTries;
66
67     TransactionFutureCallback(final TransactionProxy proxy, final String shardName) {
68         this.proxy = Preconditions.checkNotNull(proxy);
69         this.shardName = shardName;
70         createTxTries = (int) (proxy.getActorContext().getDatastoreContext().
71                 getShardLeaderElectionTimeout().duration().toMillis() /
72                 CREATE_TX_TRY_INTERVAL.toMillis());
73     }
74
75     String getShardName() {
76         return shardName;
77     }
78
79     TransactionContext getTransactionContext() {
80         return transactionContext;
81     }
82
83     private TransactionType getTransactionType() {
84         return proxy.getTransactionType();
85     }
86
87     private TransactionIdentifier getIdentifier() {
88         return proxy.getIdentifier();
89     }
90
91     private ActorContext getActorContext() {
92         return proxy.getActorContext();
93     }
94
95     private Semaphore getOperationLimiter() {
96         return proxy.getOperationLimiter();
97     }
98
99     /**
100      * Sets the target primary shard and initiates a CreateTransaction try.
101      */
102     void setPrimaryShard(ActorSelection primaryShard) {
103         this.primaryShard = primaryShard;
104
105         if (getTransactionType() == TransactionType.WRITE_ONLY &&
106                 getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
107             LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
108                 getIdentifier(), primaryShard);
109
110             // For write-only Tx's we prepare the transaction modifications directly on the shard actor
111             // to avoid the overhead of creating a separate transaction actor.
112             // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
113             executeTxOperatonsOnComplete(proxy.createValidTransactionContext(this.primaryShard,
114                     this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
115         } else {
116             tryCreateTransaction();
117         }
118     }
119
120     /**
121      * Adds a TransactionOperation to be executed after the CreateTransaction completes.
122      */
123     private void addTxOperationOnComplete(TransactionOperation operation) {
124         boolean invokeOperation = true;
125         synchronized(txOperationsOnComplete) {
126             if(transactionContext == null) {
127                 LOG.debug("Tx {} Adding operation on complete", getIdentifier());
128
129                 invokeOperation = false;
130                 txOperationsOnComplete.add(operation);
131             }
132         }
133
134         if(invokeOperation) {
135             operation.invoke(transactionContext);
136         }
137     }
138
139     void enqueueTransactionOperation(final TransactionOperation op) {
140
141         if (transactionContext != null) {
142             op.invoke(transactionContext);
143         } else {
144             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
145             // callback to be executed after the Tx is created.
146             addTxOperationOnComplete(op);
147         }
148     }
149
150     /**
151      * Performs a CreateTransaction try async.
152      */
153     private void tryCreateTransaction() {
154         if(LOG.isDebugEnabled()) {
155             LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
156         }
157
158         Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
159             getTransactionType().ordinal(), proxy.getTransactionChainId()).toSerializable();
160
161         Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage);
162
163         createTxFuture.onComplete(this, getActorContext().getClientDispatcher());
164     }
165
166     @Override
167     public void onComplete(Throwable failure, Object response) {
168         if(failure instanceof NoShardLeaderException) {
169             // There's no leader for the shard yet - schedule and try again, unless we're out
170             // of retries. Note: createTxTries is volatile as it may be written by different
171             // threads however not concurrently, therefore decrementing it non-atomically here
172             // is ok.
173             if(--createTxTries > 0) {
174                 LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
175                     getIdentifier(), shardName);
176
177                 getActorContext().getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
178                         new Runnable() {
179                             @Override
180                             public void run() {
181                                 tryCreateTransaction();
182                             }
183                         }, getActorContext().getClientDispatcher());
184                 return;
185             }
186         }
187
188         createTransactionContext(failure, response);
189     }
190
191     void createTransactionContext(Throwable failure, Object response) {
192         // Mainly checking for state violation here to perform a volatile read of "initialized" to
193         // ensure updates to operationLimter et al are visible to this thread (ie we're doing
194         // "piggy-back" synchronization here).
195         proxy.ensureInitializied();
196
197         // Create the TransactionContext from the response or failure. Store the new
198         // TransactionContext locally until we've completed invoking the
199         // TransactionOperations. This avoids thread timing issues which could cause
200         // out-of-order TransactionOperations. Eg, on a modification operation, if the
201         // TransactionContext is non-null, then we directly call the TransactionContext.
202         // However, at the same time, the code may be executing the cached
203         // TransactionOperations. So to avoid thus timing, we don't publish the
204         // TransactionContext until after we've executed all cached TransactionOperations.
205         TransactionContext localTransactionContext;
206         if(failure != null) {
207             LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
208
209             localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter());
210         } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
211             localTransactionContext = createValidTransactionContext(
212                     CreateTransactionReply.fromSerializable(response));
213         } else {
214             IllegalArgumentException exception = new IllegalArgumentException(String.format(
215                     "Invalid reply type %s for CreateTransaction", response.getClass()));
216
217             localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
218         }
219
220         executeTxOperatonsOnComplete(localTransactionContext);
221     }
222
223     private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
224         while(true) {
225             // Access to txOperationsOnComplete and transactionContext must be protected and atomic
226             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
227             // issues and ensure no TransactionOperation is missed and that they are processed
228             // in the order they occurred.
229
230             // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
231             // in case a TransactionOperation results in another transaction operation being
232             // queued (eg a put operation from a client read Future callback that is notified
233             // synchronously).
234             Collection<TransactionOperation> operationsBatch = null;
235             synchronized(txOperationsOnComplete) {
236                 if(txOperationsOnComplete.isEmpty()) {
237                     // We're done invoking the TransactionOperations so we can now publish the
238                     // TransactionContext.
239                     transactionContext = localTransactionContext;
240                     break;
241                 }
242
243                 operationsBatch = new ArrayList<>(txOperationsOnComplete);
244                 txOperationsOnComplete.clear();
245             }
246
247             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
248             // A slight down-side is that we need to re-acquire the lock below but this should
249             // be negligible.
250             for(TransactionOperation oper: operationsBatch) {
251                 oper.invoke(localTransactionContext);
252             }
253         }
254     }
255
256     private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
257         LOG.debug("Tx {} Received {}", getIdentifier(), reply);
258
259         return proxy.createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
260                 reply.getTransactionPath(), reply.getVersion());
261     }
262
263 }