2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.FinalizablePhantomReference;
16 import com.google.common.base.FinalizableReferenceQueue;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.CheckedFuture;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import java.util.HashMap;
25 import java.util.List;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicLong;
31 import javax.annotation.concurrent.GuardedBy;
32 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
33 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
35 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
38 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
39 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
40 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
43 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
44 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
45 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
46 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
47 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
48 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
49 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
50 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
51 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import scala.concurrent.Future;
58 import scala.concurrent.Promise;
59 import scala.concurrent.duration.FiniteDuration;
62 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
64 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
65 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
66 * be created on each of those shards by the TransactionProxy
69 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
70 * shards will be executed.
73 public class TransactionProxy implements DOMStoreReadWriteTransaction {
75 public static enum TransactionType {
81 static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
82 new Mapper<Throwable, Throwable>() {
84 public Throwable apply(Throwable failure) {
89 private static final AtomicLong counter = new AtomicLong();
91 private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
94 * Time interval in between transaction create retries.
96 private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
97 FiniteDuration.create(1, TimeUnit.SECONDS);
100 * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
101 * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
102 * trickery to clean up its internal thread when the bundle is unloaded.
104 private static final FinalizableReferenceQueue phantomReferenceQueue =
105 new FinalizableReferenceQueue();
108 * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
109 * necessary because PhantomReferences need a hard reference so they're not garbage collected.
110 * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
111 * and thus becomes eligible for garbage collection.
113 private static final Map<TransactionProxyCleanupPhantomReference,
114 TransactionProxyCleanupPhantomReference> phantomReferenceCache =
115 new ConcurrentHashMap<>();
118 * A PhantomReference that closes remote transactions for a TransactionProxy when it's
119 * garbage collected. This is used for read-only transactions as they're not explicitly closed
120 * by clients. So the only way to detect that a transaction is no longer in use and it's safe
121 * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
122 * but TransactionProxy instances should generally be short-lived enough to avoid being moved
123 * to the old generation space and thus should be cleaned up in a timely manner as the GC
124 * runs on the young generation (eden, swap1...) space much more frequently.
126 private static class TransactionProxyCleanupPhantomReference
127 extends FinalizablePhantomReference<TransactionProxy> {
129 private final List<ActorSelection> remoteTransactionActors;
130 private final AtomicBoolean remoteTransactionActorsMB;
131 private final ActorContext actorContext;
132 private final TransactionIdentifier identifier;
134 protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
135 super(referent, phantomReferenceQueue);
137 // Note we need to cache the relevant fields from the TransactionProxy as we can't
138 // have a hard reference to the TransactionProxy instance itself.
140 remoteTransactionActors = referent.remoteTransactionActors;
141 remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
142 actorContext = referent.actorContext;
143 identifier = referent.identifier;
147 public void finalizeReferent() {
148 LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
149 remoteTransactionActors.size(), identifier);
151 phantomReferenceCache.remove(this);
153 // Access the memory barrier volatile to ensure all previous updates to the
154 // remoteTransactionActors list are visible to this thread.
156 if(remoteTransactionActorsMB.get()) {
157 for(ActorSelection actor : remoteTransactionActors) {
158 LOG.trace("Sending CloseTransaction to {}", actor);
159 actorContext.sendOperationAsync(actor,
160 new CloseTransaction().toSerializable());
167 * Stores the remote Tx actors for each requested data store path to be used by the
168 * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
169 * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
170 * remoteTransactionActors list so they will be visible to the thread accessing the
173 private List<ActorSelection> remoteTransactionActors;
174 private AtomicBoolean remoteTransactionActorsMB;
177 * Stores the create transaction results per shard.
179 private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
181 private final TransactionType transactionType;
182 private final ActorContext actorContext;
183 private final TransactionIdentifier identifier;
184 private final String transactionChainId;
185 private final SchemaContext schemaContext;
186 private boolean inReadyState;
188 public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
189 this(actorContext, transactionType, "");
192 public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
193 String transactionChainId) {
194 this.actorContext = Preconditions.checkNotNull(actorContext,
195 "actorContext should not be null");
196 this.transactionType = Preconditions.checkNotNull(transactionType,
197 "transactionType should not be null");
198 this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
199 "schemaContext should not be null");
200 this.transactionChainId = transactionChainId;
202 String memberName = actorContext.getCurrentMemberName();
203 if(memberName == null){
204 memberName = "UNKNOWN-MEMBER";
207 this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
208 counter.getAndIncrement()).build();
210 if(transactionType == TransactionType.READ_ONLY) {
211 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
212 // to close the remote Tx's when this instance is no longer in use and is garbage
215 remoteTransactionActors = Lists.newArrayList();
216 remoteTransactionActorsMB = new AtomicBoolean();
218 TransactionProxyCleanupPhantomReference cleanup =
219 new TransactionProxyCleanupPhantomReference(this);
220 phantomReferenceCache.put(cleanup, cleanup);
223 LOG.debug("Created txn {} of type {}", identifier, transactionType);
227 List<Future<Object>> getRecordedOperationFutures() {
228 List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
229 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
230 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
231 if(transactionContext != null) {
232 recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
236 return recordedOperationFutures;
240 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
241 final YangInstanceIdentifier path) {
243 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
244 "Read operation on write-only transaction is not allowed");
246 LOG.debug("Tx {} read {}", identifier, path);
248 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
249 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
251 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future;
252 if(transactionContext != null) {
253 future = transactionContext.readData(path);
255 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
256 // callback to be executed after the Tx is created.
257 final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
258 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
260 public void invoke(TransactionContext transactionContext) {
261 Futures.addCallback(transactionContext.readData(path),
262 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
264 public void onSuccess(Optional<NormalizedNode<?, ?>> data) {
265 proxyFuture.set(data);
269 public void onFailure(Throwable t) {
270 proxyFuture.setException(t);
276 future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
283 public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
285 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
286 "Exists operation on write-only transaction is not allowed");
288 LOG.debug("Tx {} exists {}", identifier, path);
290 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
291 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
293 CheckedFuture<Boolean, ReadFailedException> future;
294 if(transactionContext != null) {
295 future = transactionContext.dataExists(path);
297 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
298 // callback to be executed after the Tx is created.
299 final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
300 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
302 public void invoke(TransactionContext transactionContext) {
303 Futures.addCallback(transactionContext.dataExists(path),
304 new FutureCallback<Boolean>() {
306 public void onSuccess(Boolean exists) {
307 proxyFuture.set(exists);
311 public void onFailure(Throwable t) {
312 proxyFuture.setException(t);
318 future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
324 private void checkModificationState() {
325 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
326 "Modification operation on read-only transaction is not allowed");
327 Preconditions.checkState(!inReadyState,
328 "Transaction is sealed - further modifications are not allowed");
332 public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
334 checkModificationState();
336 LOG.debug("Tx {} write {}", identifier, path);
338 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
339 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
340 if(transactionContext != null) {
341 transactionContext.writeData(path, data);
343 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
344 // callback to be executed after the Tx is created.
345 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
347 public void invoke(TransactionContext transactionContext) {
348 transactionContext.writeData(path, data);
355 public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
357 checkModificationState();
359 LOG.debug("Tx {} merge {}", identifier, path);
361 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
362 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
363 if(transactionContext != null) {
364 transactionContext.mergeData(path, data);
366 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
367 // callback to be executed after the Tx is created.
368 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
370 public void invoke(TransactionContext transactionContext) {
371 transactionContext.mergeData(path, data);
378 public void delete(final YangInstanceIdentifier path) {
380 checkModificationState();
382 LOG.debug("Tx {} delete {}", identifier, path);
384 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
385 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
386 if(transactionContext != null) {
387 transactionContext.deleteData(path);
389 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
390 // callback to be executed after the Tx is created.
391 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
393 public void invoke(TransactionContext transactionContext) {
394 transactionContext.deleteData(path);
401 public DOMStoreThreePhaseCommitCohort ready() {
403 checkModificationState();
407 LOG.debug("Tx {} Readying {} transactions for commit", identifier,
408 txFutureCallbackMap.size());
410 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
412 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
414 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
415 txFutureCallback.getShardName());
417 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
418 if(transactionContext != null) {
419 cohortFutures.add(transactionContext.readyTransaction());
421 // The shard Tx hasn't been created yet so create a promise to ready the Tx later
422 // after it's created.
423 final Promise<ActorSelection> cohortPromise = akka.dispatch.Futures.promise();
424 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
426 public void invoke(TransactionContext transactionContext) {
427 cohortPromise.completeWith(transactionContext.readyTransaction());
431 cohortFutures.add(cohortPromise.future());
435 onTransactionReady(cohortFutures);
437 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
438 identifier.toString());
442 * Method for derived classes to be notified when the transaction has been readied.
444 * @param cohortFutures the cohort Futures for each shard transaction.
446 protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
450 * Method called to send a CreateTransaction message to a shard.
452 * @param shard the shard actor to send to
453 * @param serializedCreateMessage the serialized message to send
454 * @return the response Future
456 protected Future<Object> sendCreateTransaction(ActorSelection shard,
457 Object serializedCreateMessage) {
458 return actorContext.executeOperationAsync(shard, serializedCreateMessage);
462 public Object getIdentifier() {
463 return this.identifier;
467 public void close() {
468 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
469 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
470 if(transactionContext != null) {
471 transactionContext.closeTransaction();
473 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
475 public void invoke(TransactionContext transactionContext) {
476 transactionContext.closeTransaction();
482 txFutureCallbackMap.clear();
484 if(transactionType == TransactionType.READ_ONLY) {
485 remoteTransactionActors.clear();
486 remoteTransactionActorsMB.set(true);
490 private String shardNameFromIdentifier(YangInstanceIdentifier path){
491 return ShardStrategyFactory.getStrategy(path).findShard(path);
494 private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
495 String shardName = shardNameFromIdentifier(path);
496 TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
497 if(txFutureCallback == null) {
498 Future<ActorSelection> findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName);
500 final TransactionFutureCallback newTxFutureCallback =
501 new TransactionFutureCallback(shardName);
503 txFutureCallback = newTxFutureCallback;
504 txFutureCallbackMap.put(shardName, txFutureCallback);
506 findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
508 public void onComplete(Throwable failure, ActorSelection primaryShard) {
509 if(failure != null) {
510 newTxFutureCallback.onComplete(failure, null);
512 newTxFutureCallback.setPrimaryShard(primaryShard);
515 }, actorContext.getActorSystem().dispatcher());
518 return txFutureCallback;
521 public String getTransactionChainId() {
522 return transactionChainId;
526 * Interface for a transaction operation to be invoked later.
528 private static interface TransactionOperation {
529 void invoke(TransactionContext transactionContext);
533 * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
534 * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
535 * retry task after a short delay.
537 * The end result from a completed CreateTransaction message is a TransactionContext that is
538 * used to perform transaction operations. Transaction operations that occur before the
539 * CreateTransaction completes are cache and executed once the CreateTransaction completes,
540 * successfully or not.
542 private class TransactionFutureCallback extends OnComplete<Object> {
545 * The list of transaction operations to execute once the CreateTransaction completes.
547 @GuardedBy("txOperationsOnComplete")
548 private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
551 * The TransactionContext resulting from the CreateTransaction reply.
553 private volatile TransactionContext transactionContext;
556 * The target primary shard.
558 private volatile ActorSelection primaryShard;
560 private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
561 getShardLeaderElectionTimeout().duration().toMillis() /
562 CREATE_TX_TRY_INTERVAL.toMillis());
564 private final String shardName;
566 TransactionFutureCallback(String shardName) {
567 this.shardName = shardName;
570 String getShardName() {
574 TransactionContext getTransactionContext() {
575 return transactionContext;
580 * Sets the target primary shard and initiates a CreateTransaction try.
582 void setPrimaryShard(ActorSelection primaryShard) {
583 LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
585 this.primaryShard = primaryShard;
586 tryCreateTransaction();
590 * Adds a TransactionOperation to be executed after the CreateTransaction completes.
592 void addTxOperationOnComplete(TransactionOperation operation) {
593 synchronized(txOperationsOnComplete) {
594 if(transactionContext == null) {
595 LOG.debug("Tx {} Adding operation on complete {}", identifier);
597 txOperationsOnComplete.add(operation);
599 operation.invoke(transactionContext);
605 * Performs a CreateTransaction try async.
607 private void tryCreateTransaction() {
608 Future<Object> createTxFuture = sendCreateTransaction(primaryShard,
609 new CreateTransaction(identifier.toString(),
610 TransactionProxy.this.transactionType.ordinal(),
611 getTransactionChainId()).toSerializable());
613 createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher());
617 public void onComplete(Throwable failure, Object response) {
618 if(failure instanceof NoShardLeaderException) {
619 // There's no leader for the shard yet - schedule and try again, unless we're out
620 // of retries. Note: createTxTries is volatile as it may be written by different
621 // threads however not concurrently, therefore decrementing it non-atomically here
623 if(--createTxTries > 0) {
624 LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
625 identifier, shardName);
627 actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
631 tryCreateTransaction();
633 }, actorContext.getActorSystem().dispatcher());
638 // Create the TransactionContext from the response or failure and execute delayed
639 // TransactionOperations. This entire section is done atomically (ie synchronized) with
640 // respect to #addTxOperationOnComplete to handle timing issues and ensure no
641 // TransactionOperation is missed and that they are processed in the order they occurred.
642 synchronized(txOperationsOnComplete) {
643 if(failure != null) {
644 LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
645 failure.getMessage());
647 transactionContext = new NoOpTransactionContext(failure, identifier);
648 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
649 createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
651 IllegalArgumentException exception = new IllegalArgumentException(String.format(
652 "Invalid reply type %s for CreateTransaction", response.getClass()));
654 transactionContext = new NoOpTransactionContext(exception, identifier);
657 for(TransactionOperation oper: txOperationsOnComplete) {
658 oper.invoke(transactionContext);
661 txOperationsOnComplete.clear();
665 private void createValidTransactionContext(CreateTransactionReply reply) {
666 String transactionPath = reply.getTransactionPath();
668 LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
670 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
672 if (transactionType == TransactionType.READ_ONLY) {
673 // Add the actor to the remoteTransactionActors list for access by the
674 // cleanup PhantonReference.
675 remoteTransactionActors.add(transactionActor);
677 // Write to the memory barrier volatile to publish the above update to the
678 // remoteTransactionActors list for thread visibility.
679 remoteTransactionActorsMB.set(true);
682 // TxActor is always created where the leader of the shard is.
683 // Check if TxActor is created in the same node
684 boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
686 transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
687 actorContext, schemaContext, isTxActorLocal, reply.getVersion());
691 private interface TransactionContext {
692 void closeTransaction();
694 Future<ActorSelection> readyTransaction();
696 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
698 void deleteData(YangInstanceIdentifier path);
700 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
702 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
703 final YangInstanceIdentifier path);
705 CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
707 List<Future<Object>> getRecordedOperationFutures();
710 private static abstract class AbstractTransactionContext implements TransactionContext {
712 protected final TransactionIdentifier identifier;
713 protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
715 AbstractTransactionContext(TransactionIdentifier identifier) {
716 this.identifier = identifier;
720 public List<Future<Object>> getRecordedOperationFutures() {
721 return recordedOperationFutures;
725 private static class TransactionContextImpl extends AbstractTransactionContext {
726 private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
728 private final ActorContext actorContext;
729 private final SchemaContext schemaContext;
730 private final String transactionPath;
731 private final ActorSelection actor;
732 private final boolean isTxActorLocal;
733 private final int remoteTransactionVersion;
735 private TransactionContextImpl(String transactionPath, ActorSelection actor, TransactionIdentifier identifier,
736 ActorContext actorContext, SchemaContext schemaContext,
737 boolean isTxActorLocal, int remoteTransactionVersion) {
739 this.transactionPath = transactionPath;
741 this.actorContext = actorContext;
742 this.schemaContext = schemaContext;
743 this.isTxActorLocal = isTxActorLocal;
744 this.remoteTransactionVersion = remoteTransactionVersion;
747 private ActorSelection getActor() {
752 public void closeTransaction() {
753 LOG.debug("Tx {} closeTransaction called", identifier);
755 actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
759 public Future<ActorSelection> readyTransaction() {
760 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
761 identifier, recordedOperationFutures.size());
763 // Send the ReadyTransaction message to the Tx actor.
765 ReadyTransaction readyTransaction = new ReadyTransaction();
766 final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
767 isTxActorLocal ? readyTransaction : readyTransaction.toSerializable());
769 // Combine all the previously recorded put/merge/delete operation reply Futures and the
770 // ReadyTransactionReply Future into one Future. If any one fails then the combined
771 // Future will fail. We need all prior operations and the ready operation to succeed
772 // in order to attempt commit.
774 List<Future<Object>> futureList =
775 Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
776 futureList.addAll(recordedOperationFutures);
777 futureList.add(replyFuture);
779 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
780 actorContext.getActorSystem().dispatcher());
782 // Transform the combined Future into a Future that returns the cohort actor path from
783 // the ReadyTransactionReply. That's the end result of the ready operation.
785 return combinedFutures.transform(new Mapper<Iterable<Object>, ActorSelection>() {
787 public ActorSelection checkedApply(Iterable<Object> notUsed) {
788 LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
791 // At this point all the Futures succeeded and we need to extract the cohort
792 // actor path from the ReadyTransactionReply. For the recorded operations, they
793 // don't return any data so we're only interested that they completed
794 // successfully. We could be paranoid and verify the correct reply types but
795 // that really should never happen so it's not worth the overhead of
796 // de-serializing each reply.
798 // Note the Future get call here won't block as it's complete.
799 Object serializedReadyReply = replyFuture.value().get().get();
800 if (serializedReadyReply instanceof ReadyTransactionReply) {
801 return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
803 } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
804 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
805 String cohortPath = reply.getCohortPath();
807 // In Helium we used to return the local path of the actor which represented
808 // a remote ThreePhaseCommitCohort. The local path would then be converted to
809 // a remote path using this resolvePath method. To maintain compatibility with
810 // a Helium node we need to continue to do this conversion.
811 // At some point in the future when upgrades from Helium are not supported
812 // we could remove this code to resolvePath and just use the cohortPath as the
813 // resolved cohortPath
814 if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
815 cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
818 return actorContext.actorSelection(cohortPath);
821 // Throwing an exception here will fail the Future.
822 throw new IllegalArgumentException(String.format("Invalid reply type {}",
823 serializedReadyReply.getClass()));
826 }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
830 public void deleteData(YangInstanceIdentifier path) {
831 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
833 DeleteData deleteData = new DeleteData(path);
834 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
835 isTxActorLocal ? deleteData : deleteData.toSerializable()));
839 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
840 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
842 MergeData mergeData = new MergeData(path, data, schemaContext);
843 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
844 isTxActorLocal ? mergeData : mergeData.toSerializable()));
848 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
849 LOG.debug("Tx {} writeData called path = {}", identifier, path);
851 WriteData writeData = new WriteData(path, data, schemaContext);
852 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
853 isTxActorLocal ? writeData : writeData.toSerializable()));
857 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
858 final YangInstanceIdentifier path) {
860 LOG.debug("Tx {} readData called path = {}", identifier, path);
862 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
864 // If there were any previous recorded put/merge/delete operation reply Futures then we
865 // must wait for them to successfully complete. This is necessary to honor the read
866 // uncommitted semantics of the public API contract. If any one fails then fail the read.
868 if(recordedOperationFutures.isEmpty()) {
869 finishReadData(path, returnFuture);
871 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
872 identifier, recordedOperationFutures.size());
874 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
875 // Futures#sequence accesses the passed List on a different thread, as
876 // recordedOperationFutures is not synchronized.
878 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
879 Lists.newArrayList(recordedOperationFutures),
880 actorContext.getActorSystem().dispatcher());
882 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
884 public void onComplete(Throwable failure, Iterable<Object> notUsed)
886 if(failure != null) {
887 LOG.debug("Tx {} readData: a recorded operation failed: {}",
888 identifier, failure);
889 returnFuture.setException(new ReadFailedException(
890 "The read could not be performed because a previous put, merge,"
891 + "or delete operation failed", failure));
893 finishReadData(path, returnFuture);
898 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
901 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
904 private void finishReadData(final YangInstanceIdentifier path,
905 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
907 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
909 OnComplete<Object> onComplete = new OnComplete<Object>() {
911 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
912 if(failure != null) {
913 LOG.debug("Tx {} read operation failed: {}", identifier, failure);
914 returnFuture.setException(new ReadFailedException(
915 "Error reading data for path " + path, failure));
918 LOG.debug("Tx {} read operation succeeded", identifier, failure);
920 if (readResponse instanceof ReadDataReply) {
921 ReadDataReply reply = (ReadDataReply) readResponse;
922 returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
924 } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
925 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
926 returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
929 returnFuture.setException(new ReadFailedException(
930 "Invalid response reading data for path " + path));
936 ReadData readData = new ReadData(path);
937 Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
938 isTxActorLocal ? readData : readData.toSerializable());
940 readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
944 public CheckedFuture<Boolean, ReadFailedException> dataExists(
945 final YangInstanceIdentifier path) {
947 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
949 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
951 // If there were any previous recorded put/merge/delete operation reply Futures then we
952 // must wait for them to successfully complete. This is necessary to honor the read
953 // uncommitted semantics of the public API contract. If any one fails then fail this
956 if(recordedOperationFutures.isEmpty()) {
957 finishDataExists(path, returnFuture);
959 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
960 identifier, recordedOperationFutures.size());
962 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
963 // Futures#sequence accesses the passed List on a different thread, as
964 // recordedOperationFutures is not synchronized.
966 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
967 Lists.newArrayList(recordedOperationFutures),
968 actorContext.getActorSystem().dispatcher());
969 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
971 public void onComplete(Throwable failure, Iterable<Object> notUsed)
973 if(failure != null) {
974 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
975 identifier, failure);
976 returnFuture.setException(new ReadFailedException(
977 "The data exists could not be performed because a previous "
978 + "put, merge, or delete operation failed", failure));
980 finishDataExists(path, returnFuture);
985 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
988 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
991 private void finishDataExists(final YangInstanceIdentifier path,
992 final SettableFuture<Boolean> returnFuture) {
994 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
996 OnComplete<Object> onComplete = new OnComplete<Object>() {
998 public void onComplete(Throwable failure, Object response) throws Throwable {
999 if(failure != null) {
1000 LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
1001 returnFuture.setException(new ReadFailedException(
1002 "Error checking data exists for path " + path, failure));
1004 LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
1006 if (response instanceof DataExistsReply) {
1007 returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
1009 } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
1010 returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
1013 returnFuture.setException(new ReadFailedException(
1014 "Invalid response checking exists for path " + path));
1020 DataExists dataExists = new DataExists(path);
1021 Future<Object> future = actorContext.executeOperationAsync(getActor(),
1022 isTxActorLocal ? dataExists : dataExists.toSerializable());
1024 future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
1028 private static class NoOpTransactionContext extends AbstractTransactionContext {
1030 private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
1032 private final Throwable failure;
1034 public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
1036 this.failure = failure;
1040 public void closeTransaction() {
1041 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
1045 public Future<ActorSelection> readyTransaction() {
1046 LOG.debug("Tx {} readyTransaction called", identifier);
1047 return akka.dispatch.Futures.failed(failure);
1051 public void deleteData(YangInstanceIdentifier path) {
1052 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
1056 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
1057 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
1061 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
1062 LOG.debug("Tx {} writeData called path = {}", identifier, path);
1066 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
1067 YangInstanceIdentifier path) {
1068 LOG.debug("Tx {} readData called path = {}", identifier, path);
1069 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
1070 "Error reading data for path " + path, failure));
1074 public CheckedFuture<Boolean, ReadFailedException> dataExists(
1075 YangInstanceIdentifier path) {
1076 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
1077 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
1078 "Error checking exists for path " + path, failure));