Cleanup: Remove passing around of DataPersistenceProvider
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
33 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
37 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
38 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import scala.concurrent.Future;
47 import scala.concurrent.Promise;
48
49 /**
50  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
51  * <p>
52  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
53  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
54  * be created on each of those shards by the TransactionProxy
55  *</p>
56  * <p>
57  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
58  * shards will be executed.
59  * </p>
60  */
61 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
62
63     public static enum TransactionType {
64         READ_ONLY,
65         WRITE_ONLY,
66         READ_WRITE;
67
68         // Cache all values
69         private static final TransactionType[] VALUES = values();
70
71         public static TransactionType fromInt(final int type) {
72             try {
73                 return VALUES[type];
74             } catch (IndexOutOfBoundsException e) {
75                 throw new IllegalArgumentException("In TransactionType enum value " + type, e);
76             }
77         }
78     }
79
80     private static enum TransactionState {
81         OPEN,
82         READY,
83         CLOSED,
84     }
85
86     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
87                                                               new Mapper<Throwable, Throwable>() {
88         @Override
89         public Throwable apply(Throwable failure) {
90             return failure;
91         }
92     };
93
94     private static final AtomicLong counter = new AtomicLong();
95
96     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
97
98     /**
99      * Stores the remote Tx actors for each requested data store path to be used by the
100      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
101      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
102      * remoteTransactionActors list so they will be visible to the thread accessing the
103      * PhantomReference.
104      */
105     List<ActorSelection> remoteTransactionActors;
106     volatile AtomicBoolean remoteTransactionActorsMB;
107
108     /**
109      * Stores the create transaction results per shard.
110      */
111     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
112
113     private final TransactionType transactionType;
114     final ActorContext actorContext;
115     private final String transactionChainId;
116     private final SchemaContext schemaContext;
117     private TransactionState state = TransactionState.OPEN;
118
119     private volatile boolean initialized;
120     private Semaphore operationLimiter;
121     private OperationCompleter operationCompleter;
122
123     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
124         this(actorContext, transactionType, "");
125     }
126
127     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
128         super(createIdentifier(actorContext));
129         this.actorContext = Preconditions.checkNotNull(actorContext,
130             "actorContext should not be null");
131         this.transactionType = Preconditions.checkNotNull(transactionType,
132             "transactionType should not be null");
133         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
134             "schemaContext should not be null");
135         this.transactionChainId = transactionChainId;
136
137         LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
138     }
139
140     private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
141         String memberName = actorContext.getCurrentMemberName();
142         if (memberName == null) {
143             memberName = "UNKNOWN-MEMBER";
144         }
145
146         return new TransactionIdentifier(memberName, counter.getAndIncrement());
147     }
148
149     @VisibleForTesting
150     boolean hasTransactionContext() {
151         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
152             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
153             if(transactionContext != null) {
154                 return true;
155             }
156         }
157
158         return false;
159     }
160
161     private boolean isRootPath(YangInstanceIdentifier path){
162         return !path.getPathArguments().iterator().hasNext();
163     }
164
165     @Override
166     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
167
168         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
169                 "Read operation on write-only transaction is not allowed");
170
171         LOG.debug("Tx {} read {}", getIdentifier(), path);
172
173         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
174
175         if(isRootPath(path)){
176             readAllData(path, proxyFuture);
177         } else {
178             throttleOperation();
179
180             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
181             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
182                 @Override
183                 public void invoke(TransactionContext transactionContext) {
184                     transactionContext.readData(path, proxyFuture);
185                 }
186             });
187
188         }
189
190         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
191     }
192
193     private void readAllData(final YangInstanceIdentifier path,
194                              final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
195         Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
196         List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
197
198         for(String shardName : allShardNames){
199             final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
200
201             throttleOperation();
202
203             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
204             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
205                 @Override
206                 public void invoke(TransactionContext transactionContext) {
207                     transactionContext.readData(path, subProxyFuture);
208                 }
209             });
210
211             futures.add(subProxyFuture);
212         }
213
214         final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
215
216         future.addListener(new Runnable() {
217             @Override
218             public void run() {
219                 try {
220                     proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
221                             future.get(), actorContext.getSchemaContext()));
222                 } catch (InterruptedException | ExecutionException e) {
223                     proxyFuture.setException(e);
224                 }
225             }
226         }, actorContext.getActorSystem().dispatcher());
227     }
228
229     @Override
230     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
231
232         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
233                 "Exists operation on write-only transaction is not allowed");
234
235         LOG.debug("Tx {} exists {}", getIdentifier(), path);
236
237         throttleOperation();
238
239         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
240
241         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
242         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
243             @Override
244             public void invoke(TransactionContext transactionContext) {
245                 transactionContext.dataExists(path, proxyFuture);
246             }
247         });
248
249         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
250     }
251
252     private void checkModificationState() {
253         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
254                 "Modification operation on read-only transaction is not allowed");
255         Preconditions.checkState(state == TransactionState.OPEN,
256                 "Transaction is sealed - further modifications are not allowed");
257     }
258
259     private void throttleOperation() {
260         throttleOperation(1);
261     }
262
263     private void throttleOperation(int acquirePermits) {
264         if(!initialized) {
265             // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
266             operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
267             operationCompleter = new OperationCompleter(operationLimiter);
268
269             // Make sure we write this last because it's volatile and will also publish the non-volatile writes
270             // above as well so they'll be visible to other threads.
271             initialized = true;
272         }
273
274         try {
275             if(!operationLimiter.tryAcquire(acquirePermits,
276                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
277                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
278             }
279         } catch (InterruptedException e) {
280             if(LOG.isDebugEnabled()) {
281                 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
282             } else {
283                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
284             }
285         }
286     }
287
288     final void ensureInitializied() {
289         Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
290     }
291
292     @Override
293     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
294
295         checkModificationState();
296
297         LOG.debug("Tx {} write {}", getIdentifier(), path);
298
299         throttleOperation();
300
301         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
302         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
303             @Override
304             public void invoke(TransactionContext transactionContext) {
305                 transactionContext.writeData(path, data);
306             }
307         });
308     }
309
310     @Override
311     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
312
313         checkModificationState();
314
315         LOG.debug("Tx {} merge {}", getIdentifier(), path);
316
317         throttleOperation();
318
319         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
320         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
321             @Override
322             public void invoke(TransactionContext transactionContext) {
323                 transactionContext.mergeData(path, data);
324             }
325         });
326     }
327
328     @Override
329     public void delete(final YangInstanceIdentifier path) {
330
331         checkModificationState();
332
333         LOG.debug("Tx {} delete {}", getIdentifier(), path);
334
335         throttleOperation();
336
337         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
338         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
339             @Override
340             public void invoke(TransactionContext transactionContext) {
341                 transactionContext.deleteData(path);
342             }
343         });
344     }
345
346     private boolean seal(final TransactionState newState) {
347         if (state == TransactionState.OPEN) {
348             state = newState;
349             return true;
350         } else {
351             return false;
352         }
353     }
354
355     @Override
356     public AbstractThreePhaseCommitCohort ready() {
357         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
358                 "Read-only transactions cannot be readied");
359
360         final boolean success = seal(TransactionState.READY);
361         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
362
363         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
364                     txFutureCallbackMap.size());
365
366         if (txFutureCallbackMap.isEmpty()) {
367             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
368             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
369         }
370
371         throttleOperation(txFutureCallbackMap.size());
372
373         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
374         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
375
376             LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
377                         txFutureCallback.getShardName(), transactionChainId);
378
379             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
380             final Future<ActorSelection> future;
381             if (transactionContext != null) {
382                 // avoid the creation of a promise and a TransactionOperation
383                 future = transactionContext.readyTransaction();
384             } else {
385                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
386                 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
387                     @Override
388                     public void invoke(TransactionContext transactionContext) {
389                         promise.completeWith(transactionContext.readyTransaction());
390                     }
391                 });
392                 future = promise.future();
393             }
394
395             cohortFutures.add(future);
396         }
397
398         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
399             getIdentifier().toString());
400     }
401
402     @Override
403     public void close() {
404         if (!seal(TransactionState.CLOSED)) {
405             if (state == TransactionState.CLOSED) {
406                 // Idempotent no-op as per AutoCloseable recommendation
407                 return;
408             }
409
410             throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
411                 getIdentifier()));
412         }
413
414         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
415             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
416                 @Override
417                 public void invoke(TransactionContext transactionContext) {
418                     transactionContext.closeTransaction();
419                 }
420             });
421         }
422
423         txFutureCallbackMap.clear();
424
425         if(remoteTransactionActorsMB != null) {
426             remoteTransactionActors.clear();
427             remoteTransactionActorsMB.set(true);
428         }
429     }
430
431     private String shardNameFromIdentifier(YangInstanceIdentifier path){
432         return ShardStrategyFactory.getStrategy(path).findShard(path);
433     }
434
435     protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
436         return actorContext.findPrimaryShardAsync(shardName);
437     }
438
439     final TransactionType getTransactionType() {
440         return transactionType;
441     }
442
443     final Semaphore getOperationLimiter() {
444         return operationLimiter;
445     }
446
447     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
448         String shardName = shardNameFromIdentifier(path);
449         return getOrCreateTxFutureCallback(shardName);
450     }
451
452     private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
453         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
454         if(txFutureCallback == null) {
455             Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
456
457             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
458
459             txFutureCallback = newTxFutureCallback;
460             txFutureCallbackMap.put(shardName, txFutureCallback);
461
462             findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
463                 @Override
464                 public void onComplete(Throwable failure, ActorSelection primaryShard) {
465                     if(failure != null) {
466                         newTxFutureCallback.createTransactionContext(failure, null);
467                     } else {
468                         newTxFutureCallback.setPrimaryShard(primaryShard);
469                     }
470                 }
471             }, actorContext.getClientDispatcher());
472         }
473
474         return txFutureCallback;
475     }
476
477     String getTransactionChainId() {
478         return transactionChainId;
479     }
480
481     protected ActorContext getActorContext() {
482         return actorContext;
483     }
484
485     TransactionContext createValidTransactionContext(ActorSelection transactionActor,
486             String transactionPath, short remoteTransactionVersion) {
487
488         if (transactionType == TransactionType.READ_ONLY) {
489             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
490             // to close the remote Tx's when this instance is no longer in use and is garbage
491             // collected.
492
493             if(remoteTransactionActorsMB == null) {
494                 remoteTransactionActors = Lists.newArrayList();
495                 remoteTransactionActorsMB = new AtomicBoolean();
496
497                 TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
498             }
499
500             // Add the actor to the remoteTransactionActors list for access by the
501             // cleanup PhantonReference.
502             remoteTransactionActors.add(transactionActor);
503
504             // Write to the memory barrier volatile to publish the above update to the
505             // remoteTransactionActors list for thread visibility.
506             remoteTransactionActorsMB.set(true);
507         }
508
509         // TxActor is always created where the leader of the shard is.
510         // Check if TxActor is created in the same node
511         boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
512
513         if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
514             return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
515                     transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
516                     operationCompleter);
517         } else {
518             return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
519                     actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
520         }
521     }
522 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.