Merge "Add missing copyright text"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
33 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
35 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
41 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import scala.concurrent.Future;
49 import scala.concurrent.Promise;
50
51 /**
52  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
53  * <p>
54  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
55  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
56  * be created on each of those shards by the TransactionProxy
57  *</p>
58  * <p>
59  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
60  * shards will be executed.
61  * </p>
62  */
63 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
64
65     public static enum TransactionType {
66         READ_ONLY,
67         WRITE_ONLY,
68         READ_WRITE;
69
70         // Cache all values
71         private static final TransactionType[] VALUES = values();
72
73         public static TransactionType fromInt(final int type) {
74             try {
75                 return VALUES[type];
76             } catch (IndexOutOfBoundsException e) {
77                 throw new IllegalArgumentException("In TransactionType enum value " + type, e);
78             }
79         }
80     }
81
82     private static enum TransactionState {
83         OPEN,
84         READY,
85         CLOSED,
86     }
87
88     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
89                                                               new Mapper<Throwable, Throwable>() {
90         @Override
91         public Throwable apply(Throwable failure) {
92             return failure;
93         }
94     };
95
96     private static final AtomicLong counter = new AtomicLong();
97
98     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
99
100     /**
101      * Stores the remote Tx actors for each requested data store path to be used by the
102      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
103      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
104      * remoteTransactionActors list so they will be visible to the thread accessing the
105      * PhantomReference.
106      */
107     List<ActorSelection> remoteTransactionActors;
108     volatile AtomicBoolean remoteTransactionActorsMB;
109
110     /**
111      * Stores the create transaction results per shard.
112      */
113     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
114
115     private final TransactionType transactionType;
116     final ActorContext actorContext;
117     private final String transactionChainId;
118     private final SchemaContext schemaContext;
119     private TransactionState state = TransactionState.OPEN;
120
121     private volatile boolean initialized;
122     private Semaphore operationLimiter;
123     private OperationCompleter operationCompleter;
124
125     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
126         this(actorContext, transactionType, "");
127     }
128
129     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
130         super(createIdentifier(actorContext));
131         this.actorContext = Preconditions.checkNotNull(actorContext,
132             "actorContext should not be null");
133         this.transactionType = Preconditions.checkNotNull(transactionType,
134             "transactionType should not be null");
135         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
136             "schemaContext should not be null");
137         this.transactionChainId = transactionChainId;
138
139         LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
140     }
141
142     private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
143         String memberName = actorContext.getCurrentMemberName();
144         if (memberName == null) {
145             memberName = "UNKNOWN-MEMBER";
146         }
147
148         return new TransactionIdentifier(memberName, counter.getAndIncrement());
149     }
150
151     @VisibleForTesting
152     boolean hasTransactionContext() {
153         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
154             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
155             if(transactionContext != null) {
156                 return true;
157             }
158         }
159
160         return false;
161     }
162
163     private static boolean isRootPath(YangInstanceIdentifier path) {
164         return !path.getPathArguments().iterator().hasNext();
165     }
166
167     @Override
168     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
169
170         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
171                 "Read operation on write-only transaction is not allowed");
172
173         LOG.debug("Tx {} read {}", getIdentifier(), path);
174
175         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
176
177         if(isRootPath(path)){
178             readAllData(path, proxyFuture);
179         } else {
180             throttleOperation();
181
182             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
183             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
184                 @Override
185                 public void invoke(TransactionContext transactionContext) {
186                     transactionContext.readData(path, proxyFuture);
187                 }
188             });
189
190         }
191
192         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
193     }
194
195     private void readAllData(final YangInstanceIdentifier path,
196                              final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
197         Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
198         List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
199
200         for(String shardName : allShardNames){
201             final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
202
203             throttleOperation();
204
205             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
206             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
207                 @Override
208                 public void invoke(TransactionContext transactionContext) {
209                     transactionContext.readData(path, subProxyFuture);
210                 }
211             });
212
213             futures.add(subProxyFuture);
214         }
215
216         final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
217
218         future.addListener(new Runnable() {
219             @Override
220             public void run() {
221                 try {
222                     proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
223                             future.get(), actorContext.getSchemaContext()));
224                 } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
225                     proxyFuture.setException(e);
226                 }
227             }
228         }, actorContext.getActorSystem().dispatcher());
229     }
230
231     @Override
232     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
233
234         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
235                 "Exists operation on write-only transaction is not allowed");
236
237         LOG.debug("Tx {} exists {}", getIdentifier(), path);
238
239         throttleOperation();
240
241         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
242
243         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
244         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
245             @Override
246             public void invoke(TransactionContext transactionContext) {
247                 transactionContext.dataExists(path, proxyFuture);
248             }
249         });
250
251         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
252     }
253
254     private void checkModificationState() {
255         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
256                 "Modification operation on read-only transaction is not allowed");
257         Preconditions.checkState(state == TransactionState.OPEN,
258                 "Transaction is sealed - further modifications are not allowed");
259     }
260
261     private void throttleOperation() {
262         throttleOperation(1);
263     }
264
265     private void throttleOperation(int acquirePermits) {
266         if(!initialized) {
267             // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
268             operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
269             operationCompleter = new OperationCompleter(operationLimiter);
270
271             // Make sure we write this last because it's volatile and will also publish the non-volatile writes
272             // above as well so they'll be visible to other threads.
273             initialized = true;
274         }
275
276         try {
277             if(!operationLimiter.tryAcquire(acquirePermits,
278                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
279                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
280             }
281         } catch (InterruptedException e) {
282             if(LOG.isDebugEnabled()) {
283                 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
284             } else {
285                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
286             }
287         }
288     }
289
290     final void ensureInitializied() {
291         Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
292     }
293
294     @Override
295     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
296
297         checkModificationState();
298
299         LOG.debug("Tx {} write {}", getIdentifier(), path);
300
301         throttleOperation();
302
303         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
304         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
305             @Override
306             public void invoke(TransactionContext transactionContext) {
307                 transactionContext.writeData(path, data);
308             }
309         });
310     }
311
312     @Override
313     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
314
315         checkModificationState();
316
317         LOG.debug("Tx {} merge {}", getIdentifier(), path);
318
319         throttleOperation();
320
321         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
322         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
323             @Override
324             public void invoke(TransactionContext transactionContext) {
325                 transactionContext.mergeData(path, data);
326             }
327         });
328     }
329
330     @Override
331     public void delete(final YangInstanceIdentifier path) {
332
333         checkModificationState();
334
335         LOG.debug("Tx {} delete {}", getIdentifier(), path);
336
337         throttleOperation();
338
339         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
340         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
341             @Override
342             public void invoke(TransactionContext transactionContext) {
343                 transactionContext.deleteData(path);
344             }
345         });
346     }
347
348     private boolean seal(final TransactionState newState) {
349         if (state == TransactionState.OPEN) {
350             state = newState;
351             return true;
352         } else {
353             return false;
354         }
355     }
356
357     @Override
358     public AbstractThreePhaseCommitCohort<?> ready() {
359         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
360                 "Read-only transactions cannot be readied");
361
362         final boolean success = seal(TransactionState.READY);
363         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
364
365         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
366                     txFutureCallbackMap.size());
367
368         if (txFutureCallbackMap.isEmpty()) {
369             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
370             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
371         }
372
373         throttleOperation(txFutureCallbackMap.size());
374
375         final boolean isSingleShard = txFutureCallbackMap.size() == 1;
376         return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
377     }
378
379     @SuppressWarnings({ "rawtypes", "unchecked" })
380     private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
381         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
382
383         LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
384                 txFutureCallback.getShardName(), transactionChainId);
385
386         final OperationCallback.Reference operationCallbackRef =
387                 new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
388         final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
389         final Future future;
390         if (transactionContext != null) {
391             // avoid the creation of a promise and a TransactionOperation
392             future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
393         } else {
394             final Promise promise = akka.dispatch.Futures.promise();
395             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
396                 @Override
397                 public void invoke(TransactionContext transactionContext) {
398                     promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
399                 }
400             });
401             future = promise.future();
402         }
403
404         return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
405     }
406
407     private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
408             OperationCallback.Reference operationCallbackRef) {
409         if(transactionContext.supportsDirectCommit()) {
410             TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
411             operationCallbackRef.set(rateLimitingCallback);
412             rateLimitingCallback.run();
413             return transactionContext.directCommit();
414         } else {
415             return transactionContext.readyTransaction();
416         }
417     }
418
419     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
420         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
421         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
422
423             LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
424                         txFutureCallback.getShardName(), transactionChainId);
425
426             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
427             final Future<ActorSelection> future;
428             if (transactionContext != null) {
429                 // avoid the creation of a promise and a TransactionOperation
430                 future = transactionContext.readyTransaction();
431             } else {
432                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
433                 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
434                     @Override
435                     public void invoke(TransactionContext transactionContext) {
436                         promise.completeWith(transactionContext.readyTransaction());
437                     }
438                 });
439                 future = promise.future();
440             }
441
442             cohortFutures.add(future);
443         }
444
445         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
446     }
447
448     @Override
449     public void close() {
450         if (!seal(TransactionState.CLOSED)) {
451             if (state == TransactionState.CLOSED) {
452                 // Idempotent no-op as per AutoCloseable recommendation
453                 return;
454             }
455
456             throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
457                 getIdentifier()));
458         }
459
460         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
461             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
462                 @Override
463                 public void invoke(TransactionContext transactionContext) {
464                     transactionContext.closeTransaction();
465                 }
466             });
467         }
468
469         txFutureCallbackMap.clear();
470
471         if(remoteTransactionActorsMB != null) {
472             remoteTransactionActors.clear();
473             remoteTransactionActorsMB.set(true);
474         }
475     }
476
477     private String shardNameFromIdentifier(YangInstanceIdentifier path){
478         return ShardStrategyFactory.getStrategy(path).findShard(path);
479     }
480
481     protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
482         return actorContext.findPrimaryShardAsync(shardName);
483     }
484
485     final TransactionType getTransactionType() {
486         return transactionType;
487     }
488
489     final Semaphore getOperationLimiter() {
490         return operationLimiter;
491     }
492
493     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
494         String shardName = shardNameFromIdentifier(path);
495         return getOrCreateTxFutureCallback(shardName);
496     }
497
498     private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
499         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
500         if(txFutureCallback == null) {
501             Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
502
503             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
504
505             txFutureCallback = newTxFutureCallback;
506             txFutureCallbackMap.put(shardName, txFutureCallback);
507
508             findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
509                 @Override
510                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
511                     if(failure != null) {
512                         newTxFutureCallback.createTransactionContext(failure, null);
513                     } else {
514                         newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
515                     }
516                 }
517             }, actorContext.getClientDispatcher());
518         }
519
520         return txFutureCallback;
521     }
522
523     String getTransactionChainId() {
524         return transactionChainId;
525     }
526
527     protected ActorContext getActorContext() {
528         return actorContext;
529     }
530
531     TransactionContext createValidTransactionContext(ActorSelection transactionActor,
532             String transactionPath, short remoteTransactionVersion) {
533
534         if (transactionType == TransactionType.READ_ONLY) {
535             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
536             // to close the remote Tx's when this instance is no longer in use and is garbage
537             // collected.
538
539             if(remoteTransactionActorsMB == null) {
540                 remoteTransactionActors = Lists.newArrayList();
541                 remoteTransactionActorsMB = new AtomicBoolean();
542
543                 TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
544             }
545
546             // Add the actor to the remoteTransactionActors list for access by the
547             // cleanup PhantonReference.
548             remoteTransactionActors.add(transactionActor);
549
550             // Write to the memory barrier volatile to publish the above update to the
551             // remoteTransactionActors list for thread visibility.
552             remoteTransactionActorsMB.set(true);
553         }
554
555         // TxActor is always created where the leader of the shard is.
556         // Check if TxActor is created in the same node
557         boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
558
559         if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
560             return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
561                     transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
562                     operationCompleter);
563         } else {
564             return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
565                     actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
566         }
567     }
568 }