Bug 1637: Change Rpc actor calls to async
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.OnComplete;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.FinalizablePhantomReference;
17 import com.google.common.base.FinalizableReferenceQueue;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import scala.Function1;
51 import scala.concurrent.Future;
52 import scala.runtime.AbstractFunction1;
53
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.atomic.AtomicBoolean;
59 import java.util.concurrent.atomic.AtomicLong;
60
61 /**
62  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
63  * <p>
64  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
65  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
66  * be created on each of those shards by the TransactionProxy
67  *</p>
68  * <p>
69  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
70  * shards will be executed.
71  * </p>
72  */
73 public class TransactionProxy implements DOMStoreReadWriteTransaction {
    /**
     * The kind of access a TransactionProxy is opened with.
     * <p>
     * NOTE: the constant's {@code ordinal()} is what gets passed to the CreateTransaction
     * message when a remote transaction is created, so reordering the constants would change
     * the value that is sent - presumably the receiving side relies on it; verify before
     * touching the declaration order.
     * </p>
     */
    public enum TransactionType {
        // read/exists allowed; write, merge, delete and ready are rejected
        READ_ONLY,
        // write, merge and delete allowed; read and exists are rejected
        WRITE_ONLY,
        // both reads and modifications allowed
        READ_WRITE
    }
79
80     static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
81                                                                           Throwable, Throwable>() {
82         @Override
83         public Throwable apply(Throwable failure) {
84             return failure;
85         }
86     };
87
88     private static final AtomicLong counter = new AtomicLong();
89
90     private static final Logger
91         LOG = LoggerFactory.getLogger(TransactionProxy.class);
92
93
    /**
     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
     * trickery to clean up its internal thread when the bundle is unloaded.
     */
    private static final FinalizableReferenceQueue phantomReferenceQueue =
                                                                  new FinalizableReferenceQueue();

    /**
     * This stores the TransactionProxyCleanupPhantomReference instances statically. This is
     * necessary because PhantomReferences need a hard reference so they're not garbage collected
     * themselves. Once finalized, the TransactionProxyCleanupPhantomReference removes itself
     * from this map and thus becomes eligible for garbage collection. Each reference is stored
     * as both key and value, i.e. the map is effectively used as a concurrent set.
     */
    private static final Map<TransactionProxyCleanupPhantomReference,
                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
                                                                        new ConcurrentHashMap<>();
111
112     /**
113      * A PhantomReference that closes remote transactions for a TransactionProxy when it's
114      * garbage collected. This is used for read-only transactions as they're not explicitly closed
115      * by clients. So the only way to detect that a transaction is no longer in use and it's safe
116      * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
117      * but TransactionProxy instances should generally be short-lived enough to avoid being moved
118      * to the old generation space and thus should be cleaned up in a timely manner as the GC
119      * runs on the young generation (eden, swap1...) space much more frequently.
120      */
121     private static class TransactionProxyCleanupPhantomReference
122                                            extends FinalizablePhantomReference<TransactionProxy> {
123
124         private final List<ActorSelection> remoteTransactionActors;
125         private final AtomicBoolean remoteTransactionActorsMB;
126         private final ActorContext actorContext;
127         private final TransactionIdentifier identifier;
128
129         protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
130             super(referent, phantomReferenceQueue);
131
132             // Note we need to cache the relevant fields from the TransactionProxy as we can't
133             // have a hard reference to the TransactionProxy instance itself.
134
135             remoteTransactionActors = referent.remoteTransactionActors;
136             remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
137             actorContext = referent.actorContext;
138             identifier = referent.identifier;
139         }
140
141         @Override
142         public void finalizeReferent() {
143             LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
144                     remoteTransactionActors.size(), identifier);
145
146             phantomReferenceCache.remove(this);
147
148             // Access the memory barrier volatile to ensure all previous updates to the
149             // remoteTransactionActors list are visible to this thread.
150
151             if(remoteTransactionActorsMB.get()) {
152                 for(ActorSelection actor : remoteTransactionActors) {
153                     LOG.trace("Sending CloseTransaction to {}", actor);
154                     actorContext.sendRemoteOperationAsync(actor,
155                             new CloseTransaction().toSerializable());
156                 }
157             }
158         }
159     }
160
    /**
     * Stores the remote Tx actors for each requested data store path to be used by the
     * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
     * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
     * remoteTransactionActors list so they will be visible to the thread accessing the
     * PhantomReference. Both fields are assigned only by the constructor for READ_ONLY
     * transactions and are left null for the other transaction types.
     */
    private List<ActorSelection> remoteTransactionActors;
    private AtomicBoolean remoteTransactionActorsMB;

    // The per-shard transaction contexts created so far, keyed by shard name.
    private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();

    private final TransactionType transactionType;
    private final ActorContext actorContext;
    private final TransactionIdentifier identifier;
    private final SchemaContext schemaContext;
    // Set by ready(); once true, checkModificationState() rejects further modifications.
    private boolean inReadyState;
