2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.Lists;
18 import com.google.common.util.concurrent.CheckedFuture;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
27 import java.util.concurrent.ExecutionException;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicLong;
32 import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
33 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
35 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
36 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
37 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
38 import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import scala.concurrent.Future;
48 import scala.concurrent.Promise;
51 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
53 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
54 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
55 * be created on each of those shards by the TransactionProxy
58 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
59 * shards will be executed.
62 public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIdentifier> implements DOMStoreReadWriteTransaction {
64 public static enum TransactionType {
70 private static final TransactionType[] VALUES = values();
72 public static TransactionType fromInt(final int type) {
75 } catch (IndexOutOfBoundsException e) {
76 throw new IllegalArgumentException("In TransactionType enum value " + type, e);
81 private static enum TransactionState {
87 static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
88 new Mapper<Throwable, Throwable>() {
90 public Throwable apply(Throwable failure) {
95 private static final AtomicLong counter = new AtomicLong();
97 private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
100 * Stores the remote Tx actors for each requested data store path to be used by the
101 * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
102 * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
103 * remoteTransactionActors list so they will be visible to the thread accessing the
106 List<ActorSelection> remoteTransactionActors;
107 volatile AtomicBoolean remoteTransactionActorsMB;
110 * Stores the create transaction results per shard.
112 private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
114 private final TransactionType transactionType;
115 final ActorContext actorContext;
116 private final String transactionChainId;
117 private final SchemaContext schemaContext;
118 private TransactionState state = TransactionState.OPEN;
120 private volatile boolean initialized;
121 private Semaphore operationLimiter;
122 private OperationCompleter operationCompleter;
124 public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
125 this(actorContext, transactionType, "");
128 public TransactionProxy(ActorContext actorContext, TransactionType transactionType, String transactionChainId) {
129 super(createIdentifier(actorContext));
130 this.actorContext = Preconditions.checkNotNull(actorContext,
131 "actorContext should not be null");
132 this.transactionType = Preconditions.checkNotNull(transactionType,
133 "transactionType should not be null");
134 this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
135 "schemaContext should not be null");
136 this.transactionChainId = transactionChainId;
138 LOG.debug("Created txn {} of type {} on chain {}", getIdentifier(), transactionType, transactionChainId);
141 private static TransactionIdentifier createIdentifier(ActorContext actorContext) {
142 String memberName = actorContext.getCurrentMemberName();
143 if (memberName == null) {
144 memberName = "UNKNOWN-MEMBER";
147 return new TransactionIdentifier(memberName, counter.getAndIncrement());
151 boolean hasTransactionContext() {
152 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
153 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
154 if(transactionContext != null) {
162 private boolean isRootPath(YangInstanceIdentifier path){
163 return !path.getPathArguments().iterator().hasNext();
167 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
169 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
170 "Read operation on write-only transaction is not allowed");
172 LOG.debug("Tx {} read {}", getIdentifier(), path);
174 final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
176 if(isRootPath(path)){
177 readAllData(path, proxyFuture);
181 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
182 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
184 public void invoke(TransactionContext transactionContext) {
185 transactionContext.readData(path, proxyFuture);
191 return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
194 private void readAllData(final YangInstanceIdentifier path,
195 final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
196 Set<String> allShardNames = actorContext.getConfiguration().getAllShardNames();
197 List<SettableFuture<Optional<NormalizedNode<?, ?>>>> futures = new ArrayList<>(allShardNames.size());
199 for(String shardName : allShardNames){
200 final SettableFuture<Optional<NormalizedNode<?, ?>>> subProxyFuture = SettableFuture.create();
204 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(shardName);
205 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
207 public void invoke(TransactionContext transactionContext) {
208 transactionContext.readData(path, subProxyFuture);
212 futures.add(subProxyFuture);
215 final ListenableFuture<List<Optional<NormalizedNode<?, ?>>>> future = Futures.allAsList(futures);
217 future.addListener(new Runnable() {
221 proxyFuture.set(NormalizedNodeAggregator.aggregate(YangInstanceIdentifier.builder().build(),
222 future.get(), actorContext.getSchemaContext()));
223 } catch (DataValidationFailedException | InterruptedException | ExecutionException e) {
224 proxyFuture.setException(e);
227 }, actorContext.getActorSystem().dispatcher());
231 public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
233 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
234 "Exists operation on write-only transaction is not allowed");
236 LOG.debug("Tx {} exists {}", getIdentifier(), path);
240 final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
242 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
243 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
245 public void invoke(TransactionContext transactionContext) {
246 transactionContext.dataExists(path, proxyFuture);
250 return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
253 private void checkModificationState() {
254 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
255 "Modification operation on read-only transaction is not allowed");
256 Preconditions.checkState(state == TransactionState.OPEN,
257 "Transaction is sealed - further modifications are not allowed");
260 private void throttleOperation() {
261 throttleOperation(1);
264 private void throttleOperation(int acquirePermits) {
266 // Note : Currently mailbox-capacity comes from akka.conf and not from the config-subsystem
267 operationLimiter = new Semaphore(actorContext.getTransactionOutstandingOperationLimit());
268 operationCompleter = new OperationCompleter(operationLimiter);
270 // Make sure we write this last because it's volatile and will also publish the non-volatile writes
271 // above as well so they'll be visible to other threads.
276 if(!operationLimiter.tryAcquire(acquirePermits,
277 actorContext.getDatastoreContext().getOperationTimeoutInSeconds(), TimeUnit.SECONDS)){
278 LOG.warn("Failed to acquire operation permit for transaction {}", getIdentifier());
280 } catch (InterruptedException e) {
281 if(LOG.isDebugEnabled()) {
282 LOG.debug("Interrupted when trying to acquire operation permit for transaction " + getIdentifier().toString(), e);
284 LOG.warn("Interrupted when trying to acquire operation permit for transaction {}", getIdentifier());
289 final void ensureInitializied() {
290 Preconditions.checkState(initialized, "Transaction %s was not propertly initialized.", getIdentifier());
294 public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
296 checkModificationState();
298 LOG.debug("Tx {} write {}", getIdentifier(), path);
302 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
303 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
305 public void invoke(TransactionContext transactionContext) {
306 transactionContext.writeData(path, data);
312 public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
314 checkModificationState();
316 LOG.debug("Tx {} merge {}", getIdentifier(), path);
320 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
321 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
323 public void invoke(TransactionContext transactionContext) {
324 transactionContext.mergeData(path, data);
330 public void delete(final YangInstanceIdentifier path) {
332 checkModificationState();
334 LOG.debug("Tx {} delete {}", getIdentifier(), path);
338 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
339 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
341 public void invoke(TransactionContext transactionContext) {
342 transactionContext.deleteData(path);
347 private boolean seal(final TransactionState newState) {
348 if (state == TransactionState.OPEN) {
357 public AbstractThreePhaseCommitCohort ready() {
358 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
359 "Read-only transactions cannot be readied");
361 final boolean success = seal(TransactionState.READY);
362 Preconditions.checkState(success, "Transaction %s is %s, it cannot be readied", getIdentifier(), state);
364 LOG.debug("Tx {} Readying {} transactions for commit", getIdentifier(),
365 txFutureCallbackMap.size());
367 if (txFutureCallbackMap.isEmpty()) {
368 TransactionRateLimitingCallback.adjustRateLimitForUnusedTransaction(actorContext);
369 return NoOpDOMStoreThreePhaseCommitCohort.INSTANCE;
372 throttleOperation(txFutureCallbackMap.size());
374 List<Future<ActorSelection>> cohortFutures = new ArrayList<>(txFutureCallbackMap.size());
375 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
377 LOG.debug("Tx {} Readying transaction for shard {} chain {}", getIdentifier(),
378 txFutureCallback.getShardName(), transactionChainId);
380 final TransactionContext transactionContext = txFutureCallback.getTransactionContext();
381 final Future<ActorSelection> future;
382 if (transactionContext != null) {
383 // avoid the creation of a promise and a TransactionOperation
384 future = transactionContext.readyTransaction();
386 final Promise<ActorSelection> promise = akka.dispatch.Futures.promise();
387 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
389 public void invoke(TransactionContext transactionContext) {
390 promise.completeWith(transactionContext.readyTransaction());
393 future = promise.future();
396 cohortFutures.add(future);
399 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
400 getIdentifier().toString());
404 public void close() {
405 if (!seal(TransactionState.CLOSED)) {
406 if (state == TransactionState.CLOSED) {
407 // Idempotent no-op as per AutoCloseable recommendation
411 throw new IllegalStateException(String.format("Transaction %s is ready, it cannot be closed",
415 for (TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
416 txFutureCallback.enqueueTransactionOperation(new TransactionOperation() {
418 public void invoke(TransactionContext transactionContext) {
419 transactionContext.closeTransaction();
424 txFutureCallbackMap.clear();
426 if(remoteTransactionActorsMB != null) {
427 remoteTransactionActors.clear();
428 remoteTransactionActorsMB.set(true);
432 private String shardNameFromIdentifier(YangInstanceIdentifier path){
433 return ShardStrategyFactory.getStrategy(path).findShard(path);
436 protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
437 return actorContext.findPrimaryShardAsync(shardName);
440 final TransactionType getTransactionType() {
441 return transactionType;
444 final Semaphore getOperationLimiter() {
445 return operationLimiter;
448 private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
449 String shardName = shardNameFromIdentifier(path);
450 return getOrCreateTxFutureCallback(shardName);
453 private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
454 TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
455 if(txFutureCallback == null) {
456 Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
458 final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
460 txFutureCallback = newTxFutureCallback;
461 txFutureCallbackMap.put(shardName, txFutureCallback);
463 findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
465 public void onComplete(Throwable failure, ActorSelection primaryShard) {
466 if(failure != null) {
467 newTxFutureCallback.createTransactionContext(failure, null);
469 newTxFutureCallback.setPrimaryShard(primaryShard);
472 }, actorContext.getClientDispatcher());
475 return txFutureCallback;
478 String getTransactionChainId() {
479 return transactionChainId;
482 protected ActorContext getActorContext() {
486 TransactionContext createValidTransactionContext(ActorSelection transactionActor,
487 String transactionPath, short remoteTransactionVersion) {
489 if (transactionType == TransactionType.READ_ONLY) {
490 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
491 // to close the remote Tx's when this instance is no longer in use and is garbage
494 if(remoteTransactionActorsMB == null) {
495 remoteTransactionActors = Lists.newArrayList();
496 remoteTransactionActorsMB = new AtomicBoolean();
498 TransactionProxyCleanupPhantomReference.track(TransactionProxy.this);
501 // Add the actor to the remoteTransactionActors list for access by the
502 // cleanup PhantonReference.
503 remoteTransactionActors.add(transactionActor);
505 // Write to the memory barrier volatile to publish the above update to the
506 // remoteTransactionActors list for thread visibility.
507 remoteTransactionActorsMB.set(true);
510 // TxActor is always created where the leader of the shard is.
511 // Check if TxActor is created in the same node
512 boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
514 if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
515 return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, getIdentifier(),
516 transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
519 return new TransactionContextImpl(transactionActor, getIdentifier(), transactionChainId,
520 actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);