2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.Mapper;
13 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.FinalizablePhantomReference;
16 import com.google.common.base.FinalizableReferenceQueue;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.CheckedFuture;
21 import com.google.common.util.concurrent.FutureCallback;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Future;
50 import scala.concurrent.Promise;
51 import scala.concurrent.duration.FiniteDuration;
52 import java.util.HashMap;
53 import java.util.List;
55 import java.util.concurrent.ConcurrentHashMap;
56 import java.util.concurrent.TimeUnit;
57 import java.util.concurrent.atomic.AtomicBoolean;
58 import java.util.concurrent.atomic.AtomicLong;
59 import javax.annotation.concurrent.GuardedBy;
62 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
64 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
65 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
66 * be created on each of those shards by the TransactionProxy
69 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
70 * shards will be executed.
73 public class TransactionProxy implements DOMStoreReadWriteTransaction {
75 public static enum TransactionType {
81 static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
82 new Mapper<Throwable, Throwable>() {
84 public Throwable apply(Throwable failure) {
89 private static final AtomicLong counter = new AtomicLong();
91 private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class);
94 * Time interval in between transaction create retries.
96 private static final FiniteDuration CREATE_TX_TRY_INTERVAL =
97 FiniteDuration.create(1, TimeUnit.SECONDS);
100 * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
101 * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
102 * trickery to clean up its internal thread when the bundle is unloaded.
104 private static final FinalizableReferenceQueue phantomReferenceQueue =
105 new FinalizableReferenceQueue();
108 * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
109 * necessary because PhantomReferences need a hard reference so they're not garbage collected.
110 * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
111 * and thus becomes eligible for garbage collection.
113 private static final Map<TransactionProxyCleanupPhantomReference,
114 TransactionProxyCleanupPhantomReference> phantomReferenceCache =
115 new ConcurrentHashMap<>();
118 * A PhantomReference that closes remote transactions for a TransactionProxy when it's
119 * garbage collected. This is used for read-only transactions as they're not explicitly closed
120 * by clients. So the only way to detect that a transaction is no longer in use and it's safe
121 * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
122 * but TransactionProxy instances should generally be short-lived enough to avoid being moved
123 * to the old generation space and thus should be cleaned up in a timely manner as the GC
124 * runs on the young generation (eden, swap1...) space much more frequently.
126 private static class TransactionProxyCleanupPhantomReference
127 extends FinalizablePhantomReference<TransactionProxy> {
129 private final List<ActorSelection> remoteTransactionActors;
130 private final AtomicBoolean remoteTransactionActorsMB;
131 private final ActorContext actorContext;
132 private final TransactionIdentifier identifier;
134 protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
135 super(referent, phantomReferenceQueue);
137 // Note we need to cache the relevant fields from the TransactionProxy as we can't
138 // have a hard reference to the TransactionProxy instance itself.
140 remoteTransactionActors = referent.remoteTransactionActors;
141 remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
142 actorContext = referent.actorContext;
143 identifier = referent.identifier;
147 public void finalizeReferent() {
148 LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
149 remoteTransactionActors.size(), identifier);
151 phantomReferenceCache.remove(this);
153 // Access the memory barrier volatile to ensure all previous updates to the
154 // remoteTransactionActors list are visible to this thread.
156 if(remoteTransactionActorsMB.get()) {
157 for(ActorSelection actor : remoteTransactionActors) {
158 LOG.trace("Sending CloseTransaction to {}", actor);
159 actorContext.sendOperationAsync(actor,
160 new CloseTransaction().toSerializable());
167 * Stores the remote Tx actors for each requested data store path to be used by the
168 * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
169 * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
170 * remoteTransactionActors list so they will be visible to the thread accessing the
173 private List<ActorSelection> remoteTransactionActors;
174 private AtomicBoolean remoteTransactionActorsMB;
177 * Stores the create transaction results per shard.
179 private final Map<String, TransactionFutureCallback> txFutureCallbackMap = new HashMap<>();
181 private final TransactionType transactionType;
182 private final ActorContext actorContext;
183 private final TransactionIdentifier identifier;
184 private final TransactionChainProxy transactionChainProxy;
185 private final SchemaContext schemaContext;
186 private boolean inReadyState;
188 public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
189 this(actorContext, transactionType, null);
192 public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
193 TransactionChainProxy transactionChainProxy) {
194 this.actorContext = Preconditions.checkNotNull(actorContext,
195 "actorContext should not be null");
196 this.transactionType = Preconditions.checkNotNull(transactionType,
197 "transactionType should not be null");
198 this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
199 "schemaContext should not be null");
200 this.transactionChainProxy = transactionChainProxy;
202 String memberName = actorContext.getCurrentMemberName();
203 if(memberName == null){
204 memberName = "UNKNOWN-MEMBER";
207 this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
208 counter.getAndIncrement()).build();
210 if(transactionType == TransactionType.READ_ONLY) {
211 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
212 // to close the remote Tx's when this instance is no longer in use and is garbage
215 remoteTransactionActors = Lists.newArrayList();
216 remoteTransactionActorsMB = new AtomicBoolean();
218 TransactionProxyCleanupPhantomReference cleanup =
219 new TransactionProxyCleanupPhantomReference(this);
220 phantomReferenceCache.put(cleanup, cleanup);
223 LOG.debug("Created txn {} of type {}", identifier, transactionType);
227 List<Future<Object>> getRecordedOperationFutures() {
228 List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
229 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
230 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
231 if(transactionContext != null) {
232 recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
236 return recordedOperationFutures;
240 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
241 final YangInstanceIdentifier path) {
243 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
244 "Read operation on write-only transaction is not allowed");
246 LOG.debug("Tx {} read {}", identifier, path);
248 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
249 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
251 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future;
252 if(transactionContext != null) {
253 future = transactionContext.readData(path);
255 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
256 // callback to be executed after the Tx is created.
257 final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
258 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
260 public void invoke(TransactionContext transactionContext) {
261 Futures.addCallback(transactionContext.readData(path),
262 new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
264 public void onSuccess(Optional<NormalizedNode<?, ?>> data) {
265 proxyFuture.set(data);
269 public void onFailure(Throwable t) {
270 proxyFuture.setException(t);
276 future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
283 public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
285 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
286 "Exists operation on write-only transaction is not allowed");
288 LOG.debug("Tx {} exists {}", identifier, path);
290 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
291 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
293 CheckedFuture<Boolean, ReadFailedException> future;
294 if(transactionContext != null) {
295 future = transactionContext.dataExists(path);
297 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
298 // callback to be executed after the Tx is created.
299 final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
300 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
302 public void invoke(TransactionContext transactionContext) {
303 Futures.addCallback(transactionContext.dataExists(path),
304 new FutureCallback<Boolean>() {
306 public void onSuccess(Boolean exists) {
307 proxyFuture.set(exists);
311 public void onFailure(Throwable t) {
312 proxyFuture.setException(t);
318 future = MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
324 private void checkModificationState() {
325 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
326 "Modification operation on read-only transaction is not allowed");
327 Preconditions.checkState(!inReadyState,
328 "Transaction is sealed - further modifications are not allowed");
332 public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
334 checkModificationState();
336 LOG.debug("Tx {} write {}", identifier, path);
338 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
339 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
340 if(transactionContext != null) {
341 transactionContext.writeData(path, data);
343 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
344 // callback to be executed after the Tx is created.
345 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
347 public void invoke(TransactionContext transactionContext) {
348 transactionContext.writeData(path, data);
355 public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
357 checkModificationState();
359 LOG.debug("Tx {} merge {}", identifier, path);
361 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
362 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
363 if(transactionContext != null) {
364 transactionContext.mergeData(path, data);
366 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
367 // callback to be executed after the Tx is created.
368 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
370 public void invoke(TransactionContext transactionContext) {
371 transactionContext.mergeData(path, data);
378 public void delete(final YangInstanceIdentifier path) {
380 checkModificationState();
382 LOG.debug("Tx {} delete {}", identifier, path);
384 TransactionFutureCallback txFutureCallback = getOrCreateTxFutureCallback(path);
385 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
386 if(transactionContext != null) {
387 transactionContext.deleteData(path);
389 // The shard Tx hasn't been created yet so add the Tx operation to the Tx Future
390 // callback to be executed after the Tx is created.
391 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
393 public void invoke(TransactionContext transactionContext) {
394 transactionContext.deleteData(path);
401 public DOMStoreThreePhaseCommitCohort ready() {
403 checkModificationState();
407 LOG.debug("Tx {} Readying {} transactions for commit", identifier,
408 txFutureCallbackMap.size());
410 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
412 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
414 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
415 txFutureCallback.getShardName());
417 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
418 if(transactionContext != null) {
419 cohortFutures.add(transactionContext.readyTransaction());
421 // The shard Tx hasn't been created yet so create a promise to ready the Tx later
422 // after it's created.
423 final Promise<ActorSelection> cohortPromise = akka.dispatch.Futures.promise();
424 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
426 public void invoke(TransactionContext transactionContext) {
427 cohortPromise.completeWith(transactionContext.readyTransaction());
431 cohortFutures.add(cohortPromise.future());
435 if(transactionChainProxy != null){
436 transactionChainProxy.onTransactionReady(cohortFutures);
439 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
440 identifier.toString());
444 public Object getIdentifier() {
445 return this.identifier;
449 public void close() {
450 for(TransactionFutureCallback txFutureCallback : txFutureCallbackMap.values()) {
451 TransactionContext transactionContext = txFutureCallback.getTransactionContext();
452 if(transactionContext != null) {
453 transactionContext.closeTransaction();
455 txFutureCallback.addTxOperationOnComplete(new TransactionOperation() {
457 public void invoke(TransactionContext transactionContext) {
458 transactionContext.closeTransaction();
464 txFutureCallbackMap.clear();
466 if(transactionType == TransactionType.READ_ONLY) {
467 remoteTransactionActors.clear();
468 remoteTransactionActorsMB.set(true);
472 private String shardNameFromIdentifier(YangInstanceIdentifier path){
473 return ShardStrategyFactory.getStrategy(path).findShard(path);
476 private TransactionFutureCallback getOrCreateTxFutureCallback(YangInstanceIdentifier path) {
477 String shardName = shardNameFromIdentifier(path);
478 TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
479 if(txFutureCallback == null) {
480 Future<ActorSelection> findPrimaryFuture = actorContext.findPrimaryShardAsync(shardName);
482 final TransactionFutureCallback newTxFutureCallback =
483 new TransactionFutureCallback(shardName);
485 txFutureCallback = newTxFutureCallback;
486 txFutureCallbackMap.put(shardName, txFutureCallback);
488 findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
490 public void onComplete(Throwable failure, ActorSelection primaryShard) {
491 if(failure != null) {
492 newTxFutureCallback.onComplete(failure, null);
494 newTxFutureCallback.setPrimaryShard(primaryShard);
497 }, actorContext.getActorSystem().dispatcher());
500 return txFutureCallback;
503 public String getTransactionChainId() {
504 if(transactionChainProxy == null){
507 return transactionChainProxy.getTransactionChainId();
511 * Interface for a transaction operation to be invoked later.
513 private static interface TransactionOperation {
514 void invoke(TransactionContext transactionContext);
518 * Implements a Future OnComplete callback for a CreateTransaction message. This class handles
519 * retries, up to a limit, if the shard doesn't have a leader yet. This is done by scheduling a
520 * retry task after a short delay.
522 * The end result from a completed CreateTransaction message is a TransactionContext that is
523 * used to perform transaction operations. Transaction operations that occur before the
524 * CreateTransaction completes are cache and executed once the CreateTransaction completes,
525 * successfully or not.
527 private class TransactionFutureCallback extends OnComplete<Object> {
530 * The list of transaction operations to execute once the CreateTransaction completes.
532 @GuardedBy("txOperationsOnComplete")
533 private final List<TransactionOperation> txOperationsOnComplete = Lists.newArrayList();
536 * The TransactionContext resulting from the CreateTransaction reply.
538 private volatile TransactionContext transactionContext;
541 * The target primary shard.
543 private volatile ActorSelection primaryShard;
545 private volatile int createTxTries = (int) (actorContext.getDatastoreContext().
546 getShardLeaderElectionTimeout().duration().toMillis() /
547 CREATE_TX_TRY_INTERVAL.toMillis());
549 private final String shardName;
551 TransactionFutureCallback(String shardName) {
552 this.shardName = shardName;
555 String getShardName() {
559 TransactionContext getTransactionContext() {
560 return transactionContext;
565 * Sets the target primary shard and initiates a CreateTransaction try.
567 void setPrimaryShard(ActorSelection primaryShard) {
568 LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
570 this.primaryShard = primaryShard;
571 tryCreateTransaction();
575 * Adds a TransactionOperation to be executed after the CreateTransaction completes.
577 void addTxOperationOnComplete(TransactionOperation operation) {
578 synchronized(txOperationsOnComplete) {
579 if(transactionContext == null) {
580 LOG.debug("Tx {} Adding operation on complete {}", identifier);
582 txOperationsOnComplete.add(operation);
584 operation.invoke(transactionContext);
590 * Performs a CreateTransaction try async.
592 private void tryCreateTransaction() {
593 Future<Object> createTxFuture = actorContext.executeOperationAsync(primaryShard,
594 new CreateTransaction(identifier.toString(),
595 TransactionProxy.this.transactionType.ordinal(),
596 getTransactionChainId()).toSerializable());
598 createTxFuture.onComplete(this, actorContext.getActorSystem().dispatcher());
602 public void onComplete(Throwable failure, Object response) {
603 if(failure instanceof NoShardLeaderException) {
604 // There's no leader for the shard yet - schedule and try again, unless we're out
605 // of retries. Note: createTxTries is volatile as it may be written by different
606 // threads however not concurrently, therefore decrementing it non-atomically here
608 if(--createTxTries > 0) {
609 LOG.debug("Tx {} Shard {} has no leader yet - scheduling create Tx retry",
610 identifier, shardName);
612 actorContext.getActorSystem().scheduler().scheduleOnce(CREATE_TX_TRY_INTERVAL,
616 tryCreateTransaction();
618 }, actorContext.getActorSystem().dispatcher());
623 // Create the TransactionContext from the response or failure and execute delayed
624 // TransactionOperations. This entire section is done atomically (ie synchronized) with
625 // respect to #addTxOperationOnComplete to handle timing issues and ensure no
626 // TransactionOperation is missed and that they are processed in the order they occurred.
627 synchronized(txOperationsOnComplete) {
628 if(failure != null) {
629 LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
630 failure.getMessage());
632 transactionContext = new NoOpTransactionContext(failure, identifier);
633 } else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
634 createValidTransactionContext(CreateTransactionReply.fromSerializable(response));
636 IllegalArgumentException exception = new IllegalArgumentException(String.format(
637 "Invalid reply type %s for CreateTransaction", response.getClass()));
639 transactionContext = new NoOpTransactionContext(exception, identifier);
642 for(TransactionOperation oper: txOperationsOnComplete) {
643 oper.invoke(transactionContext);
646 txOperationsOnComplete.clear();
650 private void createValidTransactionContext(CreateTransactionReply reply) {
651 String transactionPath = reply.getTransactionPath();
653 LOG.debug("Tx {} Received transaction actor path {}", identifier, transactionPath);
655 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
657 if (transactionType == TransactionType.READ_ONLY) {
658 // Add the actor to the remoteTransactionActors list for access by the
659 // cleanup PhantonReference.
660 remoteTransactionActors.add(transactionActor);
662 // Write to the memory barrier volatile to publish the above update to the
663 // remoteTransactionActors list for thread visibility.
664 remoteTransactionActorsMB.set(true);
667 // TxActor is always created where the leader of the shard is.
668 // Check if TxActor is created in the same node
669 boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
671 transactionContext = new TransactionContextImpl(transactionActor, identifier,
672 actorContext, schemaContext, isTxActorLocal);
676 private interface TransactionContext {
677 void closeTransaction();
679 Future<ActorSelection> readyTransaction();
681 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
683 void deleteData(YangInstanceIdentifier path);
685 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
687 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
688 final YangInstanceIdentifier path);
690 CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
692 List<Future<Object>> getRecordedOperationFutures();
695 private static abstract class AbstractTransactionContext implements TransactionContext {
697 protected final TransactionIdentifier identifier;
698 protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
700 AbstractTransactionContext(TransactionIdentifier identifier) {
701 this.identifier = identifier;
705 public List<Future<Object>> getRecordedOperationFutures() {
706 return recordedOperationFutures;
710 private static class TransactionContextImpl extends AbstractTransactionContext {
711 private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
713 private final ActorContext actorContext;
714 private final SchemaContext schemaContext;
715 private final ActorSelection actor;
716 private final boolean isTxActorLocal;
718 private TransactionContextImpl(ActorSelection actor, TransactionIdentifier identifier,
719 ActorContext actorContext, SchemaContext schemaContext,
720 boolean isTxActorLocal) {
723 this.actorContext = actorContext;
724 this.schemaContext = schemaContext;
725 this.isTxActorLocal = isTxActorLocal;
728 private ActorSelection getActor() {
733 public void closeTransaction() {
734 LOG.debug("Tx {} closeTransaction called", identifier);
736 actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
740 public Future<ActorSelection> readyTransaction() {
741 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
742 identifier, recordedOperationFutures.size());
744 // Send the ReadyTransaction message to the Tx actor.
746 ReadyTransaction readyTransaction = new ReadyTransaction();
747 final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
748 isTxActorLocal ? readyTransaction : readyTransaction.toSerializable());
750 // Combine all the previously recorded put/merge/delete operation reply Futures and the
751 // ReadyTransactionReply Future into one Future. If any one fails then the combined
752 // Future will fail. We need all prior operations and the ready operation to succeed
753 // in order to attempt commit.
755 List<Future<Object>> futureList =
756 Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
757 futureList.addAll(recordedOperationFutures);
758 futureList.add(replyFuture);
760 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
761 actorContext.getActorSystem().dispatcher());
763 // Transform the combined Future into a Future that returns the cohort actor path from
764 // the ReadyTransactionReply. That's the end result of the ready operation.
766 return combinedFutures.transform(new Mapper<Iterable<Object>, ActorSelection>() {
768 public ActorSelection checkedApply(Iterable<Object> notUsed) {
769 LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
772 // At this point all the Futures succeeded and we need to extract the cohort
773 // actor path from the ReadyTransactionReply. For the recorded operations, they
774 // don't return any data so we're only interested that they completed
775 // successfully. We could be paranoid and verify the correct reply types but
776 // that really should never happen so it's not worth the overhead of
777 // de-serializing each reply.
779 // Note the Future get call here won't block as it's complete.
780 Object serializedReadyReply = replyFuture.value().get().get();
781 if (serializedReadyReply instanceof ReadyTransactionReply) {
782 return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
784 } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
785 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
786 return actorContext.actorSelection(reply.getCohortPath());
789 // Throwing an exception here will fail the Future.
790 throw new IllegalArgumentException(String.format("Invalid reply type {}",
791 serializedReadyReply.getClass()));
794 }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
798 public void deleteData(YangInstanceIdentifier path) {
799 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
801 DeleteData deleteData = new DeleteData(path);
802 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
803 isTxActorLocal ? deleteData : deleteData.toSerializable()));
807 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
808 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
810 MergeData mergeData = new MergeData(path, data, schemaContext);
811 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
812 isTxActorLocal ? mergeData : mergeData.toSerializable()));
816 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
817 LOG.debug("Tx {} writeData called path = {}", identifier, path);
819 WriteData writeData = new WriteData(path, data, schemaContext);
820 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
821 isTxActorLocal ? writeData : writeData.toSerializable()));
825 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
826 final YangInstanceIdentifier path) {
828 LOG.debug("Tx {} readData called path = {}", identifier, path);
830 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
832 // If there were any previous recorded put/merge/delete operation reply Futures then we
833 // must wait for them to successfully complete. This is necessary to honor the read
834 // uncommitted semantics of the public API contract. If any one fails then fail the read.
836 if(recordedOperationFutures.isEmpty()) {
837 finishReadData(path, returnFuture);
839 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
840 identifier, recordedOperationFutures.size());
842 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
843 // Futures#sequence accesses the passed List on a different thread, as
844 // recordedOperationFutures is not synchronized.
846 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
847 Lists.newArrayList(recordedOperationFutures),
848 actorContext.getActorSystem().dispatcher());
850 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
852 public void onComplete(Throwable failure, Iterable<Object> notUsed)
854 if(failure != null) {
855 LOG.debug("Tx {} readData: a recorded operation failed: {}",
856 identifier, failure);
857 returnFuture.setException(new ReadFailedException(
858 "The read could not be performed because a previous put, merge,"
859 + "or delete operation failed", failure));
861 finishReadData(path, returnFuture);
866 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
869 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
872 private void finishReadData(final YangInstanceIdentifier path,
873 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
875 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
877 OnComplete<Object> onComplete = new OnComplete<Object>() {
879 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
880 if(failure != null) {
881 LOG.debug("Tx {} read operation failed: {}", identifier, failure);
882 returnFuture.setException(new ReadFailedException(
883 "Error reading data for path " + path, failure));
886 LOG.debug("Tx {} read operation succeeded", identifier, failure);
888 if (readResponse instanceof ReadDataReply) {
889 ReadDataReply reply = (ReadDataReply) readResponse;
890 returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
892 } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
893 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
894 returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
897 returnFuture.setException(new ReadFailedException(
898 "Invalid response reading data for path " + path));
904 ReadData readData = new ReadData(path);
905 Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
906 isTxActorLocal ? readData : readData.toSerializable());
908 readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
912 public CheckedFuture<Boolean, ReadFailedException> dataExists(
913 final YangInstanceIdentifier path) {
915 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
917 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
919 // If there were any previous recorded put/merge/delete operation reply Futures then we
920 // must wait for them to successfully complete. This is necessary to honor the read
921 // uncommitted semantics of the public API contract. If any one fails then fail this
924 if(recordedOperationFutures.isEmpty()) {
925 finishDataExists(path, returnFuture);
927 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
928 identifier, recordedOperationFutures.size());
930 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
931 // Futures#sequence accesses the passed List on a different thread, as
932 // recordedOperationFutures is not synchronized.
934 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
935 Lists.newArrayList(recordedOperationFutures),
936 actorContext.getActorSystem().dispatcher());
937 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
939 public void onComplete(Throwable failure, Iterable<Object> notUsed)
941 if(failure != null) {
942 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
943 identifier, failure);
944 returnFuture.setException(new ReadFailedException(
945 "The data exists could not be performed because a previous "
946 + "put, merge, or delete operation failed", failure));
948 finishDataExists(path, returnFuture);
953 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
956 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
959 private void finishDataExists(final YangInstanceIdentifier path,
960 final SettableFuture<Boolean> returnFuture) {
962 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
964 OnComplete<Object> onComplete = new OnComplete<Object>() {
966 public void onComplete(Throwable failure, Object response) throws Throwable {
967 if(failure != null) {
968 LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
969 returnFuture.setException(new ReadFailedException(
970 "Error checking data exists for path " + path, failure));
972 LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
974 if (response instanceof DataExistsReply) {
975 returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
977 } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
978 returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
981 returnFuture.setException(new ReadFailedException(
982 "Invalid response checking exists for path " + path));
988 DataExists dataExists = new DataExists(path);
989 Future<Object> future = actorContext.executeOperationAsync(getActor(),
990 isTxActorLocal ? dataExists : dataExists.toSerializable());
992 future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
996 private static class NoOpTransactionContext extends AbstractTransactionContext {
998 private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
1000 private final Throwable failure;
1002 public NoOpTransactionContext(Throwable failure, TransactionIdentifier identifier){
1004 this.failure = failure;
1008 public void closeTransaction() {
1009 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
1013 public Future<ActorSelection> readyTransaction() {
1014 LOG.debug("Tx {} readyTransaction called", identifier);
1015 return akka.dispatch.Futures.failed(failure);
1019 public void deleteData(YangInstanceIdentifier path) {
1020 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
1024 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
1025 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
1029 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
1030 LOG.debug("Tx {} writeData called path = {}", identifier, path);
1034 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
1035 YangInstanceIdentifier path) {
1036 LOG.debug("Tx {} readData called path = {}", identifier, path);
1037 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
1038 "Error reading data for path " + path, failure));
1042 public CheckedFuture<Boolean, ReadFailedException> dataExists(
1043 YangInstanceIdentifier path) {
1044 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
1045 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
1046 "Error checking exists for path " + path, failure));