950434390dfe831192e8f1c847dcf35524d8f75a
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
33 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
35 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
41 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import scala.concurrent.Future;
49 import scala.concurrent.Promise;
50
51 /**
52  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
53  * <p>
54  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
55  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
56  * be created on each of those shards by the TransactionProxy
57  *</p>
58  * <p>
59  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
60  * shards will be executed.
61  * </p>
62  */
63 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
64
65     private static enum TransactionState {
66         OPEN,
67         READY,
68         CLOSED,
69     }
70
71     static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
72                                                               new Mapper<Throwable, Throwable>() {
73         @Override
74         public Throwable apply(Throwable failure) {
75             return failure;
76         }
77     };
78
79     private static final AtomicLong counter = new AtomicLong();
80
81     private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
82
83     /**
84      * Stores the remote Tx actors for each requested data store path to be used by the
85      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
86      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
87      * remoteTransactionActors list so they will be visible to the thread accessing the
88      * PhantomReference.
89      */
90     List<ActorSelection> remoteTransactionActors;
91     volatile AtomicBoolean remoteTransactionActorsMB;
92
93     /**
94      * Stores the create transaction results per shard.
95      */
96     private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
97
98     private final TransactionType transactionType;
99     final ActorContext actorContext;
100     private final String transactionChainId;
101     private final SchemaContext schemaContext;
102     private TransactionState state = TransactionState.OPEN;
103
104     private volatile boolean initialized;
105     private Semaphore operationLimiter;
106     private OperationCompleter operationCompleter;
107
108     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
109         this(actorContext, transactionType, "");
110     }
111
112     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
113         super(createIdentifier(actorContext));
114         this.actorContext = Preconditions.checkNotNull(actorContext,
115             "actorContext should not be null");
116         this.transactionType = Preconditions.checkNotNull(transactionType,
117             "transactionType should not be null");
118         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
119             "schemaContext should not be null");
120         this.transactionChainId = transactionChainId;
121
122         LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
123     }
124
125     private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
126         String memberName = actorContext.getCurrentMemberName();
127         if (memberName == null) {
128             memberName = "UNKNOWN-MEMBER";
129         }
130
131         return new TransactionIdentifier(memberName, counter.getAndIncrement());
132     }
133
134     @VisibleForTesting
135     boolean hasTransactionContext() {
136         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
137             TransactionContext transactionContext = txFutureCallback.getTransactionContext();
138             if(transactionContext != null) {
139                 return true;
140             }
141         }
142
143         return false;
144     }
145
146     private static boolean isRootPath(YangInstanceIdentifier path) {
147         return !path.getPathArguments().iterator().hasNext();
148     }
149
150     @Override
151     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
152
153         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
154                 "Read operation on write-only transaction is not allowed");
155
156         LOG.debug("Tx {} read {}", getIdentifier(), path);
157
158         final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
159
160         if(isRootPath(path)){
161             readAllData(path, proxyFuture);
162         } else {
163             throttleOperation();
164
165             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
166             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
167                 @Override
168                 public void invoke(TransactionContext transactionContext) {
169                     transactionContext.readData(path, proxyFuture);
170                 }
171             });
172
173         }
174
175         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
176     }
177
178     private void readAllData(final YangInstanceIdentifier path,
179                              final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
180         Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
181         List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
182
183         for(String shardName : allShardNames){
184             final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
185
186             throttleOperation();
187
188             TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
189             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
190                 @Override
191                 public void invoke(TransactionContext transactionContext) {
192                     transactionContext.readData(path, subProxyFuture);
193                 }
194             });
195
196             futures.add(subProxyFuture);
197         }
198
199         final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
200
201         future.addListener(new Runnable() {
202             @Override
203             public void run() {
204                 try {
205                     proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
206                             future.get(), actorContext.getSchemaContext()));
207                 } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
208                     proxyFuture.setException(e);
209                 }
210             }
211         }, actorContext.getActorSystem().dispatcher());
212     }
213
214     @Override
215     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
216
217         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
218                 "Exists operation on write-only transaction is not allowed");
219
220         LOG.debug("Tx {} exists {}", getIdentifier(), path);
221
222         throttleOperation();
223
224         final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
225
226         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
227         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
228             @Override
229             public void invoke(TransactionContext transactionContext) {
230                 transactionContext.dataExists(path, proxyFuture);
231             }
232         });
233
234         return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
235     }
236
237     private void checkModificationState() {
238         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
239                 "Modification operation on read-only transaction is not allowed");
240         Preconditions.checkState(state == TransactionState.OPEN,
241                 "Transaction is sealed - further modifications are not allowed");
242     }
243
244     private void throttleOperation() {
245         throttleOperation(1);
246     }
247
248     private void throttleOperation(int acquirePermits) {
249         if(!initialized) {
250             // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
251             operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
252             operationCompleter = new OperationCompleter(operationLimiter);
253
254             // Make sure we write this last because it's volatile and will also publish the non-volatile writes
255             // above as well so they'll be visible to other threads.
256             initialized = true;
257         }
258
259         try {
260             if(!operationLimiter.tryAcquire(acquirePermits,
261                     actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
262                 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
263             }
264         } catch (InterruptedException e) {
265             if(LOG.isDebugEnabled()) {
266                 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
267             } else {
268                 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
269             }
270         }
271     }
272
273     final void ensureInitializied() {
274         Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
275     }
276
277     @Override
278     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
279
280         checkModificationState();
281
282         LOG.debug("Tx {} write {}", getIdentifier(), path);
283
284         throttleOperation();
285
286         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
287         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
288             @Override
289             public void invoke(TransactionContext transactionContext) {
290                 transactionContext.writeData(path, data);
291             }
292         });
293     }
294
295     @Override
296     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
297
298         checkModificationState();
299
300         LOG.debug("Tx {} merge {}", getIdentifier(), path);
301
302         throttleOperation();
303
304         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
305         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
306             @Override
307             public void invoke(TransactionContext transactionContext) {
308                 transactionContext.mergeData(path, data);
309             }
310         });
311     }
312
313     @Override
314     public void delete(final YangInstanceIdentifier path) {
315
316         checkModificationState();
317
318         LOG.debug("Tx {} delete {}", getIdentifier(), path);
319
320         throttleOperation();
321
322         TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
323         txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
324             @Override
325             public void invoke(TransactionContext transactionContext) {
326                 transactionContext.deleteData(path);
327             }
328         });
329     }
330
331     private boolean seal(final TransactionState newState) {
332         if (state == TransactionState.OPEN) {
333             state = newState;
334             return true;
335         } else {
336             return false;
337         }
338     }
339
340     @Override
341     public AbstractThreePhaseCommitCohort<?> ready() {
342         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
343                 "Read-only transactions cannot be readied");
344
345         final boolean success = seal(TransactionState.READY);
346         Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
347
348         LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
349                     txFutureCallbackMap.size());
350
351         if (txFutureCallbackMap.isEmpty()) {
352             TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
353             return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
354         }
355
356         throttleOperation(txFutureCallbackMap.size());
357
358         final boolean isSingleShard = txFutureCallbackMap.size() == 1;
359         return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
360     }
361
362     @SuppressWarnings({ "rawtypes", "unchecked" })
363     private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
364         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
365
366         LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
367                 txFutureCallback.getShardName(), transactionChainId);
368
369         final OperationCallback.Reference operationCallbackRef =
370                 new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
371         final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
372         final Future future;
373         if (transactionContext != null) {
374             // avoid the creation of a promise and a TransactionOperation
375             future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
376         } else {
377             final Promise promise = akka.dispatch.Futures.promise();
378             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
379                 @Override
380                 public void invoke(TransactionContext transactionContext) {
381                     promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
382                 }
383             });
384             future = promise.future();
385         }
386
387         return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
388     }
389
390     private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
391             OperationCallback.Reference operationCallbackRef) {
392         if(transactionContext.supportsDirectCommit()) {
393             TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
394             operationCallbackRef.set(rateLimitingCallback);
395             rateLimitingCallback.run();
396             return transactionContext.directCommit();
397         } else {
398             return transactionContext.readyTransaction();
399         }
400     }
401
402     private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
403         List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
404         for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
405
406             LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
407                         txFutureCallback.getShardName(), transactionChainId);
408
409             final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
410             final Future<ActorSelection> future;
411             if (transactionContext != null) {
412                 // avoid the creation of a promise and a TransactionOperation
413                 future = transactionContext.readyTransaction();
414             } else {
415                 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
416                 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
417                     @Override
418                     public void invoke(TransactionContext transactionContext) {
419                         promise.completeWith(transactionContext.readyTransaction());
420                     }
421                 });
422                 future = promise.future();
423             }
424
425             cohortFutures.add(future);
426         }
427
428         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
429     }
430
431     @Override
432     public void close() {
433         if (!seal(TransactionState.CLOSED)) {
434             if (state == TransactionState.CLOSED) {
435                 // Idempotent no-op as per AutoCloseable recommendation
436                 return;
437             }
438
439             throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
440                 getIdentifier()));
441         }
442
443         for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
444             txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
445                 @Override
446                 public void invoke(TransactionContext transactionContext) {
447                     transactionContext.closeTransaction();
448                 }
449             });
450         }
451
452         txFutureCallbackMap.clear();
453
454         if(remoteTransactionActorsMB != null) {
455             remoteTransactionActors.clear();
456             remoteTransactionActorsMB.set(true);
457         }
458     }
459
460     private String shardNameFromIdentifier(YangInstanceIdentifier path){
461         return ShardStrategyFactory.getStrategy(path).findShard(path);
462     }
463
464     protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
465         return actorContext.findPrimaryShardAsync(shardName);
466     }
467
468     final TransactionType getTransactionType() {
469         return transactionType;
470     }
471
472     final Semaphore getOperationLimiter() {
473         return operationLimiter;
474     }
475
476     private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
477         String shardName = shardNameFromIdentifier(path);
478         return getOrCreateTxFutureCallback(shardName);
479     }
480
481     private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
482         TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
483         if(txFutureCallback == null) {
484             Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
485
486             final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
487
488             txFutureCallback = newTxFutureCallback;
489             txFutureCallbackMap.put(shardName, txFutureCallback);
490
491             findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
492                 @Override
493                 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
494                     if(failure != null) {
495                         newTxFutureCallback.createTransactionContext(failure, null);
496                     } else {
497                         newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
498                     }
499                 }
500             }, actorContext.getClientDispatcher());
501         }
502
503         return txFutureCallback;
504     }
505
506     String getTransactionChainId() {
507         return transactionChainId;
508     }
509
510     protected ActorContext getActorContext() {
511         return actorContext;
512     }
513
514     TransactionContext createValidTransactionContext(ActorSelection transactionActor,
515             String transactionPath, short remoteTransactionVersion) {
516
517         if (transactionType == TransactionType.READ_ONLY) {
518             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
519             // to close the remote Tx's when this instance is no longer in use and is garbage
520             // collected.
521
522             if(remoteTransactionActorsMB == null) {
523                 remoteTransactionActors = Lists.newArrayList();
524                 remoteTransactionActorsMB = new AtomicBoolean();
525
526                 TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
527             }
528
529             // Add the actor to the remoteTransactionActors list for access by the
530             // cleanup PhantonReference.
531             remoteTransactionActors.add(transactionActor);
532
533             // Write to the memory barrier volatile to publish the above update to the
534             // remoteTransactionActors list for thread visibility.
535             remoteTransactionActorsMB.set(true);
536         }
537
538         // TxActor is always created where the leader of the shard is.
539         // Check if TxActor is created in the same node
540         boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
541
542         if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
543             return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
544                     transactionChainId, actorContext, isTxActorLocal, remoteTransactionVersion,
545                     operationCompleter);
546         } else {
547             return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
548                     actorContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
549         }
550     }
551 }