2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.OnComplete;
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.FinalizablePhantomReference;
17 import com.google.common.base.FinalizableReferenceQueue;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 import scala.Function1;
51 import scala.concurrent.Future;
52 import scala.runtime.AbstractFunction1;
54 import java.util.HashMap;
55 import java.util.List;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.atomic.AtomicBoolean;
59 import java.util.concurrent.atomic.AtomicLong;
62 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
64 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
65 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
66 * be created on each of those shards by the TransactionProxy
69 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
70 * shards will be executed.
73 public class TransactionProxy implements DOMStoreReadWriteTransaction {
74 public enum TransactionType {
80 static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
81 Throwable, Throwable>() {
83 public Throwable apply(Throwable failure) {
88 private static final AtomicLong counter = new AtomicLong();
90 private static final Logger
91 LOG = LoggerFactory.getLogger(TransactionProxy.class);
95 * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
96 * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
97 * trickery to clean up its internal thread when the bundle is unloaded.
99 private static final FinalizableReferenceQueue phantomReferenceQueue =
100 new FinalizableReferenceQueue();
103 * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
104 * necessary because PhantomReferences need a hard reference so they're not garbage collected.
105 * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
106 * and thus becomes eligible for garbage collection.
108 private static final Map<TransactionProxyCleanupPhantomReference,
109 TransactionProxyCleanupPhantomReference> phantomReferenceCache =
110 new ConcurrentHashMap<>();
113 * A PhantomReference that closes remote transactions for a TransactionProxy when it's
114 * garbage collected. This is used for read-only transactions as they're not explicitly closed
115 * by clients. So the only way to detect that a transaction is no longer in use and it's safe
116 * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
117 * but TransactionProxy instances should generally be short-lived enough to avoid being moved
118 * to the old generation space and thus should be cleaned up in a timely manner as the GC
119 * runs on the young generation (eden, swap1...) space much more frequently.
121 private static class TransactionProxyCleanupPhantomReference
122 extends FinalizablePhantomReference<TransactionProxy> {
124 private final List<ActorSelection> remoteTransactionActors;
125 private final AtomicBoolean remoteTransactionActorsMB;
126 private final ActorContext actorContext;
127 private final TransactionIdentifier identifier;
129 protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
130 super(referent, phantomReferenceQueue);
132 // Note we need to cache the relevant fields from the TransactionProxy as we can't
133 // have a hard reference to the TransactionProxy instance itself.
135 remoteTransactionActors = referent.remoteTransactionActors;
136 remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
137 actorContext = referent.actorContext;
138 identifier = referent.identifier;
142 public void finalizeReferent() {
143 LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
144 remoteTransactionActors.size(), identifier);
146 phantomReferenceCache.remove(this);
148 // Access the memory barrier volatile to ensure all previous updates to the
149 // remoteTransactionActors list are visible to this thread.
151 if(remoteTransactionActorsMB.get()) {
152 for(ActorSelection actor : remoteTransactionActors) {
153 LOG.trace("Sending CloseTransaction to {}", actor);
154 actorContext.sendRemoteOperationAsync(actor,
155 new CloseTransaction().toSerializable());
162 * Stores the remote Tx actors for each requested data store path to be used by the
163 * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
164 * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
165 * remoteTransactionActors list so they will be visible to the thread accessing the
168 private List<ActorSelection> remoteTransactionActors;
169 private AtomicBoolean remoteTransactionActorsMB;
171 private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
173 private final TransactionType transactionType;
174 private final ActorContext actorContext;
175 private final TransactionIdentifier identifier;
176 private final SchemaContext schemaContext;
177 private boolean inReadyState;
179 public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
180 this.actorContext = Preconditions.checkNotNull(actorContext,
181 "actorContext should not be null");
182 this.transactionType = Preconditions.checkNotNull(transactionType,
183 "transactionType should not be null");
184 this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
185 "schemaContext should not be null");
187 String memberName = actorContext.getCurrentMemberName();
188 if(memberName == null){
189 memberName = "UNKNOWN-MEMBER";
192 this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
193 counter.getAndIncrement()).build();
195 if(transactionType == TransactionType.READ_ONLY) {
196 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
197 // to close the remote Tx's when this instance is no longer in use and is garbage
200 remoteTransactionActors = Lists.newArrayList();
201 remoteTransactionActorsMB = new AtomicBoolean();
203 TransactionProxyCleanupPhantomReference cleanup =
204 new TransactionProxyCleanupPhantomReference(this);
205 phantomReferenceCache.put(cleanup, cleanup);
208 LOG.debug("Created txn {} of type {}", identifier, transactionType);
212 List<Future<Object>> getRecordedOperationFutures() {
213 List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
214 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
215 recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
218 return recordedOperationFutures;
222 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
223 final YangInstanceIdentifier path) {
225 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
226 "Read operation on write-only transaction is not allowed");
228 LOG.debug("Tx {} read {}", identifier, path);
230 createTransactionIfMissing(actorContext, path);
232 return transactionContext(path).readData(path);
236 public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
238 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
239 "Exists operation on write-only transaction is not allowed");
241 LOG.debug("Tx {} exists {}", identifier, path);
243 createTransactionIfMissing(actorContext, path);
245 return transactionContext(path).dataExists(path);
248 private void checkModificationState() {
249 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
250 "Modification operation on read-only transaction is not allowed");
251 Preconditions.checkState(!inReadyState,
252 "Transaction is sealed - further modifications are not allowed");
256 public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
258 checkModificationState();
260 LOG.debug("Tx {} write {}", identifier, path);
262 createTransactionIfMissing(actorContext, path);
264 transactionContext(path).writeData(path, data);
268 public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
270 checkModificationState();
272 LOG.debug("Tx {} merge {}", identifier, path);
274 createTransactionIfMissing(actorContext, path);
276 transactionContext(path).mergeData(path, data);
280 public void delete(YangInstanceIdentifier path) {
282 checkModificationState();
284 LOG.debug("Tx {} delete {}", identifier, path);
286 createTransactionIfMissing(actorContext, path);
288 transactionContext(path).deleteData(path);
292 public DOMStoreThreePhaseCommitCohort ready() {
294 checkModificationState();
298 LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
299 remoteTransactionPaths.size());
301 List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
303 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
305 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
306 transactionContext.getShardName());
308 cohortPathFutures.add(transactionContext.readyTransaction());
311 return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
312 identifier.toString());
316 public Object getIdentifier() {
317 return this.identifier;
321 public void close() {
322 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
323 transactionContext.closeTransaction();
326 remoteTransactionPaths.clear();
328 if(transactionType == TransactionType.READ_ONLY) {
329 remoteTransactionActors.clear();
330 remoteTransactionActorsMB.set(true);
334 private TransactionContext transactionContext(YangInstanceIdentifier path){
335 String shardName = shardNameFromIdentifier(path);
336 return remoteTransactionPaths.get(shardName);
339 private String shardNameFromIdentifier(YangInstanceIdentifier path){
340 return ShardStrategyFactory.getStrategy(path).findShard(path);
343 private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
344 String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
346 TransactionContext transactionContext =
347 remoteTransactionPaths.get(shardName);
349 if(transactionContext != null){
350 // A transaction already exists with that shard
355 Object response = actorContext.executeShardOperation(shardName,
356 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
357 ActorContext.ASK_DURATION);
358 if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
359 CreateTransactionReply reply =
360 CreateTransactionReply.fromSerializable(response);
362 String transactionPath = reply.getTransactionPath();
364 LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
366 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
368 if(transactionType == TransactionType.READ_ONLY) {
369 // Add the actor to the remoteTransactionActors list for access by the
370 // cleanup PhantonReference.
371 remoteTransactionActors.add(transactionActor);
373 // Write to the memory barrier volatile to publish the above update to the
374 // remoteTransactionActors list for thread visibility.
375 remoteTransactionActorsMB.set(true);
378 transactionContext = new TransactionContextImpl(shardName, transactionPath,
379 transactionActor, identifier, actorContext, schemaContext);
381 remoteTransactionPaths.put(shardName, transactionContext);
383 throw new IllegalArgumentException(String.format(
384 "Invalid reply type {} for CreateTransaction", response.getClass()));
386 } catch(Exception e){
387 LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
388 remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
392 private interface TransactionContext {
393 String getShardName();
395 void closeTransaction();
397 Future<ActorPath> readyTransaction();
399 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
401 void deleteData(YangInstanceIdentifier path);
403 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
405 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
406 final YangInstanceIdentifier path);
408 CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
410 List<Future<Object>> getRecordedOperationFutures();
413 private static abstract class AbstractTransactionContext implements TransactionContext {
415 protected final TransactionIdentifier identifier;
416 protected final String shardName;
417 protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
419 AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
420 this.shardName = shardName;
421 this.identifier = identifier;
425 public String getShardName() {
430 public List<Future<Object>> getRecordedOperationFutures() {
431 return recordedOperationFutures;
435 private static class TransactionContextImpl extends AbstractTransactionContext {
436 private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
438 private final ActorContext actorContext;
439 private final SchemaContext schemaContext;
440 private final String actorPath;
441 private final ActorSelection actor;
443 private TransactionContextImpl(String shardName, String actorPath,
444 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
445 SchemaContext schemaContext) {
446 super(shardName, identifier);
447 this.actorPath = actorPath;
449 this.actorContext = actorContext;
450 this.schemaContext = schemaContext;
453 private ActorSelection getActor() {
457 private String getResolvedCohortPath(String cohortPath) {
458 return actorContext.resolvePath(actorPath, cohortPath);
462 public void closeTransaction() {
463 LOG.debug("Tx {} closeTransaction called", identifier);
464 actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
468 public Future<ActorPath> readyTransaction() {
469 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
470 identifier, recordedOperationFutures.size());
472 // Send the ReadyTransaction message to the Tx actor.
474 final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
475 new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
477 // Combine all the previously recorded put/merge/delete operation reply Futures and the
478 // ReadyTransactionReply Future into one Future. If any one fails then the combined
479 // Future will fail. We need all prior operations and the ready operation to succeed
480 // in order to attempt commit.
482 List<Future<Object>> futureList =
483 Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
484 futureList.addAll(recordedOperationFutures);
485 futureList.add(replyFuture);
487 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
488 actorContext.getActorSystem().dispatcher());
490 // Transform the combined Future into a Future that returns the cohort actor path from
491 // the ReadyTransactionReply. That's the end result of the ready operation.
493 return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
495 public ActorPath apply(Iterable<Object> notUsed) {
497 LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
500 // At this point all the Futures succeeded and we need to extract the cohort
501 // actor path from the ReadyTransactionReply. For the recorded operations, they
502 // don't return any data so we're only interested that they completed
503 // successfully. We could be paranoid and verify the correct reply types but
504 // that really should never happen so it's not worth the overhead of
505 // de-serializing each reply.
507 // Note the Future get call here won't block as it's complete.
508 Object serializedReadyReply = replyFuture.value().get().get();
509 if(serializedReadyReply.getClass().equals(
510 ReadyTransactionReply.SERIALIZABLE_CLASS)) {
511 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
512 actorContext.getActorSystem(), serializedReadyReply);
514 String resolvedCohortPath = getResolvedCohortPath(
515 reply.getCohortPath().toString());
517 LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
518 identifier, resolvedCohortPath);
520 return actorContext.actorFor(resolvedCohortPath);
522 // Throwing an exception here will fail the Future.
524 throw new IllegalArgumentException(String.format("Invalid reply type {}",
525 serializedReadyReply.getClass()));
528 }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
532 public void deleteData(YangInstanceIdentifier path) {
533 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
534 recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
535 new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
539 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
540 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
541 recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
542 new MergeData(path, data, schemaContext).toSerializable(),
543 ActorContext.ASK_DURATION));
547 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
548 LOG.debug("Tx {} writeData called path = {}", identifier, path);
549 recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
550 new WriteData(path, data, schemaContext).toSerializable(),
551 ActorContext.ASK_DURATION));
555 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
556 final YangInstanceIdentifier path) {
558 LOG.debug("Tx {} readData called path = {}", identifier, path);
560 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
562 // If there were any previous recorded put/merge/delete operation reply Futures then we
563 // must wait for them to successfully complete. This is necessary to honor the read
564 // uncommitted semantics of the public API contract. If any one fails then fail the read.
566 if(recordedOperationFutures.isEmpty()) {
567 finishReadData(path, returnFuture);
569 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
570 identifier, recordedOperationFutures.size());
572 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
573 // Futures#sequence accesses the passed List on a different thread, as
574 // recordedOperationFutures is not synchronized.
576 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
577 Lists.newArrayList(recordedOperationFutures),
578 actorContext.getActorSystem().dispatcher());
579 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
581 public void onComplete(Throwable failure, Iterable<Object> notUsed)
583 if(failure != null) {
584 LOG.debug("Tx {} readData: a recorded operation failed: {}",
585 identifier, failure);
587 returnFuture.setException(new ReadFailedException(
588 "The read could not be performed because a previous put, merge,"
589 + "or delete operation failed", failure));
591 finishReadData(path, returnFuture);
596 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
599 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
602 private void finishReadData(final YangInstanceIdentifier path,
603 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
605 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
607 OnComplete<Object> onComplete = new OnComplete<Object>() {
609 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
610 if(failure != null) {
611 LOG.debug("Tx {} read operation failed: {}", identifier, failure);
613 returnFuture.setException(new ReadFailedException(
614 "Error reading data for path " + path, failure));
617 LOG.debug("Tx {} read operation succeeded", identifier, failure);
619 if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
620 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
622 if (reply.getNormalizedNode() == null) {
623 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
625 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
626 reply.getNormalizedNode()));
629 returnFuture.setException(new ReadFailedException(
630 "Invalid response reading data for path " + path));
636 Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
637 new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
638 readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
642 public CheckedFuture<Boolean, ReadFailedException> dataExists(
643 final YangInstanceIdentifier path) {
645 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
647 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
649 // If there were any previous recorded put/merge/delete operation reply Futures then we
650 // must wait for them to successfully complete. This is necessary to honor the read
651 // uncommitted semantics of the public API contract. If any one fails then fail this
654 if(recordedOperationFutures.isEmpty()) {
655 finishDataExists(path, returnFuture);
657 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
658 identifier, recordedOperationFutures.size());
660 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
661 // Futures#sequence accesses the passed List on a different thread, as
662 // recordedOperationFutures is not synchronized.
664 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
665 Lists.newArrayList(recordedOperationFutures),
666 actorContext.getActorSystem().dispatcher());
667 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
669 public void onComplete(Throwable failure, Iterable<Object> notUsed)
671 if(failure != null) {
672 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
673 identifier, failure);
675 returnFuture.setException(new ReadFailedException(
676 "The data exists could not be performed because a previous "
677 + "put, merge, or delete operation failed", failure));
679 finishDataExists(path, returnFuture);
684 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
687 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
690 private void finishDataExists(final YangInstanceIdentifier path,
691 final SettableFuture<Boolean> returnFuture) {
693 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
695 OnComplete<Object> onComplete = new OnComplete<Object>() {
697 public void onComplete(Throwable failure, Object response) throws Throwable {
698 if(failure != null) {
699 LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
701 returnFuture.setException(new ReadFailedException(
702 "Error checking data exists for path " + path, failure));
704 LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
706 if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
707 returnFuture.set(Boolean.valueOf(DataExistsReply.
708 fromSerializable(response).exists()));
710 returnFuture.setException(new ReadFailedException(
711 "Invalid response checking exists for path " + path));
717 Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
718 new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
719 future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
723 private static class NoOpTransactionContext extends AbstractTransactionContext {
725 private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
727 private final Exception failure;
729 public NoOpTransactionContext(String shardName, Exception failure,
730 TransactionIdentifier identifier){
731 super(shardName, identifier);
732 this.failure = failure;
736 public void closeTransaction() {
737 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
741 public Future<ActorPath> readyTransaction() {
742 LOG.debug("Tx {} readyTransaction called", identifier);
743 return akka.dispatch.Futures.failed(failure);
747 public void deleteData(YangInstanceIdentifier path) {
748 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
752 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
753 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
757 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
758 LOG.debug("Tx {} writeData called path = {}", identifier, path);
762 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
763 YangInstanceIdentifier path) {
764 LOG.debug("Tx {} readData called path = {}", identifier, path);
765 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
766 "Error reading data for path " + path, failure));
770 public CheckedFuture<Boolean, ReadFailedException> dataExists(
771 YangInstanceIdentifier path) {
772 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
773 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
774 "Error checking exists for path " + path, failure));