Specialize TransactionContextWrapper
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RemoteTransactionContextSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorSelection;
14 import akka.dispatch.OnComplete;
15 import akka.pattern.AskTimeoutException;
16 import akka.util.Timeout;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.concurrent.TimeUnit;
19 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
20 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
22 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
25 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 import scala.concurrent.Future;
29 import scala.concurrent.duration.FiniteDuration;
30
31 /**
32  * Handles creation of TransactionContext instances for remote transactions. This class creates
33  * remote transactions, if necessary, by sending CreateTransaction messages with retries, up to a limit,
34  * if the shard doesn't have a leader yet. This is done by scheduling a retry task after a short delay.
35  * <p/>
36  * The end result from a completed CreateTransaction message is a TransactionContext that is
37  * used to perform transaction operations. Transaction operations that occur before the
38  * CreateTransaction completes are cached via a DelayedTransactionContextWrapper and executed once the
39  * CreateTransaction completes, successfully or not.
40  */
41 final class RemoteTransactionContextSupport {
42     private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContextSupport.class);
43
44     private static final long CREATE_TX_TRY_INTERVAL_IN_MS = 1000;
45     private static final long MAX_CREATE_TX_MSG_TIMEOUT_IN_MS = 5000;
46
47     private final TransactionProxy parent;
48     private final String shardName;
49
50     /**
51      * The target primary shard.
52      */
53     private volatile PrimaryShardInfo primaryShardInfo;
54
55     /**
56      * The total timeout for creating a tx on the primary shard.
57      */
58     private volatile long totalCreateTxTimeout;
59
60     private final Timeout createTxMessageTimeout;
61
62     private final DelayedTransactionContextWrapper transactionContextWrapper;
63
64     RemoteTransactionContextSupport(final DelayedTransactionContextWrapper transactionContextWrapper,
65             final TransactionProxy parent, final String shardName) {
66         this.parent = requireNonNull(parent);
67         this.shardName = shardName;
68         this.transactionContextWrapper = transactionContextWrapper;
69
70         // For the total create tx timeout, use 2 times the election timeout. This should be enough time for
71         // a leader re-election to occur if we happen to hit it in transition.
72         totalCreateTxTimeout = parent.getActorUtils().getDatastoreContext().getShardRaftConfig()
73                 .getElectionTimeOutInterval().toMillis() * 2;
74
75         // We'll use the operationTimeout for the the create Tx message timeout so it can be set appropriately
76         // for unit tests but cap it at MAX_CREATE_TX_MSG_TIMEOUT_IN_MS. The operationTimeout could be set
77         // larger than the totalCreateTxTimeout in production which we don't want.
78         long operationTimeout = parent.getActorUtils().getOperationTimeout().duration().toMillis();
79         createTxMessageTimeout = new Timeout(Math.min(operationTimeout, MAX_CREATE_TX_MSG_TIMEOUT_IN_MS),
80                 TimeUnit.MILLISECONDS);
81     }
82
83     String getShardName() {
84         return shardName;
85     }
86
87     private TransactionType getTransactionType() {
88         return parent.getType();
89     }
90
91     private ActorUtils getActorUtils() {
92         return parent.getActorUtils();
93     }
94
95     private TransactionIdentifier getIdentifier() {
96         return parent.getIdentifier();
97     }
98
99     /**
100      * Sets the target primary shard and initiates a CreateTransaction try.
101      */
102     void setPrimaryShard(final PrimaryShardInfo newPrimaryShardInfo) {
103         this.primaryShardInfo = newPrimaryShardInfo;
104
105         if (getTransactionType() == TransactionType.WRITE_ONLY
106                 && getActorUtils().getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
107             ActorSelection primaryShard = newPrimaryShardInfo.getPrimaryShardActor();
108
109             LOG.debug("Tx {} Primary shard {} found - creating WRITE_ONLY transaction context",
110                 getIdentifier(), primaryShard);
111
112             // For write-only Tx's we prepare the transaction modifications directly on the shard actor
113             // to avoid the overhead of creating a separate transaction actor.
114             transactionContextWrapper.executePriorTransactionOperations(createValidTransactionContext(
115                     primaryShard, String.valueOf(primaryShard.path()), newPrimaryShardInfo.getPrimaryShardVersion()));
116         } else {
117             tryCreateTransaction();
118         }
119     }
120
121     /**
122       Performs a CreateTransaction try async.
123      */
124     private void tryCreateTransaction() {
125         LOG.debug("Tx {} Primary shard {} found - trying create transaction", getIdentifier(),
126                 primaryShardInfo.getPrimaryShardActor());
127
128         Object serializedCreateMessage = new CreateTransaction(getIdentifier(), getTransactionType().ordinal(),
129                     primaryShardInfo.getPrimaryShardVersion()).toSerializable();
130
131         Future<Object> createTxFuture = getActorUtils().executeOperationAsync(
132                 primaryShardInfo.getPrimaryShardActor(), serializedCreateMessage, createTxMessageTimeout);
133
134         createTxFuture.onComplete(new OnComplete<Object>() {
135             @Override
136             public void onComplete(final Throwable failure, final Object response) {
137                 onCreateTransactionComplete(failure, response);
138             }
139         }, getActorUtils().getClientDispatcher());
140     }
141
142     private void tryFindPrimaryShard() {
143         LOG.debug("Tx {} Retrying findPrimaryShardAsync for shard {}", getIdentifier(), shardName);
144
145         this.primaryShardInfo = null;
146         Future<PrimaryShardInfo> findPrimaryFuture = getActorUtils().findPrimaryShardAsync(shardName);
147         findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
148             @Override
149             public void onComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) {
150                 onFindPrimaryShardComplete(failure, newPrimaryShardInfo);
151             }
152         }, getActorUtils().getClientDispatcher());
153     }
154
155     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
156             justification = "https://github.com/spotbugs/spotbugs/issues/811")
157     private void onFindPrimaryShardComplete(final Throwable failure, final PrimaryShardInfo newPrimaryShardInfo) {
158         if (failure == null) {
159             this.primaryShardInfo = newPrimaryShardInfo;
160             tryCreateTransaction();
161         } else {
162             LOG.debug("Tx {}: Find primary for shard {} failed", getIdentifier(), shardName, failure);
163
164             onCreateTransactionComplete(failure, null);
165         }
166     }
167
168     private void onCreateTransactionComplete(final Throwable failure, final Object response) {
169         // An AskTimeoutException will occur if the local shard forwards to an unavailable remote leader or
170         // the cached remote leader actor is no longer available.
171         boolean retryCreateTransaction = primaryShardInfo != null
172                 && (failure instanceof NoShardLeaderException || failure instanceof AskTimeoutException);
173
174         // Schedule a retry unless we're out of retries. Note: totalCreateTxTimeout is volatile as it may
175         // be written by different threads however not concurrently, therefore decrementing it
176         // non-atomically here is ok.
177         if (retryCreateTransaction && totalCreateTxTimeout > 0) {
178             long scheduleInterval = CREATE_TX_TRY_INTERVAL_IN_MS;
179             if (failure instanceof AskTimeoutException) {
180                 // Since we use the createTxMessageTimeout for the CreateTransaction request and it timed
181                 // out, subtract it from the total timeout. Also since the createTxMessageTimeout period
182                 // has already elapsed, we can immediately schedule the retry (10 ms is virtually immediate).
183                 totalCreateTxTimeout -= createTxMessageTimeout.duration().toMillis();
184                 scheduleInterval = 10;
185             }
186
187             totalCreateTxTimeout -= scheduleInterval;
188
189             LOG.debug("Tx {}: create tx on shard {} failed with exception \"{}\" - scheduling retry in {} ms",
190                     getIdentifier(), shardName, failure, scheduleInterval);
191
192             getActorUtils().getActorSystem().scheduler().scheduleOnce(
193                     FiniteDuration.create(scheduleInterval, TimeUnit.MILLISECONDS),
194                     this::tryFindPrimaryShard, getActorUtils().getClientDispatcher());
195             return;
196         }
197
198         createTransactionContext(failure, response);
199     }
200
201     private void createTransactionContext(final Throwable failure, final Object response) {
202         // Create the TransactionContext from the response or failure. Store the new
203         // TransactionContext locally until we've completed invoking the
204         // TransactionOperations. This avoids thread timing issues which could cause
205         // out-of-order TransactionOperations. Eg, on a modification operation, if the
206         // TransactionContext is non-null, then we directly call the TransactionContext.
207         // However, at the same time, the code may be executing the cached
208         // TransactionOperations. So to avoid thus timing, we don't publish the
209         // TransactionContext until after we've executed all cached TransactionOperations.
210         TransactionContext localTransactionContext;
211         if (failure != null) {
212             LOG.debug("Tx {} Creating NoOpTransaction because of error", getIdentifier(), failure);
213
214             Throwable resultingEx = failure;
215             if (failure instanceof AskTimeoutException) {
216                 resultingEx = new ShardLeaderNotRespondingException(String.format(
217                         "Could not create a %s transaction on shard %s. The shard leader isn't responding.",
218                         parent.getType(), shardName), failure);
219             } else if (!(failure instanceof NoShardLeaderException)) {
220                 resultingEx = new Exception(String.format(
221                     "Error creating %s transaction on shard %s", parent.getType(), shardName), failure);
222             }
223
224             localTransactionContext = new NoOpTransactionContext(resultingEx, getIdentifier());
225         } else if (CreateTransactionReply.isSerializedType(response)) {
226             localTransactionContext = createValidTransactionContext(
227                     CreateTransactionReply.fromSerializable(response));
228         } else {
229             IllegalArgumentException exception = new IllegalArgumentException(String.format(
230                     "Invalid reply type %s for CreateTransaction", response.getClass()));
231
232             localTransactionContext = new NoOpTransactionContext(exception, getIdentifier());
233         }
234         transactionContextWrapper.executePriorTransactionOperations(localTransactionContext);
235     }
236
237     private TransactionContext createValidTransactionContext(final CreateTransactionReply reply) {
238         LOG.debug("Tx {} Received {}", getIdentifier(), reply);
239
240         return createValidTransactionContext(getActorUtils().actorSelection(reply.getTransactionPath()),
241                 reply.getTransactionPath(), primaryShardInfo.getPrimaryShardVersion());
242     }
243
244     private TransactionContext createValidTransactionContext(final ActorSelection transactionActor,
245             final String transactionPath, final short remoteTransactionVersion) {
246         final TransactionContext ret = new RemoteTransactionContext(transactionContextWrapper.getIdentifier(),
247                 transactionActor, getActorUtils(), remoteTransactionVersion, transactionContextWrapper.getLimiter());
248
249         if (parent.getType() == TransactionType.READ_ONLY) {
250             TransactionContextCleanup.track(parent, ret);
251         }
252
253         return ret;
254     }
255 }
256