2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.FinalizablePhantomReference;
16 import com.google.common.base.FinalizableReferenceQueue;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.CheckedFuture;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Future;
50 import scala.concurrent.Promise;
51 import scala.concurrent.duration.FiniteDuration;
53 import javax.annotation.concurrent.GuardedBy;
54 import java.util.HashMap;
55 import java.util.List;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicBoolean;
60 import java.util.concurrent.atomic.AtomicLong;
63 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
65 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If, during
66 * the transaction, reads and writes are done on data that belongs to different shards, then a separate transaction
67 * will be created on each of those shards by the TransactionProxy.
70 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
71 * shards will be executed.
74 public class TransactionProxy implements DOMStoreReadWriteTransaction {
76 public static enum TransactionType {
82 static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
83 new Mapper<Throwable, Throwable>() {
85 public Throwable apply(Throwable failure) {
90 private static final AtomicLong counter = new AtomicLong();
92 private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
95 * Time interval in between transaction create retries.
97 private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
98 FiniteDuration.create(1, TimeUnit.SECONDS);
101 * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
102 * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
103 * trickery to clean up its internal thread when the bundle is unloaded.
105 private static final FinalizableReferenceQueue phantomReferenceQueue =
106 new FinalizableReferenceQueue();
109 * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
110 * necessary because PhantomReferences need a hard reference so they're not garbage collected.
111 * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
112 * and thus becomes eligible for garbage collection.
114 private static final Map<TransactionProxyCleanupPhantomReference,
115 TransactionProxyCleanupPhantomReference> phantomReferenceCache =
116 new ConcurrentHashMap<>();
119 * A PhantomReference that closes remote transactions for a TransactionProxy when it's
120 * garbage collected. This is used for read-only transactions as they're not explicitly closed
121 * by clients. So the only way to detect that a transaction is no longer in use and it's safe
122 * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
123 * but TransactionProxy instances should generally be short-lived enough to avoid being moved
124 * to the old generation space and thus should be cleaned up in a timely manner as the GC
125 * runs on the young generation (eden, swap1...) space much more frequently.
127 private static class TransactionProxyCleanupPhantomReference
128 extends FinalizablePhantomReference<TransactionProxy> {
130 private final List<ActorSelection> remoteTransactionActors;
131 private final AtomicBoolean remoteTransactionActorsMB;
132 private final ActorContext actorContext;
133 private final TransactionIdentifier identifier;
135 protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
136 super(referent, phantomReferenceQueue);
138 // Note we need to cache the relevant fields from the TransactionProxy as we can't
139 // have a hard reference to the TransactionProxy instance itself.
141 remoteTransactionActors = referent.remoteTransactionActors;
142 remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
143 actorContext = referent.actorContext;
144 identifier = referent.identifier;
148 public void finalizeReferent() {
149 LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
150 remoteTransactionActors.size(), identifier);
152 phantomReferenceCache.remove(this);
154 // Access the memory barrier volatile to ensure all previous updates to the
155 // remoteTransactionActors list are visible to this thread.
157 if(remoteTransactionActorsMB.get()) {
158 for(ActorSelection actor : remoteTransactionActors) {
159 LOG.trace("Sending CloseTransaction to {}", actor);
160 actorContext.sendOperationAsync(actor,
161 new CloseTransaction().toSerializable());
168 * Stores the remote Tx actors for each requested data store path to be used by the
169 * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
170 * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
171 * remoteTransactionActors list so they will be visible to the thread accessing the
174 private List<ActorSelection> remoteTransactionActors;
175 private AtomicBoolean remoteTransactionActorsMB;
178 * Stores the create transaction results per shard.
180 private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
182 private final TransactionType transactionType;
183 private final ActorContext actorContext;
184 private final TransactionIdentifier identifier;
185 private final TransactionChainProxy transactionChainProxy;
186 private final SchemaContext schemaContext;
187 private boolean inReadyState;
189 public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
190 this(actorContext, transactionType, null);
193 public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
194 TransactionChainProxy transactionChainProxy) {
195 this.actorContext = Preconditions.checkNotNull(actorContext,
196 "actorContext should not be null");
197 this.transactionType = Preconditions.checkNotNull(transactionType,
198 "transactionType should not be null");
199 this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
200 "schemaContext should not be null");
201 this.transactionChainProxy = transactionChainProxy;
203 String memberName = actorContext.getCurrentMemberName();
204 if(memberName == null){
205 memberName = "UNKNOWN-MEMBER";
208 this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
209 counter.getAndIncrement()).build();
211 if(transactionType == TransactionType.READ_ONLY) {
212 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
213 // to close the remote Tx's when this instance is no longer in use and is garbage
216 remoteTransactionActors = Lists.newArrayList();
217 remoteTransactionActorsMB = new AtomicBoolean();
219 TransactionProxyCleanupPhantomReference cleanup =
220 new TransactionProxyCleanupPhantomReference(this);
221 phantomReferenceCache.put(cleanup, cleanup);
224 LOG.debug("Created txn {} of type {}", identifier, transactionType);
228 List<Future<Object>> getRecordedOperationFutures() {
229 List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
230 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
231 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
232 if(transactionContext != null) {
233 recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
237 return recordedOperationFutures;
241 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
242 final YangInstanceIdentifier path) {
244 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
245 "Read operation on write-only transaction is not allowed");
247 LOG.debug("Tx {} read {}", identifier, path);
249 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
250 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
252 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future;
253 if(transactionContext != null) {
254 future = transactionContext.readData(path);
256 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
257 // callback to be executed after the Tx is created.
258 final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
259 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
261 public void invoke(TransactionContext transactionContext) {
262 Futures.addCallback(transactionContext.readData(path),
263 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
265 public void onSuccess(Optional<NormalizedNode<?, ?>> data) {
266 proxyFuture.set(data);
270 public void onFailure(Throwable t) {
271 proxyFuture.setException(t);
277 future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
284 public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
286 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
287 "Exists operation on write-only transaction is not allowed");
289 LOG.debug("Tx {} exists {}", identifier, path);
291 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
292 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
294 CheckedFuture<Boolean, ReadFailedException> future;
295 if(transactionContext != null) {
296 future = transactionContext.dataExists(path);
298 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
299 // callback to be executed after the Tx is created.
300 final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
301 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
303 public void invoke(TransactionContext transactionContext) {
304 Futures.addCallback(transactionContext.dataExists(path),
305 new FutureCallback<Boolean>() {
307 public void onSuccess(Boolean exists) {
308 proxyFuture.set(exists);
312 public void onFailure(Throwable t) {
313 proxyFuture.setException(t);
319 future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
325 private void checkModificationState() {
326 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
327 "Modification operation on read-only transaction is not allowed");
328 Preconditions.checkState(!inReadyState,
329 "Transaction is sealed - further modifications are not allowed");
333 public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
335 checkModificationState();
337 LOG.debug("Tx {} write {}", identifier, path);
339 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
340 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
341 if(transactionContext != null) {
342 transactionContext.writeData(path, data);
344 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
345 // callback to be executed after the Tx is created.
346 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
348 public void invoke(TransactionContext transactionContext) {
349 transactionContext.writeData(path, data);
356 public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
358 checkModificationState();
360 LOG.debug("Tx {} merge {}", identifier, path);
362 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
363 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
364 if(transactionContext != null) {
365 transactionContext.mergeData(path, data);
367 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
368 // callback to be executed after the Tx is created.
369 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
371 public void invoke(TransactionContext transactionContext) {
372 transactionContext.mergeData(path, data);
379 public void delete(final YangInstanceIdentifier path) {
381 checkModificationState();
383 LOG.debug("Tx {} delete {}", identifier, path);
385 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
386 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
387 if(transactionContext != null) {
388 transactionContext.deleteData(path);
390 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
391 // callback to be executed after the Tx is created.
392 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
394 public void invoke(TransactionContext transactionContext) {
395 transactionContext.deleteData(path);
402 public DOMStoreThreePhaseCommitCohort ready() {
404 checkModificationState();
408 LOG.debug("Tx {} Readying {} transactions for commit", identifier,
409 txFutureCallbackMap.size());
411 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
413 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
415 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
416 txFutureCallback.getShardName());
418 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
419 if(transactionContext != null) {
420 cohortFutures.add(transactionContext.readyTransaction());
422 // The shard Tx hasn't been created yet so create a promise to ready the Tx later
423 // after it's created.
424 final Promise<ActorSelection> cohortPromise = akka.dispatch.Futures.promise();
425 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
427 public void invoke(TransactionContext transactionContext) {
428 cohortPromise.completeWith(transactionContext.readyTransaction());
432 cohortFutures.add(cohortPromise.future());
436 if(transactionChainProxy != null){
437 transactionChainProxy.onTransactionReady(cohortFutures);
440 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
441 identifier.toString());
445 public Object getIdentifier() {
446 return this.identifier;
450 public void close() {
451 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
452 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
453 if(transactionContext != null) {
454 transactionContext.closeTransaction();
456 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
458 public void invoke(TransactionContext transactionContext) {
459 transactionContext.closeTransaction();
465 txFutureCallbackMap.clear();
467 if(transactionType == TransactionType.READ_ONLY) {
468 remoteTransactionActors.clear();
469 remoteTransactionActorsMB.set(true);
473 private String shardNameFromIdentifier(YangInstanceIdentifier path){
474 return ShardStrategyFactory.getStrategy(path).findShard(path);
477 private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
478 String shardName = shardNameFromIdentifier(path);
479 TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
480 if(txFutureCallback == null) {
481 Future<ActorSelection> findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName);
483 final TransactionFutureCallback newTxFutureCallback =
484 new TransactionFutureCallback(shardName);
486 txFutureCallback = newTxFutureCallback;
487 txFutureCallbackMap.put(shardName, txFutureCallback);
489 findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
491 public void onComplete(Throwable failure, ActorSelection primaryShard) {
492 if(failure != null) {
493 newTxFutureCallback.onComplete(failure, null);
495 newTxFutureCallback.setPrimaryShard(primaryShard);
498 }, actorContext.getActorSystem().dispatcher());
501 return txFutureCallback;
504 public String getTransactionChainId() {
505 if(transactionChainProxy == null){
508 return transactionChainProxy.getTransactionChainId();
512 * Interface for a transaction operation to be invoked later.
514 private static interface TransactionOperation {
515 void invoke(TransactionContext transactionContext);
519 * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
520 * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
521 * retry task after a short delay.
523 * The end result from a completed CreateTransaction message is a TransactionContext that is
524 * used to perform transaction operations. Transaction operations that occur before the
525 * CreateTransaction completes are cache and executed once the CreateTransaction completes,
526 * successfully or not.
528 private class TransactionFutureCallback extends OnComplete<Object> {
531 * The list of transaction operations to execute once the CreateTransaction completes.
533 @GuardedBy("txOperationsOnComplete")
534 private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
537 * The TransactionContext resulting from the CreateTransaction reply.
539 private volatile TransactionContext transactionContext;
542 * The target primary shard.
544 private volatile ActorSelection primaryShard;
546 private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
547 getShardLeaderElectionTimeout().duration().toMillis() /
548 CREATE_TX_TRY_INTERVAL.toMillis());
550 private final String shardName;
552 TransactionFutureCallback(String shardName) {
553 this.shardName = shardName;
556 String getShardName() {
560 TransactionContext getTransactionContext() {
561 return transactionContext;
566 * Sets the target primary shard and initiates a CreateTransaction try.
568 void setPrimaryShard(ActorSelection primaryShard) {
569 LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
571 this.primaryShard = primaryShard;
572 tryCreateTransaction();
576 * Adds a TransactionOperation to be executed after the CreateTransaction completes.
578 void addTxOperationOnComplete(TransactionOperation operation) {
579 synchronized(txOperationsOnComplete) {
580 if(transactionContext == null) {
581 LOG.debug("Tx {} Adding operation on complete {}", identifier);
583 txOperationsOnComplete.add(operation);
585 operation.invoke(transactionContext);
591 * Performs a CreateTransaction try async.
593 private void tryCreateTransaction() {
594 Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
595 new CreateTransaction(identifier.toString(),
596 TransactionProxy.this.transactionType.ordinal(),
597 getTransactionChainId()).toSerializable());
599 createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher());
603 public void onComplete(Throwable failure, Object response) {
604 if(failure instanceof NoShardLeaderException) {
605 // There's no leader for the shard yet - schedule and try again, unless we're out
606 // of retries. Note: createTxTries is volatile as it may be written by different
607 // threads however not concurrently, therefore decrementing it non-atomically here
609 if(--createTxTries > 0) {
610 LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
611 identifier, shardName);
613 actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
617 tryCreateTransaction();
619 }, actorContext.getActorSystem().dispatcher());
624 // Create the TransactionContext from the response or failure and execute delayed
625 // TransactionOperations. This entire section is done atomically (ie synchronized) with
626 // respect to #addTxOperationOnComplete to handle timing issues and ensure no
627 // TransactionOperation is missed and that they are processed in the order they occurred.
628 synchronized(txOperationsOnComplete) {
629 if(failure != null) {
630 LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
631 failure.getMessage());
633 transactionContext = new NoOpTransactionContext(failure, identifier);
634 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
635 createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
637 IllegalArgumentException exception = new IllegalArgumentException(String.format(
638 "Invalid reply type %s for CreateTransaction", response.getClass()));
640 transactionContext = new NoOpTransactionContext(exception, identifier);
643 for(TransactionOperation oper: txOperationsOnComplete) {
644 oper.invoke(transactionContext);
647 txOperationsOnComplete.clear();
651 private void createValidTransactionContext(CreateTransactionReply reply) {
652 String transactionPath = reply.getTransactionPath();
654 LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
656 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
658 if (transactionType == TransactionType.READ_ONLY) {
659 // Add the actor to the remoteTransactionActors list for access by the
660 // cleanup PhantonReference.
661 remoteTransactionActors.add(transactionActor);
663 // Write to the memory barrier volatile to publish the above update to the
664 // remoteTransactionActors list for thread visibility.
665 remoteTransactionActorsMB.set(true);
668 // TxActor is always created where the leader of the shard is.
669 // Check if TxActor is created in the same node
670 boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
672 transactionContext = new TransactionContextImpl(transactionPath, transactionActor, identifier,
673 actorContext, schemaContext, isTxActorLocal, reply.getVersion());
677 private interface TransactionContext {
678 void closeTransaction();
680 Future<ActorSelection> readyTransaction();
682 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
684 void deleteData(YangInstanceIdentifier path);
686 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
688 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
689 final YangInstanceIdentifier path);
691 CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
693 List<Future<Object>> getRecordedOperationFutures();
696 private static abstract class AbstractTransactionContext implements TransactionContext {
698 protected final TransactionIdentifier identifier;
699 protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
701 AbstractTransactionContext(TransactionIdentifier identifier) {
702 this.identifier = identifier;
706 public List<Future<Object>> getRecordedOperationFutures() {
707 return recordedOperationFutures;
711 private static class TransactionContextImpl extends AbstractTransactionContext {
712 private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
714 private final ActorContext actorContext;
715 private final SchemaContext schemaContext;
716 private final String transactionPath;
717 private final ActorSelection actor;
718 private final boolean isTxActorLocal;
719 private final int remoteTransactionVersion;
/**
 * Creates the context for a transaction that was successfully created on a shard.
 *
 * @param transactionPath path of the remote transaction actor
 * @param actor selection of the remote transaction actor
 * @param identifier identifier of the owning transaction
 * @param actorContext actor context used to send messages
 * @param schemaContext schema context used when building write and merge messages
 * @param isTxActorLocal whether the transaction actor is on this node
 * @param remoteTransactionVersion version reported by the remote side
 */
private TransactionContextImpl(String transactionPath, ActorSelection actor,
        TransactionIdentifier identifier, ActorContext actorContext, SchemaContext schemaContext,
        boolean isTxActorLocal, int remoteTransactionVersion) {
    super(identifier);

    this.actor = actor;
    this.actorContext = actorContext;
    this.schemaContext = schemaContext;
    this.transactionPath = transactionPath;
    this.isTxActorLocal = isTxActorLocal;
    this.remoteTransactionVersion = remoteTransactionVersion;
}
733 private ActorSelection getActor() {
738 public void closeTransaction() {
739 LOG.debug("Tx {} closeTransaction called", identifier);
741 actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
745 public Future<ActorSelection> readyTransaction() {
746 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
747 identifier, recordedOperationFutures.size());
749 // Send the ReadyTransaction message to the Tx actor.
751 ReadyTransaction readyTransaction = new ReadyTransaction();
752 final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
753 isTxActorLocal ? readyTransaction : readyTransaction.toSerializable());
755 // Combine all the previously recorded put/merge/delete operation reply Futures and the
756 // ReadyTransactionReply Future into one Future. If any one fails then the combined
757 // Future will fail. We need all prior operations and the ready operation to succeed
758 // in order to attempt commit.
760 List<Future<Object>> futureList =
761 Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
762 futureList.addAll(recordedOperationFutures);
763 futureList.add(replyFuture);
765 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
766 actorContext.getActorSystem().dispatcher());
768 // Transform the combined Future into a Future that returns the cohort actor path from
769 // the ReadyTransactionReply. That's the end result of the ready operation.
771 return combinedFutures.transform(new Mapper<Iterable<Object>, ActorSelection>() {
773 public ActorSelection checkedApply(Iterable<Object> notUsed) {
774 LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
777 // At this point all the Futures succeeded and we need to extract the cohort
778 // actor path from the ReadyTransactionReply. For the recorded operations, they
779 // don't return any data so we're only interested that they completed
780 // successfully. We could be paranoid and verify the correct reply types but
781 // that really should never happen so it's not worth the overhead of
782 // de-serializing each reply.
784 // Note the Future get call here won't block as it's complete.
785 Object serializedReadyReply = replyFuture.value().get().get();
786 if (serializedReadyReply instanceof ReadyTransactionReply) {
787 return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
789 } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
790 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
791 String cohortPath = reply.getCohortPath();
793 // In Helium we used to return the local path of the actor which represented
794 // a remote ThreePhaseCommitCohort. The local path would then be converted to
795 // a remote path using this resolvePath method. To maintain compatibility with
796 // a Helium node we need to continue to do this conversion.
797 // At some point in the future when upgrades from Helium are not supported
798 // we could remove this code to resolvePath and just use the cohortPath as the
799 // resolved cohortPath
800 if(TransactionContextImpl.this.remoteTransactionVersion < CreateTransaction.HELIUM_1_VERSION) {
801 cohortPath = actorContext.resolvePath(transactionPath, cohortPath);
804 return actorContext.actorSelection(cohortPath);
807 // Throwing an exception here will fail the Future.
808 throw new IllegalArgumentException(String.format("Invalid reply type {}",
809 serializedReadyReply.getClass()));
812 }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
816 public void deleteData(YangInstanceIdentifier path) {
817 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
819 DeleteData deleteData = new DeleteData(path);
820 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
821 isTxActorLocal ? deleteData : deleteData.toSerializable()));
825 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
826 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
828 MergeData mergeData = new MergeData(path, data, schemaContext);
829 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
830 isTxActorLocal ? mergeData : mergeData.toSerializable()));
834 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
835 LOG.debug("Tx {} writeData called path = {}", identifier, path);
837 WriteData writeData = new WriteData(path, data, schemaContext);
838 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
839 isTxActorLocal ? writeData : writeData.toSerializable()));
843 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
844 final YangInstanceIdentifier path) {
846 LOG.debug("Tx {} readData called path = {}", identifier, path);
848 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
850 // If there were any previous recorded put/merge/delete operation reply Futures then we
851 // must wait for them to successfully complete. This is necessary to honor the read
852 // uncommitted semantics of the public API contract. If any one fails then fail the read.
854 if(recordedOperationFutures.isEmpty()) {
855 finishReadData(path, returnFuture);
857 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
858 identifier, recordedOperationFutures.size());
860 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
861 // Futures#sequence accesses the passed List on a different thread, as
862 // recordedOperationFutures is not synchronized.
864 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
865 Lists.newArrayList(recordedOperationFutures),
866 actorContext.getActorSystem().dispatcher());
868 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
870 public void onComplete(Throwable failure, Iterable<Object> notUsed)
872 if(failure != null) {
873 LOG.debug("Tx {} readData: a recorded operation failed: {}",
874 identifier, failure);
875 returnFuture.setException(new ReadFailedException(
876 "The read could not be performed because a previous put, merge,"
877 + "or delete operation failed", failure));
879 finishReadData(path, returnFuture);
884 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
887 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
890 private void finishReadData(final YangInstanceIdentifier path,
891 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
893 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
895 OnComplete<Object> onComplete = new OnComplete<Object>() {
897 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
898 if(failure != null) {
899 LOG.debug("Tx {} read operation failed: {}", identifier, failure);
900 returnFuture.setException(new ReadFailedException(
901 "Error reading data for path " + path, failure));
904 LOG.debug("Tx {} read operation succeeded", identifier, failure);
906 if (readResponse instanceof ReadDataReply) {
907 ReadDataReply reply = (ReadDataReply) readResponse;
908 returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
910 } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
911 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
912 returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
915 returnFuture.setException(new ReadFailedException(
916 "Invalid response reading data for path " + path));
922 ReadData readData = new ReadData(path);
923 Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
924 isTxActorLocal ? readData : readData.toSerializable());
926 readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
930 public CheckedFuture<Boolean, ReadFailedException> dataExists(
931 final YangInstanceIdentifier path) {
933 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
935 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
937 // If there were any previous recorded put/merge/delete operation reply Futures then we
938 // must wait for them to successfully complete. This is necessary to honor the read
939 // uncommitted semantics of the public API contract. If any one fails then fail this
942 if(recordedOperationFutures.isEmpty()) {
943 finishDataExists(path, returnFuture);
945 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
946 identifier, recordedOperationFutures.size());
948 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
949 // Futures#sequence accesses the passed List on a different thread, as
950 // recordedOperationFutures is not synchronized.
952 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
953 Lists.newArrayList(recordedOperationFutures),
954 actorContext.getActorSystem().dispatcher());
955 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
957 public void onComplete(Throwable failure, Iterable<Object> notUsed)
959 if(failure != null) {
960 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
961 identifier, failure);
962 returnFuture.setException(new ReadFailedException(
963 "The data exists could not be performed because a previous "
964 + "put, merge, or delete operation failed", failure));
966 finishDataExists(path, returnFuture);
971 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
974 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
977 private void finishDataExists(final YangInstanceIdentifier path,
978 final SettableFuture<Boolean> returnFuture) {
980 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
982 OnComplete<Object> onComplete = new OnComplete<Object>() {
984 public void onComplete(Throwable failure, Object response) throws Throwable {
985 if(failure != null) {
986 LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
987 returnFuture.setException(new ReadFailedException(
988 "Error checking data exists for path " + path, failure));
990 LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
992 if (response instanceof DataExistsReply) {
993 returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
995 } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
996 returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
999 returnFuture.setException(new ReadFailedException(
1000 "Invalid response checking exists for path " + path));
1006 DataExists dataExists = new DataExists(path);
1007 Future<Object> future = actorContext.executeOperationAsync(getActor(),
1008 isTxActorLocal ? dataExists : dataExists.toSerializable());
1010 future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
1014 private static class NoOpTransactionContext extends AbstractTransactionContext {
1016 private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
1018 private final Throwable failure;
1020 public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
1022 this.failure = failure;
1026 public void closeTransaction() {
1027 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
1031 public Future<ActorSelection> readyTransaction() {
1032 LOG.debug("Tx {} readyTransaction called", identifier);
1033 return akka.dispatch.Futures.failed(failure);
1037 public void deleteData(YangInstanceIdentifier path) {
1038 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
1042 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
1043 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
1047 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
1048 LOG.debug("Tx {} writeData called path = {}", identifier, path);
1052 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
1053 YangInstanceIdentifier path) {
1054 LOG.debug("Tx {} readData called path = {}", identifier, path);
1055 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
1056 "Error reading data for path " + path, failure));
1060 public CheckedFuture<Boolean, ReadFailedException> dataExists(
1061 YangInstanceIdentifier path) {
1062 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
1063 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
1064 "Error checking exists for path " + path, failure));