2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
33 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
35 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
41 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import scala.concurrent.Future;
49 import scala.concurrent.Promise;
52 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
54 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
55 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
56 * be created on each of those shards by the TransactionProxy
59 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
60 * shards will be executed.
63 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
65 public static enum TransactionType {
71 private static final TransactionType[] VALUES = values();
73 public static TransactionType fromInt(final int type) {
76 } catch (IndexOutOfBoundsException e) {
77 throw new IllegalArgumentException("In TransactionType enum value " + type, e);
82 private static enum TransactionState {
88 static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
89 new Mapper<Throwable, Throwable>() {
91 public Throwable apply(Throwable failure) {
96 private static final AtomicLong counter = new AtomicLong();
98 private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
101 * Stores the remote Tx actors for each requested data store path to be used by the
102 * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
103 * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
104 * remoteTransactionActors list so they will be visible to the thread accessing the
107 List<ActorSelection> remoteTransactionActors;
108 volatile AtomicBoolean remoteTransactionActorsMB;
111 * Stores the create transaction results per shard.
113 private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
115 private final TransactionType transactionType;
116 final ActorContext actorContext;
117 private final String transactionChainId;
118 private final SchemaContext schemaContext;
119 private TransactionState state = TransactionState.OPEN;
121 private volatile boolean initialized;
122 private Semaphore operationLimiter;
123 private OperationCompleter operationCompleter;
125 public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
126 this(actorContext, transactionType, "");
129 public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
130 super(createIdentifier(actorContext));
131 this.actorContext = Preconditions.checkNotNull(actorContext,
132 "actorContext should not be null");
133 this.transactionType = Preconditions.checkNotNull(transactionType,
134 "transactionType should not be null");
135 this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
136 "schemaContext should not be null");
137 this.transactionChainId = transactionChainId;
139 LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
142 private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
143 String memberName = actorContext.getCurrentMemberName();
144 if (memberName == null) {
145 memberName = "UNKNOWN-MEMBER";
148 return new TransactionIdentifier(memberName, counter.getAndIncrement());
152 boolean hasTransactionContext() {
153 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
154 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
155 if(transactionContext != null) {
163 private static boolean isRootPath(YangInstanceIdentifier path) {
164 return !path.getPathArguments().iterator().hasNext();
168 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
170 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
171 "Read operation on write-only transaction is not allowed");
173 LOG.debug("Tx {} read {}", getIdentifier(), path);
175 final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
177 if(isRootPath(path)){
178 readAllData(path, proxyFuture);
182 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
183 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
185 public void invoke(TransactionContext transactionContext) {
186 transactionContext.readData(path, proxyFuture);
192 return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
195 private void readAllData(final YangInstanceIdentifier path,
196 final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
197 Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
198 List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
200 for(String shardName : allShardNames){
201 final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
205 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
206 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
208 public void invoke(TransactionContext transactionContext) {
209 transactionContext.readData(path, subProxyFuture);
213 futures.add(subProxyFuture);
216 final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
218 future.addListener(new Runnable() {
222 proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
223 future.get(), actorContext.getSchemaContext()));
224 } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
225 proxyFuture.setException(e);
228 }, actorContext.getActorSystem().dispatcher());
232 public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
234 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
235 "Exists operation on write-only transaction is not allowed");
237 LOG.debug("Tx {} exists {}", getIdentifier(), path);
241 final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
243 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
244 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
246 public void invoke(TransactionContext transactionContext) {
247 transactionContext.dataExists(path, proxyFuture);
251 return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
254 private void checkModificationState() {
255 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
256 "Modification operation on read-only transaction is not allowed");
257 Preconditions.checkState(state == TransactionState.OPEN,
258 "Transaction is sealed - further modifications are not allowed");
261 private void throttleOperation() {
262 throttleOperation(1);
265 private void throttleOperation(int acquirePermits) {
267 // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
268 operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
269 operationCompleter = new OperationCompleter(operationLimiter);
271 // Make sure we write this last because it's volatile and will also publish the non-volatile writes
272 // above as well so they'll be visible to other threads.
277 if(!operationLimiter.tryAcquire(acquirePermits,
278 actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
279 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
281 } catch (InterruptedException e) {
282 if(LOG.isDebugEnabled()) {
283 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
285 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
290 final void ensureInitializied() {
291 Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
295 public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
297 checkModificationState();
299 LOG.debug("Tx {} write {}", getIdentifier(), path);
303 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
304 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
306 public void invoke(TransactionContext transactionContext) {
307 transactionContext.writeData(path, data);
313 public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
315 checkModificationState();
317 LOG.debug("Tx {} merge {}", getIdentifier(), path);
321 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
322 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
324 public void invoke(TransactionContext transactionContext) {
325 transactionContext.mergeData(path, data);
331 public void delete(final YangInstanceIdentifier path) {
333 checkModificationState();
335 LOG.debug("Tx {} delete {}", getIdentifier(), path);
339 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
340 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
342 public void invoke(TransactionContext transactionContext) {
343 transactionContext.deleteData(path);
348 private boolean seal(final TransactionState newState) {
349 if (state == TransactionState.OPEN) {
358 public AbstractThreePhaseCommitCohort<?> ready() {
359 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
360 "Read-only transactions cannot be readied");
362 final boolean success = seal(TransactionState.READY);
363 Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
365 LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
366 txFutureCallbackMap.size());
368 if (txFutureCallbackMap.isEmpty()) {
369 TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
370 return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
373 throttleOperation(txFutureCallbackMap.size());
375 final boolean isSingleShard = txFutureCallbackMap.size() == 1;
376 return isSingleShard ? createSingleCommitCohort() : createMultiCommitCohort();
379 @SuppressWarnings({ "rawtypes", "unchecked" })
380 private AbstractThreePhaseCommitCohort<Object> createSingleCommitCohort() {
381 TransactionFutureCallback txFutureCallback = txFutureCallbackMap.values().iterator().next();
383 LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
384 txFutureCallback.getShardName(), transactionChainId);
386 final OperationCallback.Reference operationCallbackRef =
387 new OperationCallback.Reference(OperationCallback.NO_OP_CALLBACK);
388 final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
390 if (transactionContext != null) {
391 // avoid the creation of a promise and a TransactionOperation
392 future = getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef);
394 final Promise promise = akka.dispatch.Futures.promise();
395 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
397 public void invoke(TransactionContext transactionContext) {
398 promise.completeWith(getReadyOrDirectCommitFuture(transactionContext, operationCallbackRef));
401 future = promise.future();
404 return new SingleCommitCohortProxy(actorContext, future, getIdentifier().toString(), operationCallbackRef);
407 private Future<?> getReadyOrDirectCommitFuture(TransactionContext transactionContext,
408 OperationCallback.Reference operationCallbackRef) {
409 if(transactionContext.supportsDirectCommit()) {
410 TransactionRateLimitingCallback rateLimitingCallback = new TransactionRateLimitingCallback(actorContext);
411 operationCallbackRef.set(rateLimitingCallback);
412 rateLimitingCallback.run();
413 return transactionContext.directCommit();
415 return transactionContext.readyTransaction();
419 private AbstractThreePhaseCommitCohort<ActorSelection> createMultiCommitCohort() {
420 List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
421 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
423 LOG.debug("Tx {} Readying transaction for shard {} on chain {}", getIdentifier(),
424 txFutureCallback.getShardName(), transactionChainId);
426 final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
427 final Future<ActorSelection> future;
428 if (transactionContext != null) {
429 // avoid the creation of a promise and a TransactionOperation
430 future = transactionContext.readyTransaction();
432 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
433 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
435 public void invoke(TransactionContext transactionContext) {
436 promise.completeWith(transactionContext.readyTransaction());
439 future = promise.future();
442 cohortFutures.add(future);
445 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, getIdentifier().toString());
449 public void close() {
450 if (!seal(TransactionState.CLOSED)) {
451 if (state == TransactionState.CLOSED) {
452 // Idempotent no-op as per AutoCloseable recommendation
456 throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
460 for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
461 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
463 public void invoke(TransactionContext transactionContext) {
464 transactionContext.closeTransaction();
469 txFutureCallbackMap.clear();
471 if(remoteTransactionActorsMB != null) {
472 remoteTransactionActors.clear();
473 remoteTransactionActorsMB.set(true);
477 private String shardNameFromIdentifier(YangInstanceIdentifier path){
478 return ShardStrategyFactory.getStrategy(path).findShard(path);
481 protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
482 return actorContext.findPrimaryShardAsync(shardName);
485 final TransactionType getTransactionType() {
486 return transactionType;
489 final Semaphore getOperationLimiter() {
490 return operationLimiter;
493 private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
494 String shardName = shardNameFromIdentifier(path);
495 return getOrCreateTxFutureCallback(shardName);
498 private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
499 TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
500 if(txFutureCallback == null) {
501 Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
503 final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
505 txFutureCallback = newTxFutureCallback;
506 txFutureCallbackMap.put(shardName, txFutureCallback);
508 findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
510 public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
511 if(failure != null) {
512 newTxFutureCallback.createTransactionContext(failure, null);
514 newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
517 }, actorContext.getClientDispatcher());
520 return txFutureCallback;
523 String getTransactionChainId() {
524 return transactionChainId;
527 protected ActorContext getActorContext() {
531 TransactionContext createValidTransactionContext(ActorSelection transactionActor,
532 String transactionPath, short remoteTransactionVersion) {
534 if (transactionType == TransactionType.READ_ONLY) {
535 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
536 // to close the remote Tx's when this instance is no longer in use and is garbage
539 if(remoteTransactionActorsMB == null) {
540 remoteTransactionActors = Lists.newArrayList();
541 remoteTransactionActorsMB = new AtomicBoolean();
543 TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
546 // Add the actor to the remoteTransactionActors list for access by the
547 // cleanup PhantonReference.
548 remoteTransactionActors.add(transactionActor);
550 // Write to the memory barrier volatile to publish the above update to the
551 // remoteTransactionActors list for thread visibility.
552 remoteTransactionActorsMB.set(true);
555 // TxActor is always created where the leader of the shard is.
556 // Check if TxActor is created in the same node
557 boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
559 if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
560 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
561 transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
564 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
565 actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);