CDS: Introduce ChainedTransactionIdentifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
33 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
35 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
41 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import scala.concurrent.Future;
49 import scala.concurrent.Promise;
50
51 /**
52  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
53  * <p>
54  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
55  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
56  * be created on each of those shards by the TransactionProxy
57  *</p>
58  * <p>
59  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
60  * shards will be executed.
61  * </p>
62  */
63 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
64
65     private static enum TransactionState {
66         OPEN,
67         READY,
68         CLOSED,
69     }
70
71     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
72                                                               new Mapper<Throwable, Throwable>() {
73         @Override
74         public Throwable apply(Throwable failure) {
75             return failure;
76         }
77     };
78
    // Class-wide sequence; each new proxy takes the next value as the numeric part of its identifier.
    private static final AtomicLong counter = new AtomicLong();

    private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);

    /**
     * Stores the remote Tx actors for each requested data store path to be used by the
     * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
     * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
     * remoteTransactionActors list so they will be visible to the thread accessing the
     * PhantomReference.
     */
    List<ActorSelection> remoteTransactionActors;
    volatile AtomicBoolean remoteTransactionActorsMB;

    /**
     * Stores the create transaction results per shard, keyed by shard name.
     */
    private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();

    // Fixed at construction; decides which operations (read / modify / ready) are permitted.
    private final TransactionType transactionType;
    final ActorContext actorContext;
    // Captured from actorContext at construction time.
    private final SchemaContext schemaContext;
    // Only ever leaves OPEN through seal().
    private TransactionState state = TransactionState.OPEN;

    // Set to true once operationLimiter and operationCompleter have been created; being volatile,
    // the write also publishes those two non-volatile fields to other threads.
    private volatile boolean initialized;
    // Created lazily by the first throttled operation; null until then.
    private Semaphore operationLimiter;
    // Created together with operationLimiter and wraps it.
    private OperationCompleter operationCompleter;