178
179     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
180         this.actorContext = Preconditions.checkNotNull(actorContext,
181                 "actorContext should not be null");
182         this.transactionType = Preconditions.checkNotNull(transactionType,
183                 "transactionType should not be null");
184         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
185                 "schemaContext should not be null");
186
187         String memberName = actorContext.getCurrentMemberName();
188         if(memberName == null){
189             memberName = "UNKNOWN-MEMBER";
190         }
191
192         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
193                 counter.getAndIncrement()).build();
194
195         if(transactionType == TransactionType.READ_ONLY) {
196             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
197             // to close the remote Tx's when this instance is no longer in use and is garbage
198             // collected.
199
200             remoteTransactionActors = Lists.newArrayList();
201             remoteTransactionActorsMB = new AtomicBoolean();
202
203             TransactionProxyCleanupPhantomReference cleanup =
204                                               new TransactionProxyCleanupPhantomReference(this);
205             phantomReferenceCache.put(cleanup, cleanup);
206         }
207
208         LOG.debug("Created txn {} of type {}", identifier, transactionType);
209     }
210
211     @VisibleForTesting
212     List<Future<Object>> getRecordedOperationFutures() {
213         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
214         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
215             recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
216         }
217
218         return recordedOperationFutures;
219     }
220
221     @Override
222     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
223             final YangInstanceIdentifier path) {
224
225         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
226                 "Read operation on write-only transaction is not allowed");
227
228         LOG.debug("Tx {} read {}", identifier, path);
229
230         createTransactionIfMissing(actorContext, path);
231
232         return transactionContext(path).readData(path);
233     }
234
235     @Override
236     public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
237
238         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
239                 "Exists operation on write-only transaction is not allowed");
240
241         LOG.debug("Tx {} exists {}", identifier, path);
242
243         createTransactionIfMissing(actorContext, path);
244
245         return transactionContext(path).dataExists(path);
246     }
247
248     private void checkModificationState() {
249         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
250                 "Modification operation on read-only transaction is not allowed");
251         Preconditions.checkState(!inReadyState,
252                 "Transaction is sealed - further modifications are not allowed");
253     }
254
255     @Override
256     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
257
258         checkModificationState();
259
260         LOG.debug("Tx {} write {}", identifier, path);
261
262         createTransactionIfMissing(actorContext, path);
263
264         transactionContext(path).writeData(path, data);
265     }
266
267     @Override
268     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
269
270         checkModificationState();
271
272         LOG.debug("Tx {} merge {}", identifier, path);
273
274         createTransactionIfMissing(actorContext, path);
275
276         transactionContext(path).mergeData(path, data);
277     }
278
279     @Override
280     public void delete(YangInstanceIdentifier path) {
281
282         checkModificationState();
283
284         LOG.debug("Tx {} delete {}", identifier, path);
285
286         createTransactionIfMissing(actorContext, path);
287
288         transactionContext(path).deleteData(path);
289     }
290
291     @Override
292     public DOMStoreThreePhaseCommitCohort ready() {
293
294         checkModificationState();
295
296         inReadyState = true;
297
298         LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
299                 remoteTransactionPaths.size());
300
301         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
302
303         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
304
305             LOG.debug("Tx {} Readying transaction for shard {}", identifier,
306                     transactionContext.getShardName());
307
308             cohortPathFutures.add(transactionContext.readyTransaction());
309         }
310
311         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
312                 identifier.toString());
313     }
314
315     @Override
316     public Object getIdentifier() {
317         return this.identifier;
318     }
319
320     @Override
321     public void close() {
322         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
323             transactionContext.closeTransaction();
324         }
325
326         remoteTransactionPaths.clear();
327
328         if(transactionType == TransactionType.READ_ONLY) {
329             remoteTransactionActors.clear();
330             remoteTransactionActorsMB.set(true);
331         }
332     }
333
334     private TransactionContext transactionContext(YangInstanceIdentifier path){
335         String shardName = shardNameFromIdentifier(path);
336         return remoteTransactionPaths.get(shardName);
337     }
338
339     private String shardNameFromIdentifier(YangInstanceIdentifier path){
340         return ShardStrategyFactory.getStrategy(path).findShard(path);
341     }
342
343     private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
344         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
345
346         TransactionContext transactionContext =
347             remoteTransactionPaths.get(shardName);
348
349         if(transactionContext != null){
350             // A transaction already exists with that shard
351             return;
352         }
353
354         try {
355             Object response = actorContext.executeShardOperation(shardName,
356                 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
357                 ActorContext.ASK_DURATION);
358             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
359                 CreateTransactionReply reply =
360                     CreateTransactionReply.fromSerializable(response);
361
362                 String transactionPath = reply.getTransactionPath();
363
364                 LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
365
366                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
367
368                 if(transactionType == TransactionType.READ_ONLY) {
369                     // Add the actor to the remoteTransactionActors list for access by the
370                     // cleanup PhantonReference.
371                     remoteTransactionActors.add(transactionActor);
372
373                     // Write to the memory barrier volatile to publish the above update to the
374                     // remoteTransactionActors list for thread visibility.
375                     remoteTransactionActorsMB.set(true);
376                 }
377
378                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
379                         transactionActor, identifier, actorContext, schemaContext);
380
381                 remoteTransactionPaths.put(shardName, transactionContext);
382             } else {
383                 throw new IllegalArgumentException(String.format(
384                         "Invalid reply type {} for CreateTransaction", response.getClass()));
385             }
386         } catch(Exception e){
387             LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
388             remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
389         }
390     }
391
    /**
     * The view of this transaction on a single shard. TransactionProxy keeps one instance per
     * shard name and delegates each operation to the instance of the shard owning the path.
     */
    private interface TransactionContext {
        /** Returns the name of the shard this context belongs to. */
        String getShardName();

        /** Closes the transaction on the shard. */
        void closeTransaction();

        /**
         * Readies the shard transaction for commit.
         *
         * @return a Future for the commit cohort's ActorPath, as collected by ready()
         */
        Future<ActorPath> readyTransaction();

        /** Writes the data at the given path. */
        void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);

        /** Deletes the data at the given path. */
        void deleteData(YangInstanceIdentifier path);

        /** Merges the data at the given path. */
        void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);

        /**
         * Reads the data at the given path.
         *
         * @return a Future for the optional data, failing with ReadFailedException
         */
        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                final YangInstanceIdentifier path);

        /**
         * Checks whether data exists at the given path.
         *
         * @return a Future for the result, failing with ReadFailedException
         */
        CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);

        /** Returns the reply Futures of the operations recorded on this context so far. */
        List<Future<Object>> getRecordedOperationFutures();
    }
