2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.OnComplete;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.FinalizablePhantomReference;
17 import com.google.common.base.FinalizableReferenceQueue;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
25 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
26 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
33 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
43 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
44 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
51 import scala.Function1;
52 import scala.concurrent.Future;
53 import scala.runtime.AbstractFunction1;
55 import java.util.HashMap;
56 import java.util.List;
58 import java.util.concurrent.ConcurrentHashMap;
59 import java.util.concurrent.atomic.AtomicBoolean;
60 import java.util.concurrent.atomic.AtomicLong;
63 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
65 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
66 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
67 * be created on each of those shards by the TransactionProxy
70 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
71 * shards will be executed.
74 public class TransactionProxy implements DOMStoreReadWriteTransaction {
76 private final TransactionChainProxy transactionChainProxy;
80 public enum TransactionType {
86 static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
87 Throwable, Throwable>() {
89 public Throwable apply(Throwable failure) {
94 private static final AtomicLong counter = new AtomicLong();
96 private static final Logger
97 LOG = LoggerFactory.getLogger(TransactionProxy.class);
101 * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
102 * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
103 * trickery to clean up its internal thread when the bundle is unloaded.
105 private static final FinalizableReferenceQueue phantomReferenceQueue =
106 new FinalizableReferenceQueue();
109 * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
110 * necessary because PhantomReferences need a hard reference so they're not garbage collected.
111 * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
112 * and thus becomes eligible for garbage collection.
114 private static final Map<TransactionProxyCleanupPhantomReference,
115 TransactionProxyCleanupPhantomReference> phantomReferenceCache =
116 new ConcurrentHashMap<>();
119 * A PhantomReference that closes remote transactions for a TransactionProxy when it's
120 * garbage collected. This is used for read-only transactions as they're not explicitly closed
121 * by clients. So the only way to detect that a transaction is no longer in use and it's safe
122 * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
123 * but TransactionProxy instances should generally be short-lived enough to avoid being moved
124 * to the old generation space and thus should be cleaned up in a timely manner as the GC
125 * runs on the young generation (eden, swap1...) space much more frequently.
127 private static class TransactionProxyCleanupPhantomReference
128 extends FinalizablePhantomReference<TransactionProxy> {
130 private final List<ActorSelection> remoteTransactionActors;
131 private final AtomicBoolean remoteTransactionActorsMB;
132 private final ActorContext actorContext;
133 private final TransactionIdentifier identifier;
135 protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
136 super(referent, phantomReferenceQueue);
138 // Note we need to cache the relevant fields from the TransactionProxy as we can't
139 // have a hard reference to the TransactionProxy instance itself.
141 remoteTransactionActors = referent.remoteTransactionActors;
142 remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
143 actorContext = referent.actorContext;
144 identifier = referent.identifier;
148 public void finalizeReferent() {
149 LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
150 remoteTransactionActors.size(), identifier);
152 phantomReferenceCache.remove(this);
154 // Access the memory barrier volatile to ensure all previous updates to the
155 // remoteTransactionActors list are visible to this thread.
157 if(remoteTransactionActorsMB.get()) {
158 for(ActorSelection actor : remoteTransactionActors) {
159 LOG.trace("Sending CloseTransaction to {}", actor);
160 actorContext.sendOperationAsync(actor,
161 new CloseTransaction().toSerializable());
168 * Stores the remote Tx actors for each requested data store path to be used by the
169 * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
170 * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
171 * remoteTransactionActors list so they will be visible to the thread accessing the
174 private List<ActorSelection> remoteTransactionActors;
175 private AtomicBoolean remoteTransactionActorsMB;
177 private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
179 private final TransactionType transactionType;
180 private final ActorContext actorContext;
181 private final TransactionIdentifier identifier;
182 private final SchemaContext schemaContext;
183 private boolean inReadyState;
185 public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
186 this(actorContext, transactionType, null);
190 List<Future<Object>> getRecordedOperationFutures() {
191 List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
192 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
193 recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
196 return recordedOperationFutures;
199 public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
200 this.actorContext = Preconditions.checkNotNull(actorContext,
201 "actorContext should not be null");
202 this.transactionType = Preconditions.checkNotNull(transactionType,
203 "transactionType should not be null");
204 this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
205 "schemaContext should not be null");
206 this.transactionChainProxy = transactionChainProxy;
208 String memberName = actorContext.getCurrentMemberName();
209 if(memberName == null){
210 memberName = "UNKNOWN-MEMBER";
213 this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
214 counter.getAndIncrement()).build();
216 if(transactionType == TransactionType.READ_ONLY) {
217 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
218 // to close the remote Tx's when this instance is no longer in use and is garbage
221 remoteTransactionActors = Lists.newArrayList();
222 remoteTransactionActorsMB = new AtomicBoolean();
224 TransactionProxyCleanupPhantomReference cleanup =
225 new TransactionProxyCleanupPhantomReference(this);
226 phantomReferenceCache.put(cleanup, cleanup);
228 if(LOG.isDebugEnabled()) {
229 LOG.debug("Created txn {} of type {}", identifier, transactionType);
234 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
235 final YangInstanceIdentifier path) {
237 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
238 "Read operation on write-only transaction is not allowed");
240 if(LOG.isDebugEnabled()) {
241 LOG.debug("Tx {} read {}", identifier, path);
243 createTransactionIfMissing(actorContext, path);
245 return transactionContext(path).readData(path);
249 public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
251 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
252 "Exists operation on write-only transaction is not allowed");
254 if(LOG.isDebugEnabled()) {
255 LOG.debug("Tx {} exists {}", identifier, path);
257 createTransactionIfMissing(actorContext, path);
259 return transactionContext(path).dataExists(path);
262 private void checkModificationState() {
263 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
264 "Modification operation on read-only transaction is not allowed");
265 Preconditions.checkState(!inReadyState,
266 "Transaction is sealed - further modifications are not allowed");
270 public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
272 checkModificationState();
274 if(LOG.isDebugEnabled()) {
275 LOG.debug("Tx {} write {}", identifier, path);
277 createTransactionIfMissing(actorContext, path);
279 transactionContext(path).writeData(path, data);
283 public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
285 checkModificationState();
287 if(LOG.isDebugEnabled()) {
288 LOG.debug("Tx {} merge {}", identifier, path);
290 createTransactionIfMissing(actorContext, path);
292 transactionContext(path).mergeData(path, data);
296 public void delete(YangInstanceIdentifier path) {
298 checkModificationState();
299 if(LOG.isDebugEnabled()) {
300 LOG.debug("Tx {} delete {}", identifier, path);
302 createTransactionIfMissing(actorContext, path);
304 transactionContext(path).deleteData(path);
308 public DOMStoreThreePhaseCommitCohort ready() {
310 checkModificationState();
314 if(LOG.isDebugEnabled()) {
315 LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
316 remoteTransactionPaths.size());
318 List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
320 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
322 if(LOG.isDebugEnabled()) {
323 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
324 transactionContext.getShardName());
326 cohortPathFutures.add(transactionContext.readyTransaction());
329 if(transactionChainProxy != null){
330 transactionChainProxy.onTransactionReady(cohortPathFutures);
333 return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
334 identifier.toString());
338 public Object getIdentifier() {
339 return this.identifier;
343 public void close() {
344 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
345 transactionContext.closeTransaction();
348 remoteTransactionPaths.clear();
350 if(transactionType == TransactionType.READ_ONLY) {
351 remoteTransactionActors.clear();
352 remoteTransactionActorsMB.set(true);
356 private TransactionContext transactionContext(YangInstanceIdentifier path){
357 String shardName = shardNameFromIdentifier(path);
358 return remoteTransactionPaths.get(shardName);
361 private String shardNameFromIdentifier(YangInstanceIdentifier path){
362 return ShardStrategyFactory.getStrategy(path).findShard(path);
365 private void createTransactionIfMissing(ActorContext actorContext,
366 YangInstanceIdentifier path) {
368 if(transactionChainProxy != null){
369 transactionChainProxy.waitTillCurrentTransactionReady();
372 String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
374 TransactionContext transactionContext =
375 remoteTransactionPaths.get(shardName);
377 if (transactionContext != null) {
378 // A transaction already exists with that shard
383 Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
384 if (!primaryShard.isPresent()) {
385 throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
388 Object response = actorContext.executeOperation(primaryShard.get(),
389 new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
390 getTransactionChainId()).toSerializable());
391 if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
392 CreateTransactionReply reply =
393 CreateTransactionReply.fromSerializable(response);
395 String transactionPath = reply.getTransactionPath();
397 if(LOG.isDebugEnabled()) {
398 LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
400 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
402 if (transactionType == TransactionType.READ_ONLY) {
403 // Add the actor to the remoteTransactionActors list for access by the
404 // cleanup PhantonReference.
405 remoteTransactionActors.add(transactionActor);
407 // Write to the memory barrier volatile to publish the above update to the
408 // remoteTransactionActors list for thread visibility.
409 remoteTransactionActorsMB.set(true);
412 transactionContext = new TransactionContextImpl(shardName, transactionPath,
413 transactionActor, identifier, actorContext, schemaContext);
415 remoteTransactionPaths.put(shardName, transactionContext);
417 throw new IllegalArgumentException(String.format(
418 "Invalid reply type {} for CreateTransaction", response.getClass()));
420 } catch (Exception e) {
421 if(LOG.isDebugEnabled()) {
422 LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
424 remoteTransactionPaths
425 .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
429 public String getTransactionChainId() {
430 if(transactionChainProxy == null){
433 return transactionChainProxy.getTransactionChainId();
437 private interface TransactionContext {
438 String getShardName();
440 void closeTransaction();
442 Future<ActorPath> readyTransaction();
444 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
446 void deleteData(YangInstanceIdentifier path);
448 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
450 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
451 final YangInstanceIdentifier path);
453 CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
455 List<Future<Object>> getRecordedOperationFutures();
458 private static abstract class AbstractTransactionContext implements TransactionContext {
460 protected final TransactionIdentifier identifier;
461 protected final String shardName;
462 protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
464 AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
465 this.shardName = shardName;
466 this.identifier = identifier;
470 public String getShardName() {
475 public List<Future<Object>> getRecordedOperationFutures() {
476 return recordedOperationFutures;
480 private static class TransactionContextImpl extends AbstractTransactionContext {
481 private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
483 private final ActorContext actorContext;
484 private final SchemaContext schemaContext;
485 private final String actorPath;
486 private final ActorSelection actor;
488 private TransactionContextImpl(String shardName, String actorPath,
489 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
490 SchemaContext schemaContext) {
491 super(shardName, identifier);
492 this.actorPath = actorPath;
494 this.actorContext = actorContext;
495 this.schemaContext = schemaContext;
498 private ActorSelection getActor() {
502 private String getResolvedCohortPath(String cohortPath) {
503 return actorContext.resolvePath(actorPath, cohortPath);
507 public void closeTransaction() {
508 if(LOG.isDebugEnabled()) {
509 LOG.debug("Tx {} closeTransaction called", identifier);
511 actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
515 public Future<ActorPath> readyTransaction() {
516 if(LOG.isDebugEnabled()) {
517 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
518 identifier, recordedOperationFutures.size());
520 // Send the ReadyTransaction message to the Tx actor.
522 final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
523 new ReadyTransaction().toSerializable());
525 // Combine all the previously recorded put/merge/delete operation reply Futures and the
526 // ReadyTransactionReply Future into one Future. If any one fails then the combined
527 // Future will fail. We need all prior operations and the ready operation to succeed
528 // in order to attempt commit.
530 List<Future<Object>> futureList =
531 Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
532 futureList.addAll(recordedOperationFutures);
533 futureList.add(replyFuture);
535 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
536 actorContext.getActorSystem().dispatcher());
538 // Transform the combined Future into a Future that returns the cohort actor path from
539 // the ReadyTransactionReply. That's the end result of the ready operation.
541 return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
543 public ActorPath apply(Iterable<Object> notUsed) {
544 if(LOG.isDebugEnabled()) {
545 LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
548 // At this point all the Futures succeeded and we need to extract the cohort
549 // actor path from the ReadyTransactionReply. For the recorded operations, they
550 // don't return any data so we're only interested that they completed
551 // successfully. We could be paranoid and verify the correct reply types but
552 // that really should never happen so it's not worth the overhead of
553 // de-serializing each reply.
555 // Note the Future get call here won't block as it's complete.
556 Object serializedReadyReply = replyFuture.value().get().get();
557 if(serializedReadyReply.getClass().equals(
558 ReadyTransactionReply.SERIALIZABLE_CLASS)) {
559 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
560 actorContext.getActorSystem(), serializedReadyReply);
562 String resolvedCohortPath = getResolvedCohortPath(
563 reply.getCohortPath().toString());
565 if(LOG.isDebugEnabled()) {
566 LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
567 identifier, resolvedCohortPath);
569 return actorContext.actorFor(resolvedCohortPath);
571 // Throwing an exception here will fail the Future.
573 throw new IllegalArgumentException(String.format("Invalid reply type {}",
574 serializedReadyReply.getClass()));
577 }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
581 public void deleteData(YangInstanceIdentifier path) {
582 if(LOG.isDebugEnabled()) {
583 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
585 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
586 new DeleteData(path).toSerializable()));
590 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
591 if(LOG.isDebugEnabled()) {
592 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
594 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
595 new MergeData(path, data, schemaContext).toSerializable()));
599 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
600 if(LOG.isDebugEnabled()) {
601 LOG.debug("Tx {} writeData called path = {}", identifier, path);
603 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
604 new WriteData(path, data, schemaContext).toSerializable()));
608 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
609 final YangInstanceIdentifier path) {
611 if(LOG.isDebugEnabled()) {
612 LOG.debug("Tx {} readData called path = {}", identifier, path);
614 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
616 // If there were any previous recorded put/merge/delete operation reply Futures then we
617 // must wait for them to successfully complete. This is necessary to honor the read
618 // uncommitted semantics of the public API contract. If any one fails then fail the read.
620 if(recordedOperationFutures.isEmpty()) {
621 finishReadData(path, returnFuture);
623 if(LOG.isDebugEnabled()) {
624 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
625 identifier, recordedOperationFutures.size());
627 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
628 // Futures#sequence accesses the passed List on a different thread, as
629 // recordedOperationFutures is not synchronized.
631 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
632 Lists.newArrayList(recordedOperationFutures),
633 actorContext.getActorSystem().dispatcher());
634 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
636 public void onComplete(Throwable failure, Iterable<Object> notUsed)
638 if(failure != null) {
639 if(LOG.isDebugEnabled()) {
640 LOG.debug("Tx {} readData: a recorded operation failed: {}",
641 identifier, failure);
643 returnFuture.setException(new ReadFailedException(
644 "The read could not be performed because a previous put, merge,"
645 + "or delete operation failed", failure));
647 finishReadData(path, returnFuture);
652 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
655 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
658 private void finishReadData(final YangInstanceIdentifier path,
659 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
661 if(LOG.isDebugEnabled()) {
662 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
664 OnComplete<Object> onComplete = new OnComplete<Object>() {
666 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
667 if(failure != null) {
668 if(LOG.isDebugEnabled()) {
669 LOG.debug("Tx {} read operation failed: {}", identifier, failure);
671 returnFuture.setException(new ReadFailedException(
672 "Error reading data for path " + path, failure));
675 if(LOG.isDebugEnabled()) {
676 LOG.debug("Tx {} read operation succeeded", identifier, failure);
678 if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
679 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
681 if (reply.getNormalizedNode() == null) {
682 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
684 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
685 reply.getNormalizedNode()));
688 returnFuture.setException(new ReadFailedException(
689 "Invalid response reading data for path " + path));
695 Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
696 new ReadData(path).toSerializable());
697 readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
701 public CheckedFuture<Boolean, ReadFailedException> dataExists(
702 final YangInstanceIdentifier path) {
704 if(LOG.isDebugEnabled()) {
705 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
707 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
709 // If there were any previous recorded put/merge/delete operation reply Futures then we
710 // must wait for them to successfully complete. This is necessary to honor the read
711 // uncommitted semantics of the public API contract. If any one fails then fail this
714 if(recordedOperationFutures.isEmpty()) {
715 finishDataExists(path, returnFuture);
717 if(LOG.isDebugEnabled()) {
718 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
719 identifier, recordedOperationFutures.size());
721 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
722 // Futures#sequence accesses the passed List on a different thread, as
723 // recordedOperationFutures is not synchronized.
725 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
726 Lists.newArrayList(recordedOperationFutures),
727 actorContext.getActorSystem().dispatcher());
728 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
730 public void onComplete(Throwable failure, Iterable<Object> notUsed)
732 if(failure != null) {
733 if(LOG.isDebugEnabled()) {
734 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
735 identifier, failure);
737 returnFuture.setException(new ReadFailedException(
738 "The data exists could not be performed because a previous "
739 + "put, merge, or delete operation failed", failure));
741 finishDataExists(path, returnFuture);
746 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
749 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
752 private void finishDataExists(final YangInstanceIdentifier path,
753 final SettableFuture<Boolean> returnFuture) {
755 if(LOG.isDebugEnabled()) {
756 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
758 OnComplete<Object> onComplete = new OnComplete<Object>() {
760 public void onComplete(Throwable failure, Object response) throws Throwable {
761 if(failure != null) {
762 if(LOG.isDebugEnabled()) {
763 LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
765 returnFuture.setException(new ReadFailedException(
766 "Error checking data exists for path " + path, failure));
768 if(LOG.isDebugEnabled()) {
769 LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
771 if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
772 returnFuture.set(Boolean.valueOf(DataExistsReply.
773 fromSerializable(response).exists()));
775 returnFuture.setException(new ReadFailedException(
776 "Invalid response checking exists for path " + path));
782 Future<Object> future = actorContext.executeOperationAsync(getActor(),
783 new DataExists(path).toSerializable());
784 future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
788 private static class NoOpTransactionContext extends AbstractTransactionContext {
790 private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
792 private final Exception failure;
794 public NoOpTransactionContext(String shardName, Exception failure,
795 TransactionIdentifier identifier){
796 super(shardName, identifier);
797 this.failure = failure;
801 public void closeTransaction() {
802 if(LOG.isDebugEnabled()) {
803 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
808 public Future<ActorPath> readyTransaction() {
809 if(LOG.isDebugEnabled()) {
810 LOG.debug("Tx {} readyTransaction called", identifier);
812 return akka.dispatch.Futures.failed(failure);
816 public void deleteData(YangInstanceIdentifier path) {
817 if(LOG.isDebugEnabled()) {
818 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
823 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
824 if(LOG.isDebugEnabled()) {
825 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
830 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
831 if(LOG.isDebugEnabled()) {
832 LOG.debug("Tx {} writeData called path = {}", identifier, path);
837 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
838 YangInstanceIdentifier path) {
839 if(LOG.isDebugEnabled()) {
840 LOG.debug("Tx {} readData called path = {}", identifier, path);
842 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
843 "Error reading data for path " + path, failure));
847 public CheckedFuture<Boolean, ReadFailedException> dataExists(
848 YangInstanceIdentifier path) {
849 if(LOG.isDebugEnabled()) {
850 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
852 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
853 "Error checking exists for path " + path, failure));