106
107     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
108         this(actorContext, transactionType, "");
109     }
110
111     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
112         super(createIdentifier(actorContext, transactionChainId));
113         this.actorContext = Preconditions.checkNotNull(actorContext,
114             "actorContext should not be null");
115         this.transactionType = Preconditions.checkNotNull(transactionType,
116             "transactionType should not be null");
117         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
118             "schemaContext should not be null");
119
120         LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
121     }
122
123     private static TransactionIdentifier createIdentifier(ActorContext actorContext, String transactionChainId) {
124         String memberName = actorContext.getCurrentMemberName();
125         if (memberName == null) {
126             memberName = "UNKNOWN-MEMBER";
127         }
128
129         return TransactionIdentifier.create(memberName, counter.getAndIncrement(), transactionChainId);
130     }
131
132     @VisibleForTesting
133     boolean hasTransactionContext() {
134         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
135             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
136             if(transactionContext != null) {
137                 return true;
138             }
139         }
140
141         return false;
142     }
143
144     private static boolean isRootPath(YangInstanceIdentifier path) {
145         return !path.getPathArguments().iterator().hasNext();
146     }
147
148     @Override
149     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
150
151         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
152                 "Read operation on write-only transaction is not allowed");
153
154         LOG.debug("Tx {} read {}", getIdentifier(), path);
155
156         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
157
158         if(isRootPath(path)){
159             readAllData(path, proxyFuture);
160         } else {
161             throttleOperation();
162
163             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
164             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
165                 @Override
166                 public void invoke(TransactionContext transactionContext) {
167                     transactionContext.readData(path, proxyFuture);
168                 }
169             });
170
171         }
172
173         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
174     }
175
176     private void readAllData(final YangInstanceIdentifier path,
177                              final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
178         Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
179         List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
180
181         for(String shardName : allShardNames){
182             final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
183
184             throttleOperation();
185
186             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
187             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
188                 @Override
189                 public void invoke(TransactionContext transactionContext) {
190                     transactionContext.readData(path, subProxyFuture);
191                 }
192             });
193
194             futures.add(subProxyFuture);
195         }
196
197         final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
198
199         future.addListener(new Runnable() {
200             @Override
201             public void run() {
202                 try {
203                     proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
204                             future.get(), actorContext.getSchemaContext()));
205                 } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
206                     proxyFuture.setException(e);
207                 }
208             }
209         }, actorContext.getActorSystem().dispatcher());
210     }
211
212     @Override
213     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
214
215         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
216                 "Exists operation on write-only transaction is not allowed");
217
218         LOG.debug("Tx {} exists {}", getIdentifier(), path);
219
220         throttleOperation();
221
222         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
223
224         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
225         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
226             @Override
227             public void invoke(TransactionContext transactionContext) {
228                 transactionContext.dataExists(path, proxyFuture);
229             }
230         });
231
232         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
233     }
234
235     private void checkModificationState() {
236         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
237                 "Modification operation on read-only transaction is not allowed");
238         Preconditions.checkState(state == TransactionState.OPEN,
239                 "Transaction is sealed - further modifications are not allowed");
240     }
241
242     private void throttleOperation() {
243         throttleOperation(1);
244     }
245
246     private void throttleOperation(int acquirePermits) {
247         if(!initialized) {
248             // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
249             operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
250             operationCompleter = new OperationCompleter(operationLimiter);
251
252             // Make sure we write this last because it's volatile and will also publish the non-volatile writes
253             // above as well so they'll be visible to other threads.
254             initialized = true;
255         }
256
257         try {
258             if(!operationLimiter.tryAcquire(acquirePermits,
259                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
260                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
261             }
262         } catch (InterruptedException e) {
263             if(LOG.isDebugEnabled()) {
264                 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
265             } else {
266                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
267             }
268         }
269     }
270
271     final void ensureInitializied() {
272         Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
273     }
274
275     @Override
276     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
277
278         checkModificationState();
279
280         LOG.debug("Tx {} write {}", getIdentifier(), path);
281
282         throttleOperation();
283
284         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
285         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
286             @Override
287             public void invoke(TransactionContext transactionContext) {
288                 transactionContext.writeData(path, data);
289             }
290         });
291     }
292
293     @Override
294     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
295
296         checkModificationState();
297
298         LOG.debug("Tx {} merge {}", getIdentifier(), path);
299
300         throttleOperation();
301
302         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
303         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
304             @Override
305             public void invoke(TransactionContext transactionContext) {
306                 transactionContext.mergeData(path, data);
307             }
308         });
309     }
310
311     @Override
312     public void delete(final YangInstanceIdentifier path) {
313
314         checkModificationState();
315
316         LOG.debug("Tx {} delete {}", getIdentifier(), path);
317
318         throttleOperation();
319
320         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
321         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
322             @Override
323             public void invoke(TransactionContext transactionContext) {
324                 transactionContext.deleteData(path);
325             }
326         });
327     }
328
329     private boolean seal(final TransactionState newState) {
330         if (state == TransactionState.OPEN) {
331             state = newState;
332             return true;
333         } else {
334             return false;
335         }
336     }
337
338     @Override
339     public AbstractThreePhaseCommitCohort<?> ready() {
340         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
341                 "Read-only transactions cannot be readied");
342
343         final boolean success = seal(TransactionState.READY);
344         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
345
346         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
347                     txFutureCallbackMap.size());
348
349         if (txFutureCallbackMap.isEmpty()) {
350             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
351             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
352         }
353
354         throttleOperation(txFutureCallbackMap.size());
355
356         final boolean isSingleShard = txFutureCallbackMap.size() == 1;
357         return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
358     }
359
360     @SuppressWarnings({ "rawtypes", "unchecked" })
361     private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
362         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
363
364         LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
365                 txFutureCallback.getShardName(), getTransactionChainId());
366
367         final OperationCallback.Reference operationCallbackRef =
368                 new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
369         final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
370         final Future future;
371         if (transactionContext != null) {
372             // avoid the creation of a promise and a TransactionOperation
373             future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
374         } else {
375             final Promise promise = akka.dispatch.Futures.promise();
376             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
377                 @Override
378                 public void invoke(TransactionContext transactionContext) {
379                     promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
380                 }
381             });
382             future = promise.future();
383         }
384
385         return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
386     }
387
388     private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
389             OperationCallback.Reference operationCallbackRef) {
390         if(transactionContext.supportsDirectCommit()) {
391             TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
392             operationCallbackRef.set(rateLimitingCallback);
393             rateLimitingCallback.run();
394             return transactionContext.directCommit();
395         } else {
396             return transactionContext.readyTransaction();
397         }
398     }
399
400     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
401         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
402         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
403
404             LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
405                         txFutureCallback.getShardName(), getTransactionChainId());
406
407             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
408             final Future<ActorSelection> future;
409             if (transactionContext != null) {
410                 // avoid the creation of a promise and a TransactionOperation
411                 future = transactionContext.readyTransaction();
412             } else {
413                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
414                 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
415                     @Override
416                     public void invoke(TransactionContext transactionContext) {
417                         promise.completeWith(transactionContext.readyTransaction());
418                     }
419                 });
420                 future = promise.future();
421             }
422
423             cohortFutures.add(future);
424         }
425
426         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
427     }
428
429     @Override
430     public void close() {
431         if (!seal(TransactionState.CLOSED)) {
432             if (state == TransactionState.CLOSED) {
433                 // Idempotent no-op as per AutoCloseable recommendation
434                 return;
435             }
436
437             throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
438                 getIdentifier()));
439         }
440
441         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
442             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
443                 @Override
444                 public void invoke(TransactionContext transactionContext) {
445                     transactionContext.closeTransaction();
446                 }
447             });
448         }
449
450         txFutureCallbackMap.clear();
451
452         if(remoteTransactionActorsMB != null) {
453             remoteTransactionActors.clear();
454             remoteTransactionActorsMB.set(true);
455         }
456     }
457
458     private String shardNameFromIdentifier(YangInstanceIdentifier path){
459         return ShardStrategyFactory.getStrategy(path).findShard(path);
460     }
461
462     protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
463         return actorContext.findPrimaryShardAsync(shardName);
464     }
465
466     final TransactionType getTransactionType() {
467         return transactionType;
468     }
469
470     final Semaphore getOperationLimiter() {
471         return operationLimiter;
472     }
473
474     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
475         String shardName = shardNameFromIdentifier(path);
476         return getOrCreateTxFutureCallback(shardName);
477     }
478
479     private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
480         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
481         if(txFutureCallback == null) {
482             Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
483
484             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
485
486             txFutureCallback = newTxFutureCallback;
487             txFutureCallbackMap.put(shardName, txFutureCallback);
488
489             findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
490                 @Override
491                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
492                     if(failure != null) {
493                         newTxFutureCallback.createTransactionContext(failure, null);
494                     } else {
495                         newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
496                     }
497                 }
498             }, actorContext.getClientDispatcher());
499         }
500
501         return txFutureCallback;
502     }
503
504     String getTransactionChainId() {
505         return getIdentifier().getChainId();
506     }
507
508     protected ActorContext getActorContext() {
509         return actorContext;
510     }
511
512     TransactionContext createValidTransactionContext(ActorSelection transactionActor,
513             String transactionPath, short remoteTransactionVersion) {
514
515         if (transactionType == TransactionType.READ_ONLY) {
516             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
517             // to close the remote Tx's when this instance is no longer in use and is garbage
518             // collected.
519
520             if(remoteTransactionActorsMB == null) {
521                 remoteTransactionActors = Lists.newArrayList();
522                 remoteTransactionActorsMB = new AtomicBoolean();
523
524                 TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
525             }
526
527             // Add the actor to the remoteTransactionActors list for access by the
528             // cleanup PhantonReference.
529             remoteTransactionActors.add(transactionActor);
530
531             // Write to the memory barrier volatile to publish the above update to the
532             // remoteTransactionActors list for thread visibility.
533             remoteTransactionActorsMB.set(true);
534         }
535
536         // TxActor is always created where the leader of the shard is.
537         // Check if TxActor is created in the same node
538         boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
539
540         if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
541             return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
542                     actorContext, isTxActorLocal, remoteTransactionVersion,
543                     operationCompleter);
544         } else {
545             return new TransactionContextImpl(transactionActor, getIdentifier(),
546                     actorContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
547         }
548     }
549 }