412
413     private static abstract class AbstractTransactionContext implements TransactionContext {
414
415         protected final TransactionIdentifier identifier;
416         protected final String shardName;
417         protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
418
419         AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
420             this.shardName = shardName;
421             this.identifier = identifier;
422         }
423
424         @Override
425         public String getShardName() {
426             return shardName;
427         }
428
429         @Override
430         public List<Future<Object>> getRecordedOperationFutures() {
431             return recordedOperationFutures;
432         }
433     }
434
435     private static class TransactionContextImpl extends AbstractTransactionContext {
436         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
437
438         private final ActorContext actorContext;
439         private final SchemaContext schemaContext;
440         private final String actorPath;
441         private final ActorSelection actor;
442
443         private TransactionContextImpl(String shardName, String actorPath,
444                 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
445                 SchemaContext schemaContext) {
446             super(shardName, identifier);
447             this.actorPath = actorPath;
448             this.actor = actor;
449             this.actorContext = actorContext;
450             this.schemaContext = schemaContext;
451         }
452
453         private ActorSelection getActor() {
454             return actor;
455         }
456
457         private String getResolvedCohortPath(String cohortPath) {
458             return actorContext.resolvePath(actorPath, cohortPath);
459         }
460
461         @Override
462         public void closeTransaction() {
463             LOG.debug("Tx {} closeTransaction called", identifier);
464             actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
465         }
466
467         @Override
468         public Future<ActorPath> readyTransaction() {
469             LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
470                     identifier, recordedOperationFutures.size());
471
472             // Send the ReadyTransaction message to the Tx actor.
473
474             final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
475                     new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
476
477             // Combine all the previously recorded put/merge/delete operation reply Futures and the
478             // ReadyTransactionReply Future into one Future. If any one fails then the combined
479             // Future will fail. We need all prior operations and the ready operation to succeed
480             // in order to attempt commit.
481
482             List<Future<Object>> futureList =
483                     Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
484             futureList.addAll(recordedOperationFutures);
485             futureList.add(replyFuture);
486
487             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
488                     actorContext.getActorSystem().dispatcher());
489
490             // Transform the combined Future into a Future that returns the cohort actor path from
491             // the ReadyTransactionReply. That's the end result of the ready operation.
492
493             return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
494                 @Override
495                 public ActorPath apply(Iterable<Object> notUsed) {
496
497                     LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
498                             identifier);
499
500                     // At this point all the Futures succeeded and we need to extract the cohort
501                     // actor path from the ReadyTransactionReply. For the recorded operations, they
502                     // don't return any data so we're only interested that they completed
503                     // successfully. We could be paranoid and verify the correct reply types but
504                     // that really should never happen so it's not worth the overhead of
505                     // de-serializing each reply.
506
507                     // Note the Future get call here won't block as it's complete.
508                     Object serializedReadyReply = replyFuture.value().get().get();
509                     if(serializedReadyReply.getClass().equals(
510                                                      ReadyTransactionReply.SERIALIZABLE_CLASS)) {
511                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
512                                 actorContext.getActorSystem(), serializedReadyReply);
513
514                         String resolvedCohortPath = getResolvedCohortPath(
515                                 reply.getCohortPath().toString());
516
517                         LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
518                                 identifier, resolvedCohortPath);
519
520                         return actorContext.actorFor(resolvedCohortPath);
521                     } else {
522                         // Throwing an exception here will fail the Future.
523
524                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
525                                 serializedReadyReply.getClass()));
526                     }
527                 }
528             }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
529         }
530
531         @Override
532         public void deleteData(YangInstanceIdentifier path) {
533             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
534             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
535                     new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
536         }
537
538         @Override
539         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
540             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
541             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
542                     new MergeData(path, data, schemaContext).toSerializable(),
543                     ActorContext.ASK_DURATION));
544         }
545
546         @Override
547         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
548             LOG.debug("Tx {} writeData called path = {}", identifier, path);
549             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
550                     new WriteData(path, data, schemaContext).toSerializable(),
551                     ActorContext.ASK_DURATION));
552         }
553
554         @Override
555         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
556                 final YangInstanceIdentifier path) {
557
558             LOG.debug("Tx {} readData called path = {}", identifier, path);
559
560             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
561
562             // If there were any previous recorded put/merge/delete operation reply Futures then we
563             // must wait for them to successfully complete. This is necessary to honor the read
564             // uncommitted semantics of the public API contract. If any one fails then fail the read.
565
566             if(recordedOperationFutures.isEmpty()) {
567                 finishReadData(path, returnFuture);
568             } else {
569                 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
570                         identifier, recordedOperationFutures.size());
571
572                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
573                 // Futures#sequence accesses the passed List on a different thread, as
574                 // recordedOperationFutures is not synchronized.
575
576                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
577                         Lists.newArrayList(recordedOperationFutures),
578                         actorContext.getActorSystem().dispatcher());
579                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
580                     @Override
581                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
582                             throws Throwable {
583                         if(failure != null) {
584                             LOG.debug("Tx {} readData: a recorded operation failed: {}",
585                                     identifier, failure);
586
587                             returnFuture.setException(new ReadFailedException(
588                                     "The read could not be performed because a previous put, merge,"
589                                     + "or delete operation failed", failure));
590                         } else {
591                             finishReadData(path, returnFuture);
592                         }
593                     }
594                 };
595
596                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
597             }
598
599             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
600         }
601
602         private void finishReadData(final YangInstanceIdentifier path,
603                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
604
605             LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
606
607             OnComplete<Object> onComplete = new OnComplete<Object>() {
608                 @Override
609                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
610                     if(failure != null) {
611                         LOG.debug("Tx {} read operation failed: {}", identifier, failure);
612
613                         returnFuture.setException(new ReadFailedException(
614                                 "Error reading data for path " + path, failure));
615
616                     } else {
617                         LOG.debug("Tx {} read operation succeeded", identifier, failure);
618
619                         if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
620                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
621                                     path, readResponse);
622                             if (reply.getNormalizedNode() == null) {
623                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
624                             } else {
625                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
626                                         reply.getNormalizedNode()));
627                             }
628                         } else {
629                             returnFuture.setException(new ReadFailedException(
630                                     "Invalid response reading data for path " + path));
631                         }
632                     }
633                 }
634             };
635
636             Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
637                     new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
638             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
639         }
640
641         @Override
642         public CheckedFuture<Boolean, ReadFailedException> dataExists(
643                 final YangInstanceIdentifier path) {
644
645             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
646
647             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
648
649             // If there were any previous recorded put/merge/delete operation reply Futures then we
650             // must wait for them to successfully complete. This is necessary to honor the read
651             // uncommitted semantics of the public API contract. If any one fails then fail this
652             // request.
653
654             if(recordedOperationFutures.isEmpty()) {
655                 finishDataExists(path, returnFuture);
656             } else {
657                 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
658                         identifier, recordedOperationFutures.size());
659
660                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
661                 // Futures#sequence accesses the passed List on a different thread, as
662                 // recordedOperationFutures is not synchronized.
663
664                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
665                         Lists.newArrayList(recordedOperationFutures),
666                         actorContext.getActorSystem().dispatcher());
667                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
668                     @Override
669                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
670                             throws Throwable {
671                         if(failure != null) {
672                             LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
673                                     identifier, failure);
674
675                             returnFuture.setException(new ReadFailedException(
676                                     "The data exists could not be performed because a previous "
677                                     + "put, merge, or delete operation failed", failure));
678                         } else {
679                             finishDataExists(path, returnFuture);
680                         }
681                     }
682                 };
683
684                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
685             }
686
687             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
688         }
689
690         private void finishDataExists(final YangInstanceIdentifier path,
691                 final SettableFuture<Boolean> returnFuture) {
692
693             LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
694
695             OnComplete<Object> onComplete = new OnComplete<Object>() {
696                 @Override
697                 public void onComplete(Throwable failure, Object response) throws Throwable {
698                     if(failure != null) {
699                         LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
700
701                         returnFuture.setException(new ReadFailedException(
702                                 "Error checking data exists for path " + path, failure));
703                     } else {
704                         LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
705
706                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
707                             returnFuture.set(Boolean.valueOf(DataExistsReply.
708                                         fromSerializable(response).exists()));
709                         } else {
710                             returnFuture.setException(new ReadFailedException(
711                                     "Invalid response checking exists for path " + path));
712                         }
713                     }
714                 }
715             };
716
717             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
718                     new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
719             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
720         }
721     }
722
723     private static class NoOpTransactionContext extends AbstractTransactionContext {
724
725         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
726
727         private final Exception failure;
728
729         public NoOpTransactionContext(String shardName, Exception failure,
730                 TransactionIdentifier identifier){
731             super(shardName, identifier);
732             this.failure = failure;
733         }
734
735         @Override
736         public void closeTransaction() {
737             LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
738         }
739
740         @Override
741         public Future<ActorPath> readyTransaction() {
742             LOG.debug("Tx {} readyTransaction called", identifier);
743             return akka.dispatch.Futures.failed(failure);
744         }
745
746         @Override
747         public void deleteData(YangInstanceIdentifier path) {
748             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
749         }
750
751         @Override
752         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
753             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
754         }
755
756         @Override
757         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
758             LOG.debug("Tx {} writeData called path = {}", identifier, path);
759         }
760
761         @Override
762         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
763             YangInstanceIdentifier path) {
764             LOG.debug("Tx {} readData called path = {}", identifier, path);
765             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
766                     "Error reading data for path " + path, failure));
767         }
768
769         @Override
770         public CheckedFuture<Boolean, ReadFailedException> dataExists(
771             YangInstanceIdentifier path) {
772             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
773             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
774                     "Error checking exists for path " + path, failure));
775         }
776     }
777 }