Refactor TransactionContext.executeModification()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / RemoteTransactionContext.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9 package org.opendaylight.controller.cluster.datastore;
10
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
13
14 import akka.actor.ActorSelection;
15 import akka.dispatch.Futures;
16 import akka.dispatch.OnComplete;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.Optional;
19 import java.util.SortedSet;
20 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
21 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
22 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
24 import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
25 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
26 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
27 import org.opendaylight.controller.cluster.datastore.modification.Modification;
28 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
29 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
30 import org.opendaylight.mdsal.common.api.ReadFailedException;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import scala.concurrent.Future;
36
37 /**
38  * Redirects front-end transaction operations to a shard for processing. Instances of this class are used
39  * when the destination shard is remote to the caller.
40  *
41  * @author Thomas Pantelis
42  */
43 public class RemoteTransactionContext extends AbstractTransactionContext {
44     private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class);
45
46     private final ActorUtils actorUtils;
47     private final ActorSelection actor;
48     private final OperationLimiter limiter;
49
50     private BatchedModifications batchedModifications;
51     private int totalBatchedModificationsSent;
52     private int batchPermits;
53
54     /**
55      * We have observed a failed modification batch. This transaction context is effectively doomed, as the backend
56      * does not have a correct view of the world. If this happens, we do not limit operations but rather short-cut them
57      * to a either a no-op (modifications) or a failure (reads). Once the transaction is ready, though, we send the
58      * message to resynchronize with the backend, sharing a 'lost message' failure path.
59      */
60     private volatile Throwable failedModification;
61
62     protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor,
63             final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) {
64         super(identifier, remoteTransactionVersion);
65         this.limiter = requireNonNull(limiter);
66         this.actor = actor;
67         this.actorUtils = actorUtils;
68     }
69
70     private ActorSelection getActor() {
71         return actor;
72     }
73
74     protected ActorUtils getActorUtils() {
75         return actorUtils;
76     }
77
78     @Override
79     public void closeTransaction() {
80         LOG.debug("Tx {} closeTransaction called", getIdentifier());
81         TransactionContextCleanup.untrack(this);
82
83         actorUtils.sendOperationAsync(getActor(), new CloseTransaction(getTransactionVersion()).toSerializable());
84     }
85
86     @Override
87     public Future<Object> directCommit(final Boolean havePermit) {
88         LOG.debug("Tx {} directCommit called", getIdentifier());
89
90         // Send the remaining batched modifications, if any, with the ready flag set.
91         bumpPermits(havePermit);
92         return sendBatchedModifications(true, true, Optional.empty());
93     }
94
95     @Override
96     public Future<ActorSelection> readyTransaction(final Boolean havePermit,
97             final Optional<SortedSet<String>> participatingShardNames) {
98         logModificationCount();
99
100         LOG.debug("Tx {} readyTransaction called", getIdentifier());
101
102         // Send the remaining batched modifications, if any, with the ready flag set.
103
104         bumpPermits(havePermit);
105         Future<Object> lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames);
106
107         return transformReadyReply(lastModificationsFuture);
108     }
109
110     private void bumpPermits(final Boolean havePermit) {
111         if (Boolean.TRUE.equals(havePermit)) {
112             ++batchPermits;
113         }
114     }
115
116     protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
117         // Transform the last reply Future into a Future that returns the cohort actor path from
118         // the last reply message. That's the end result of the ready operation.
119
120         return TransactionReadyReplyMapper.transform(readyReplyFuture, actorUtils, getIdentifier());
121     }
122
123     private BatchedModifications newBatchedModifications() {
124         return new BatchedModifications(getIdentifier(), getTransactionVersion());
125     }
126
127     private void batchModification(final Modification modification, final boolean havePermit) {
128         incrementModificationCount();
129         if (havePermit) {
130             ++batchPermits;
131         }
132
133         if (batchedModifications == null) {
134             batchedModifications = newBatchedModifications();
135         }
136
137         batchedModifications.addModification(modification);
138
139         if (batchedModifications.getModifications().size()
140                 >= actorUtils.getDatastoreContext().getShardBatchedModificationCount()) {
141             sendBatchedModifications();
142         }
143     }
144
145     protected Future<Object> sendBatchedModifications() {
146         return sendBatchedModifications(false, false, Optional.empty());
147     }
148
149     protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady,
150             final Optional<SortedSet<String>> participatingShardNames) {
151         Future<Object> sent = null;
152         if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
153             if (batchedModifications == null) {
154                 batchedModifications = newBatchedModifications();
155             }
156
157             LOG.debug("Tx {} sending {} batched modifications, ready: {}", getIdentifier(),
158                     batchedModifications.getModifications().size(), ready);
159
160             batchedModifications.setDoCommitOnReady(doCommitOnReady);
161             batchedModifications.setTotalMessagesSent(++totalBatchedModificationsSent);
162
163             final BatchedModifications toSend = batchedModifications;
164             final int permitsToRelease = batchPermits;
165             batchPermits = 0;
166
167             if (ready) {
168                 batchedModifications.setReady(participatingShardNames);
169                 batchedModifications.setDoCommitOnReady(doCommitOnReady);
170                 batchedModifications = null;
171             } else {
172                 batchedModifications = newBatchedModifications();
173
174                 final Throwable failure = failedModification;
175                 if (failure != null) {
176                     // We have observed a modification failure, it does not make sense to send this batch. This speeds
177                     // up the time when the application could be blocked due to messages timing out and operation
178                     // limiter kicking in.
179                     LOG.debug("Tx {} modifications previously failed, not sending a non-ready batch", getIdentifier());
180                     limiter.release(permitsToRelease);
181                     return Futures.failed(failure);
182                 }
183             }
184
185             sent = actorUtils.executeOperationAsync(getActor(), toSend.toSerializable(),
186                 actorUtils.getTransactionCommitOperationTimeout());
187             sent.onComplete(new OnComplete<>() {
188                 @Override
189                 public void onComplete(final Throwable failure, final Object success) {
190                     if (failure != null) {
191                         LOG.debug("Tx {} modifications failed", getIdentifier(), failure);
192                         failedModification = failure;
193                     } else {
194                         LOG.debug("Tx {} modifications completed with {}", getIdentifier(), success);
195                     }
196                     limiter.release(permitsToRelease);
197                 }
198             }, actorUtils.getClientDispatcher());
199         }
200
201         return sent;
202     }
203
204     @Override
205     public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
206         LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
207         executeModification(new DeleteModification(path), havePermit);
208     }
209
210     @Override
211     public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
212             final Boolean havePermit) {
213         LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
214         executeModification(new MergeModification(path, data), havePermit);
215     }
216
217     @Override
218     public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
219             final Boolean havePermit) {
220         LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
221         executeModification(new WriteModification(path, data), havePermit);
222     }
223
224     private void executeModification(final AbstractModification modification, final Boolean havePermit) {
225         final boolean permitToRelease;
226         if (havePermit == null) {
227             permitToRelease = failedModification == null && acquireOperation();
228         } else {
229             permitToRelease = havePermit.booleanValue();
230         }
231
232         batchModification(modification, permitToRelease);
233     }
234
235     @Override
236     public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture,
237             final Boolean havePermit) {
238         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
239                 readCmd.getPath());
240
241         final Throwable failure = failedModification;
242         if (failure != null) {
243             // If we know there was a previous modification failure, we must not send a read request, as it risks
244             // returning incorrect data. We check this before acquiring an operation simply because we want the app
245             // to complete this transaction as soon as possible.
246             returnFuture.setException(new ReadFailedException("Previous modification failed, cannot "
247                     + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
248             return;
249         }
250
251         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
252         // public API contract.
253
254         final boolean permitToRelease = havePermit == null ? acquireOperation() : havePermit.booleanValue();
255         sendBatchedModifications();
256
257         OnComplete<Object> onComplete = new OnComplete<>() {
258             @Override
259             public void onComplete(final Throwable failure, final Object response) {
260                 // We have previously acquired an operation, now release it, no matter what happened
261                 if (permitToRelease) {
262                     limiter.release();
263                 }
264
265                 if (failure != null) {
266                     LOG.debug("Tx {} {} operation failed", getIdentifier(), readCmd.getClass().getSimpleName(),
267                         failure);
268
269                     returnFuture.setException(new ReadFailedException("Error checking "
270                         + readCmd.getClass().getSimpleName() + " for path " + readCmd.getPath(), failure));
271                 } else {
272                     LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName());
273                     readCmd.processResponse(response, returnFuture);
274                 }
275             }
276         };
277
278         final Future<Object> future = actorUtils.executeOperationAsync(getActor(),
279             readCmd.asVersion(getTransactionVersion()).toSerializable(), actorUtils.getOperationTimeout());
280         future.onComplete(onComplete, actorUtils.getClientDispatcher());
281     }
282
283     /**
284      * Acquire operation from the limiter if the hand-off has completed. If the hand-off is still ongoing, this method
285      * does nothing.
286      *
287      * @return True if a permit was successfully acquired, false otherwise
288      */
289     private boolean acquireOperation() {
290         checkState(isOperationHandOffComplete(),
291             "Attempted to acquire execute operation permit for transaction %s on actor %s during handoff",
292             getIdentifier(), actor);
293
294         if (limiter.acquire()) {
295             return true;
296         }
297
298         LOG.warn("Failed to acquire execute operation permit for transaction {} on actor {}", getIdentifier(), actor);
299         return false;
300     }
301
302     @Override
303     public boolean usesOperationLimiting() {
304         return true;
305     }
306 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.