Merge "Bug 2358: Fixed warnings in Restconf"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionFutureCallback.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import akka.actor.ActorSelection;
11 import akka.dispatch.OnComplete;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.Lists;
14 import java.util.ArrayList;
15 import java.util.Collection;
16 import java.util.List;
17 import java.util.concurrent.Semaphore;
18 import java.util.concurrent.TimeUnit;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
21 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
22 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29 import scala.concurrent.duration.FiniteDuration;
30
31 /**
32  * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
33  * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
34  * retry task after a short delay.
35  * <p>
36  * The end result from a completed CreateTransaction message is a TransactionContext that is
37  * used to perform transaction operations. Transaction operations that occur before the
38  * CreateTransaction completes are cache and executed once the CreateTransaction completes,
39  * successfully or not.
40  */
41 final class TransactionFutureCallback extends OnComplete<Object> {
42     private static final Logger LOG = LoggerFactory.getLogger(TransactionFutureCallback.class);
43
44     /**
45      * Time interval in between transaction create retries.
46      */
47     private static final FiniteDuration CREATE_TX_TRY_INTERVAL = FiniteDuration.create(1, TimeUnit.SECONDS);
48
49     /**
50      * The list of transaction operations to execute once the CreateTransaction completes.
51      */
52     @GuardedBy("txOperationsOnComplete")
53     private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
54     private final TransactionProxy proxy;
55     private final String shardName;
56
57     /**
58      * The TransactionContext resulting from the CreateTransaction reply.
59      */
60     private volatile TransactionContext transactionContext;
61
62     /**
63      * The target primary shard.
64      */
65     private volatile ActorSelection primaryShard;
66     private volatile int createTxTries;
67
68     TransactionFutureCallback(final TransactionProxy proxy, final String shardName) {
69         this.proxy = Preconditions.checkNotNull(proxy);
70         this.shardName = shardName;
71         createTxTries = (int) (proxy.getActorContext().getDatastoreContext().
72                 getShardLeaderElectionTimeout().duration().toMillis() /
73                 CREATE_TX_TRY_INTERVAL.toMillis());
74     }
75
76     String getShardName() {
77         return shardName;
78     }
79
80     TransactionContext getTransactionContext() {
81         return transactionContext;
82     }
83
84     private TransactionType getTransactionType() {
85         return proxy.getTransactionType();
86     }
87
88     private TransactionIdentifier getIdentifier() {
89         return proxy.getIdentifier();
90     }
91
92     private ActorContext getActorContext() {
93         return proxy.getActorContext();
94     }
95
96     private Semaphore getOperationLimiter() {
97         return proxy.getOperationLimiter();
98     }
99
100     /**
101      * Sets the target primary shard and initiates a CreateTransaction try.
102      */
103     void setPrimaryShard(ActorSelection primaryShard) {
104         this.primaryShard = primaryShard;
105
106         if (getTransactionType() == TransactionType.WRITE_ONLY &&
107                 getActorContext().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
108             LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
109                 getIdentifier(), primaryShard);
110
111             // For write-only Tx's we prepare the transaction modifications directly on the shard actor
112             // to avoid the overhead of creating a separate transaction actor.
113             // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
114             executeTxOperatonsOnComplete(proxy.createValidTransactionContext(this.primaryShard,
115                     this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
116         } else {
117             tryCreateTransaction();
118         }
119     }
120
121     /**
122      * Adds a TransactionOperation to be executed after the CreateTransaction completes.
123      */
124     private void addTxOperationOnComplete(TransactionOperation operation) {
125         boolean invokeOperation = true;
126         synchronized(txOperationsOnComplete) {
127             if(transactionContext == null) {
128                 LOG.debug("Tx {} Adding operation on complete", getIdentifier());
129
130                 invokeOperation = false;
131                 txOperationsOnComplete.add(operation);
132             }
133         }
134
135         if(invokeOperation) {
136             operation.invoke(transactionContext);
137         }
138     }
139
140     void enqueueTransactionOperation(final TransactionOperation op) {
141
142         if (transactionContext != null) {
143             op.invoke(transactionContext);
144         } else {
145             // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
146             // callback to be executed after the Tx is created.
147             addTxOperationOnComplete(op);
148         }
149     }
150
151     /**
152      * Performs a CreateTransaction try async.
153      */
154     private void tryCreateTransaction() {
155         if(LOG.isDebugEnabled()) {
156             LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(), primaryShard);
157         }
158
159         Object serializedCreateMessage = new CreateTransaction(getIdentifier().toString(),
160             getTransactionType().ordinal(), proxy.getTransactionChainId()).toSerializable();
161
162         Future<Object> createTxFuture = getActorContext().executeOperationAsync(primaryShard, serializedCreateMessage);
163
164         createTxFuture.onComplete(this, getActorContext().getClientDispatcher());
165     }
166
167     @Override
168     public void onComplete(Throwable failure, Object response) {
169         if(failure instanceof NoShardLeaderException) {
170             // There's no leader for the shard yet - schedule and try again, unless we're out
171             // of retries. Note: createTxTries is volatile as it may be written by different
172             // threads however not concurrently, therefore decrementing it non-atomically here
173             // is ok.
174             if(--createTxTries > 0) {
175                 LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
176                     getIdentifier(), shardName);
177
178                 getActorContext().getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
179                         new Runnable() {
180                             @Override
181                             public void run() {
182                                 tryCreateTransaction();
183                             }
184                         }, getActorContext().getClientDispatcher());
185                 return;
186             }
187         }
188
189         createTransactionContext(failure, response);
190     }
191
192     void createTransactionContext(Throwable failure, Object response) {
193         // Mainly checking for state violation here to perform a volatile read of "initialized" to
194         // ensure updates to operationLimter et al are visible to this thread (ie we're doing
195         // "piggy-back" synchronization here).
196         proxy.ensureInitializied();
197
198         // Create the TransactionContext from the response or failure. Store the new
199         // TransactionContext locally until we've completed invoking the
200         // TransactionOperations. This avoids thread timing issues which could cause
201         // out-of-order TransactionOperations. Eg, on a modification operation, if the
202         // TransactionContext is non-null, then we directly call the TransactionContext.
203         // However, at the same time, the code may be executing the cached
204         // TransactionOperations. So to avoid thus timing, we don't publish the
205         // TransactionContext until after we've executed all cached TransactionOperations.
206         TransactionContext localTransactionContext;
207         if(failure != null) {
208             LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
209
210             localTransactionContext = new NoOpTransactionContext(failure, getIdentifier(), getOperationLimiter());
211         } else if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(response.getClass())) {
212             localTransactionContext = createValidTransactionContext(
213                     CreateTransactionReply.fromSerializable(response));
214         } else {
215             IllegalArgumentException exception = new IllegalArgumentException(String.format(
216                     "Invalid reply type %s for CreateTransaction", response.getClass()));
217
218             localTransactionContext = new NoOpTransactionContext(exception, getIdentifier(), getOperationLimiter());
219         }
220
221         executeTxOperatonsOnComplete(localTransactionContext);
222     }
223
224     private void executeTxOperatonsOnComplete(TransactionContext localTransactionContext) {
225         while(true) {
226             // Access to txOperationsOnComplete and transactionContext must be protected and atomic
227             // (ie synchronized) with respect to #addTxOperationOnComplete to handle timing
228             // issues and ensure no TransactionOperation is missed and that they are processed
229             // in the order they occurred.
230
231             // We'll make a local copy of the txOperationsOnComplete list to handle re-entrancy
232             // in case a TransactionOperation results in another transaction operation being
233             // queued (eg a put operation from a client read Future callback that is notified
234             // synchronously).
235             Collection<TransactionOperation> operationsBatch = null;
236             synchronized(txOperationsOnComplete) {
237                 if(txOperationsOnComplete.isEmpty()) {
238                     // We're done invoking the TransactionOperations so we can now publish the
239                     // TransactionContext.
240                     transactionContext = localTransactionContext;
241                     break;
242                 }
243
244                 operationsBatch = new ArrayList<>(txOperationsOnComplete);
245                 txOperationsOnComplete.clear();
246             }
247
248             // Invoke TransactionOperations outside the sync block to avoid unnecessary blocking.
249             // A slight down-side is that we need to re-acquire the lock below but this should
250             // be negligible.
251             for(TransactionOperation oper: operationsBatch) {
252                 oper.invoke(localTransactionContext);
253             }
254         }
255     }
256
257     private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
258         LOG.debug("Tx {} Received {}", getIdentifier(), reply);
259
260         return proxy.createValidTransactionContext(getActorContext().actorSelection(reply.getTransactionPath()),
261                 reply.getTransactionPath(), reply.getVersion());
262     }
263
264 }