2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.FinalizablePhantomReference;
16 import com.google.common.base.FinalizableReferenceQueue;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.CheckedFuture;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.SettableFuture;
24 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
50 import scala.Function1;
51 import scala.concurrent.Future;
52 import scala.runtime.AbstractFunction1;
54 import java.util.HashMap;
55 import java.util.List;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.atomic.AtomicBoolean;
59 import java.util.concurrent.atomic.AtomicLong;
62 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
64 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
65 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
66 * be created on each of those shards by the TransactionProxy
69 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
70 * shards will be executed.
73 public class TransactionProxy implements DOMStoreReadWriteTransaction {
75 private final TransactionChainProxy transactionChainProxy;
79 public enum TransactionType {
85 static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
86 Throwable, Throwable>() {
88 public Throwable apply(Throwable failure) {
93 private static final AtomicLong counter = new AtomicLong();
95 private static final Logger
96 LOG = LoggerFactory.getLogger(TransactionProxy.class);
100 * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
101 * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
102 * trickery to clean up its internal thread when the bundle is unloaded.
104 private static final FinalizableReferenceQueue phantomReferenceQueue =
105 new FinalizableReferenceQueue();
108 * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
109 * necessary because PhantomReferences need a hard reference so they're not garbage collected.
110 * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
111 * and thus becomes eligible for garbage collection.
113 private static final Map<TransactionProxyCleanupPhantomReference,
114 TransactionProxyCleanupPhantomReference> phantomReferenceCache =
115 new ConcurrentHashMap<>();
118 * A PhantomReference that closes remote transactions for a TransactionProxy when it's
119 * garbage collected. This is used for read-only transactions as they're not explicitly closed
120 * by clients. So the only way to detect that a transaction is no longer in use and it's safe
121 * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
122 * but TransactionProxy instances should generally be short-lived enough to avoid being moved
123 * to the old generation space and thus should be cleaned up in a timely manner as the GC
124 * runs on the young generation (eden, swap1...) space much more frequently.
126 private static class TransactionProxyCleanupPhantomReference
127 extends FinalizablePhantomReference<TransactionProxy> {
129 private final List<ActorSelection> remoteTransactionActors;
130 private final AtomicBoolean remoteTransactionActorsMB;
131 private final ActorContext actorContext;
132 private final TransactionIdentifier identifier;
134 protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
135 super(referent, phantomReferenceQueue);
137 // Note we need to cache the relevant fields from the TransactionProxy as we can't
138 // have a hard reference to the TransactionProxy instance itself.
140 remoteTransactionActors = referent.remoteTransactionActors;
141 remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
142 actorContext = referent.actorContext;
143 identifier = referent.identifier;
147 public void finalizeReferent() {
148 LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
149 remoteTransactionActors.size(), identifier);
151 phantomReferenceCache.remove(this);
153 // Access the memory barrier volatile to ensure all previous updates to the
154 // remoteTransactionActors list are visible to this thread.
156 if(remoteTransactionActorsMB.get()) {
157 for(ActorSelection actor : remoteTransactionActors) {
158 LOG.trace("Sending CloseTransaction to {}", actor);
159 actorContext.sendOperationAsync(actor,
160 new CloseTransaction().toSerializable());
167 * Stores the remote Tx actors for each requested data store path to be used by the
168 * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
169 * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
170 * remoteTransactionActors list so they will be visible to the thread accessing the
173 private List<ActorSelection> remoteTransactionActors;
174 private AtomicBoolean remoteTransactionActorsMB;
176 private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
178 private final TransactionType transactionType;
179 private final ActorContext actorContext;
180 private final TransactionIdentifier identifier;
181 private final SchemaContext schemaContext;
182 private boolean inReadyState;
184 public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
185 this(actorContext, transactionType, null);
189 List<Future<Object>> getRecordedOperationFutures() {
190 List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
191 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
192 recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
195 return recordedOperationFutures;
198 public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
199 this.actorContext = Preconditions.checkNotNull(actorContext,
200 "actorContext should not be null");
201 this.transactionType = Preconditions.checkNotNull(transactionType,
202 "transactionType should not be null");
203 this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
204 "schemaContext should not be null");
205 this.transactionChainProxy = transactionChainProxy;
207 String memberName = actorContext.getCurrentMemberName();
208 if(memberName == null){
209 memberName = "UNKNOWN-MEMBER";
212 this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
213 counter.getAndIncrement()).build();
215 if(transactionType == TransactionType.READ_ONLY) {
216 // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
217 // to close the remote Tx's when this instance is no longer in use and is garbage
220 remoteTransactionActors = Lists.newArrayList();
221 remoteTransactionActorsMB = new AtomicBoolean();
223 TransactionProxyCleanupPhantomReference cleanup =
224 new TransactionProxyCleanupPhantomReference(this);
225 phantomReferenceCache.put(cleanup, cleanup);
227 if(LOG.isDebugEnabled()) {
228 LOG.debug("Created txn {} of type {}", identifier, transactionType);
233 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
234 final YangInstanceIdentifier path) {
236 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
237 "Read operation on write-only transaction is not allowed");
239 if(LOG.isDebugEnabled()) {
240 LOG.debug("Tx {} read {}", identifier, path);
242 createTransactionIfMissing(actorContext, path);
244 return transactionContext(path).readData(path);
248 public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
250 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
251 "Exists operation on write-only transaction is not allowed");
253 if(LOG.isDebugEnabled()) {
254 LOG.debug("Tx {} exists {}", identifier, path);
256 createTransactionIfMissing(actorContext, path);
258 return transactionContext(path).dataExists(path);
261 private void checkModificationState() {
262 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
263 "Modification operation on read-only transaction is not allowed");
264 Preconditions.checkState(!inReadyState,
265 "Transaction is sealed - further modifications are not allowed");
269 public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
271 checkModificationState();
273 if(LOG.isDebugEnabled()) {
274 LOG.debug("Tx {} write {}", identifier, path);
276 createTransactionIfMissing(actorContext, path);
278 transactionContext(path).writeData(path, data);
282 public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
284 checkModificationState();
286 if(LOG.isDebugEnabled()) {
287 LOG.debug("Tx {} merge {}", identifier, path);
289 createTransactionIfMissing(actorContext, path);
291 transactionContext(path).mergeData(path, data);
295 public void delete(YangInstanceIdentifier path) {
297 checkModificationState();
298 if(LOG.isDebugEnabled()) {
299 LOG.debug("Tx {} delete {}", identifier, path);
301 createTransactionIfMissing(actorContext, path);
303 transactionContext(path).deleteData(path);
307 public DOMStoreThreePhaseCommitCohort ready() {
309 checkModificationState();
313 if(LOG.isDebugEnabled()) {
314 LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
315 remoteTransactionPaths.size());
317 List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
319 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
321 if(LOG.isDebugEnabled()) {
322 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
323 transactionContext.getShardName());
325 cohortFutures.add(transactionContext.readyTransaction());
328 if(transactionChainProxy != null){
329 transactionChainProxy.onTransactionReady(cohortFutures);
332 return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
333 identifier.toString());
337 public Object getIdentifier() {
338 return this.identifier;
342 public void close() {
343 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
344 transactionContext.closeTransaction();
347 remoteTransactionPaths.clear();
349 if(transactionType == TransactionType.READ_ONLY) {
350 remoteTransactionActors.clear();
351 remoteTransactionActorsMB.set(true);
355 private TransactionContext transactionContext(YangInstanceIdentifier path){
356 String shardName = shardNameFromIdentifier(path);
357 return remoteTransactionPaths.get(shardName);
360 private String shardNameFromIdentifier(YangInstanceIdentifier path){
361 return ShardStrategyFactory.getStrategy(path).findShard(path);
364 private void createTransactionIfMissing(ActorContext actorContext,
365 YangInstanceIdentifier path) {
367 if(transactionChainProxy != null){
368 transactionChainProxy.waitTillCurrentTransactionReady();
371 String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
373 TransactionContext transactionContext =
374 remoteTransactionPaths.get(shardName);
376 if (transactionContext != null) {
377 // A transaction already exists with that shard
382 Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
383 if (!primaryShard.isPresent()) {
384 throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
387 Object response = actorContext.executeOperation(primaryShard.get(),
388 new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
389 getTransactionChainId()).toSerializable());
390 if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
391 CreateTransactionReply reply =
392 CreateTransactionReply.fromSerializable(response);
394 String transactionPath = reply.getTransactionPath();
396 if(LOG.isDebugEnabled()) {
397 LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
399 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
401 if (transactionType == TransactionType.READ_ONLY) {
402 // Add the actor to the remoteTransactionActors list for access by the
403 // cleanup PhantonReference.
404 remoteTransactionActors.add(transactionActor);
406 // Write to the memory barrier volatile to publish the above update to the
407 // remoteTransactionActors list for thread visibility.
408 remoteTransactionActorsMB.set(true);
411 // TxActor is always created where the leader of the shard is.
412 // Check if TxActor is created in the same node
413 boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
415 transactionContext = new TransactionContextImpl(shardName, transactionPath,
416 transactionActor, identifier, actorContext, schemaContext, isTxActorLocal);
418 remoteTransactionPaths.put(shardName, transactionContext);
420 throw new IllegalArgumentException(String.format(
421 "Invalid reply type {} for CreateTransaction", response.getClass()));
423 } catch (Exception e) {
424 if(LOG.isDebugEnabled()) {
425 LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
427 remoteTransactionPaths
428 .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
432 public String getTransactionChainId() {
433 if(transactionChainProxy == null){
436 return transactionChainProxy.getTransactionChainId();
440 private interface TransactionContext {
441 String getShardName();
443 void closeTransaction();
445 Future<ActorSelection> readyTransaction();
447 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
449 void deleteData(YangInstanceIdentifier path);
451 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
453 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
454 final YangInstanceIdentifier path);
456 CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
458 List<Future<Object>> getRecordedOperationFutures();
461 private static abstract class AbstractTransactionContext implements TransactionContext {
463 protected final TransactionIdentifier identifier;
464 protected final String shardName;
465 protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
467 AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
468 this.shardName = shardName;
469 this.identifier = identifier;
473 public String getShardName() {
478 public List<Future<Object>> getRecordedOperationFutures() {
479 return recordedOperationFutures;
483 private static class TransactionContextImpl extends AbstractTransactionContext {
484 private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
486 private final ActorContext actorContext;
487 private final SchemaContext schemaContext;
488 private final String actorPath;
489 private final ActorSelection actor;
490 private final boolean isTxActorLocal;
492 private TransactionContextImpl(String shardName, String actorPath,
493 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
494 SchemaContext schemaContext, boolean isTxActorLocal) {
495 super(shardName, identifier);
496 this.actorPath = actorPath;
498 this.actorContext = actorContext;
499 this.schemaContext = schemaContext;
500 this.isTxActorLocal = isTxActorLocal;
503 private ActorSelection getActor() {
508 public void closeTransaction() {
509 if(LOG.isDebugEnabled()) {
510 LOG.debug("Tx {} closeTransaction called", identifier);
512 actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
516 public Future<ActorSelection> readyTransaction() {
517 if(LOG.isDebugEnabled()) {
518 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
519 identifier, recordedOperationFutures.size());
521 // Send the ReadyTransaction message to the Tx actor.
523 ReadyTransaction readyTransaction = new ReadyTransaction();
524 final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
525 isTxActorLocal ? readyTransaction : readyTransaction.toSerializable());
527 // Combine all the previously recorded put/merge/delete operation reply Futures and the
528 // ReadyTransactionReply Future into one Future. If any one fails then the combined
529 // Future will fail. We need all prior operations and the ready operation to succeed
530 // in order to attempt commit.
532 List<Future<Object>> futureList =
533 Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
534 futureList.addAll(recordedOperationFutures);
535 futureList.add(replyFuture);
537 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
538 actorContext.getActorSystem().dispatcher());
540 // Transform the combined Future into a Future that returns the cohort actor path from
541 // the ReadyTransactionReply. That's the end result of the ready operation.
543 return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorSelection>() {
545 public ActorSelection apply(Iterable<Object> notUsed) {
546 if(LOG.isDebugEnabled()) {
547 LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
550 // At this point all the Futures succeeded and we need to extract the cohort
551 // actor path from the ReadyTransactionReply. For the recorded operations, they
552 // don't return any data so we're only interested that they completed
553 // successfully. We could be paranoid and verify the correct reply types but
554 // that really should never happen so it's not worth the overhead of
555 // de-serializing each reply.
557 // Note the Future get call here won't block as it's complete.
558 Object serializedReadyReply = replyFuture.value().get().get();
559 if (serializedReadyReply instanceof ReadyTransactionReply) {
560 return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
562 } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
563 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
564 return actorContext.actorSelection(reply.getCohortPath());
567 // Throwing an exception here will fail the Future.
568 throw new IllegalArgumentException(String.format("Invalid reply type {}",
569 serializedReadyReply.getClass()));
572 }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
576 public void deleteData(YangInstanceIdentifier path) {
577 if(LOG.isDebugEnabled()) {
578 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
581 DeleteData deleteData = new DeleteData(path);
582 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
583 isTxActorLocal ? deleteData : deleteData.toSerializable()));
587 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
588 if(LOG.isDebugEnabled()) {
589 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
592 MergeData mergeData = new MergeData(path, data, schemaContext);
593 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
594 isTxActorLocal ? mergeData : mergeData.toSerializable()));
598 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
599 if(LOG.isDebugEnabled()) {
600 LOG.debug("Tx {} writeData called path = {}", identifier, path);
603 WriteData writeData = new WriteData(path, data, schemaContext);
604 recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
605 isTxActorLocal ? writeData : writeData.toSerializable()));
609 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
610 final YangInstanceIdentifier path) {
612 if(LOG.isDebugEnabled()) {
613 LOG.debug("Tx {} readData called path = {}", identifier, path);
615 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
617 // If there were any previous recorded put/merge/delete operation reply Futures then we
618 // must wait for them to successfully complete. This is necessary to honor the read
619 // uncommitted semantics of the public API contract. If any one fails then fail the read.
621 if(recordedOperationFutures.isEmpty()) {
622 finishReadData(path, returnFuture);
624 if(LOG.isDebugEnabled()) {
625 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
626 identifier, recordedOperationFutures.size());
628 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
629 // Futures#sequence accesses the passed List on a different thread, as
630 // recordedOperationFutures is not synchronized.
632 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
633 Lists.newArrayList(recordedOperationFutures),
634 actorContext.getActorSystem().dispatcher());
636 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
638 public void onComplete(Throwable failure, Iterable<Object> notUsed)
640 if(failure != null) {
641 if(LOG.isDebugEnabled()) {
642 LOG.debug("Tx {} readData: a recorded operation failed: {}",
643 identifier, failure);
645 returnFuture.setException(new ReadFailedException(
646 "The read could not be performed because a previous put, merge,"
647 + "or delete operation failed", failure));
649 finishReadData(path, returnFuture);
654 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
657 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
660 private void finishReadData(final YangInstanceIdentifier path,
661 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
663 if(LOG.isDebugEnabled()) {
664 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
666 OnComplete<Object> onComplete = new OnComplete<Object>() {
668 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
669 if(failure != null) {
670 if(LOG.isDebugEnabled()) {
671 LOG.debug("Tx {} read operation failed: {}", identifier, failure);
673 returnFuture.setException(new ReadFailedException(
674 "Error reading data for path " + path, failure));
677 if(LOG.isDebugEnabled()) {
678 LOG.debug("Tx {} read operation succeeded", identifier, failure);
681 if (readResponse instanceof ReadDataReply) {
682 ReadDataReply reply = (ReadDataReply) readResponse;
683 returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
685 } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
686 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
687 returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
690 returnFuture.setException(new ReadFailedException(
691 "Invalid response reading data for path " + path));
697 ReadData readData = new ReadData(path);
698 Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
699 isTxActorLocal ? readData : readData.toSerializable());
701 readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
705 public CheckedFuture<Boolean, ReadFailedException> dataExists(
706 final YangInstanceIdentifier path) {
708 if(LOG.isDebugEnabled()) {
709 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
711 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
713 // If there were any previous recorded put/merge/delete operation reply Futures then we
714 // must wait for them to successfully complete. This is necessary to honor the read
715 // uncommitted semantics of the public API contract. If any one fails then fail this
718 if(recordedOperationFutures.isEmpty()) {
719 finishDataExists(path, returnFuture);
721 if(LOG.isDebugEnabled()) {
722 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
723 identifier, recordedOperationFutures.size());
725 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
726 // Futures#sequence accesses the passed List on a different thread, as
727 // recordedOperationFutures is not synchronized.
729 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
730 Lists.newArrayList(recordedOperationFutures),
731 actorContext.getActorSystem().dispatcher());
732 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
734 public void onComplete(Throwable failure, Iterable<Object> notUsed)
736 if(failure != null) {
737 if(LOG.isDebugEnabled()) {
738 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
739 identifier, failure);
741 returnFuture.setException(new ReadFailedException(
742 "The data exists could not be performed because a previous "
743 + "put, merge, or delete operation failed", failure));
745 finishDataExists(path, returnFuture);
750 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
753 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
756 private void finishDataExists(final YangInstanceIdentifier path,
757 final SettableFuture<Boolean> returnFuture) {
759 if(LOG.isDebugEnabled()) {
760 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
762 OnComplete<Object> onComplete = new OnComplete<Object>() {
764 public void onComplete(Throwable failure, Object response) throws Throwable {
765 if(failure != null) {
766 if(LOG.isDebugEnabled()) {
767 LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
769 returnFuture.setException(new ReadFailedException(
770 "Error checking data exists for path " + path, failure));
772 if(LOG.isDebugEnabled()) {
773 LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
776 if (response instanceof DataExistsReply) {
777 returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
779 } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
780 returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
783 returnFuture.setException(new ReadFailedException(
784 "Invalid response checking exists for path " + path));
790 DataExists dataExists = new DataExists(path);
791 Future<Object> future = actorContext.executeOperationAsync(getActor(),
792 isTxActorLocal ? dataExists : dataExists.toSerializable());
794 future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
798 private static class NoOpTransactionContext extends AbstractTransactionContext {
800 private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
802 private final Exception failure;
804 public NoOpTransactionContext(String shardName, Exception failure,
805 TransactionIdentifier identifier){
806 super(shardName, identifier);
807 this.failure = failure;
811 public void closeTransaction() {
812 if(LOG.isDebugEnabled()) {
813 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
818 public Future<ActorSelection> readyTransaction() {
819 if(LOG.isDebugEnabled()) {
820 LOG.debug("Tx {} readyTransaction called", identifier);
822 return akka.dispatch.Futures.failed(failure);
826 public void deleteData(YangInstanceIdentifier path) {
827 if(LOG.isDebugEnabled()) {
828 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
833 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
834 if(LOG.isDebugEnabled()) {
835 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
840 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
841 if(LOG.isDebugEnabled()) {
842 LOG.debug("Tx {} writeData called path = {}", identifier, path);
847 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
848 YangInstanceIdentifier path) {
849 if(LOG.isDebugEnabled()) {
850 LOG.debug("Tx {} readData called path = {}", identifier, path);
852 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
853 "Error reading data for path " + path, failure));
857 public CheckedFuture<Boolean, ReadFailedException> dataExists(
858 YangInstanceIdentifier path) {
859 if(LOG.isDebugEnabled()) {
860 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
862 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
863 "Error checking exists for path " + path, failure));