Merge "Bug-1607: Clustering : Remove actorFor (deprecated) call from TransactionProx...
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.FinalizablePhantomReference;
16 import com.google.common.base.FinalizableReferenceQueue;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.CheckedFuture;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.SettableFuture;
23
24 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import scala.Function1;
51 import scala.concurrent.Future;
52 import scala.runtime.AbstractFunction1;
53
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.atomic.AtomicBoolean;
59 import java.util.concurrent.atomic.AtomicLong;
60
61 /**
62  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
63  * <p>
64  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
65  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
66  * be created on each of those shards by the TransactionProxy
67  *</p>
68  * <p>
69  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
70  * shards will be executed.
71  * </p>
72  */
73 public class TransactionProxy implements DOMStoreReadWriteTransaction {
74
75     private final TransactionChainProxy transactionChainProxy;
76
77
78
79     public enum TransactionType {
80         READ_ONLY,
81         WRITE_ONLY,
82         READ_WRITE
83     }
84
85     static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
86                                                                           Throwable, Throwable>() {
87         @Override
88         public Throwable apply(Throwable failure) {
89             return failure;
90         }
91     };
92
93     private static final AtomicLong counter = new AtomicLong();
94
95     private static final Logger
96         LOG = LoggerFactory.getLogger(TransactionProxy.class);
97
98
99     /**
100      * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
101      * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
102      * trickery to clean up its internal thread when the bundle is unloaded.
103      */
104     private static final FinalizableReferenceQueue phantomReferenceQueue =
105                                                                   new FinalizableReferenceQueue();
106
107     /**
108      * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
109      * necessary because PhantomReferences need a hard reference so they're not garbage collected.
110      * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
111      * and thus becomes eligible for garbage collection.
112      */
113     private static final Map<TransactionProxyCleanupPhantomReference,
114                              TransactionProxyCleanupPhantomReference> phantomReferenceCache =
115                                                                         new ConcurrentHashMap<>();
116
117     /**
118      * A PhantomReference that closes remote transactions for a TransactionProxy when it's
119      * garbage collected. This is used for read-only transactions as they're not explicitly closed
120      * by clients. So the only way to detect that a transaction is no longer in use and it's safe
121      * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
122      * but TransactionProxy instances should generally be short-lived enough to avoid being moved
123      * to the old generation space and thus should be cleaned up in a timely manner as the GC
124      * runs on the young generation (eden, swap1...) space much more frequently.
125      */
126     private static class TransactionProxyCleanupPhantomReference
127                                            extends FinalizablePhantomReference<TransactionProxy> {
128
129         private final List<ActorSelection> remoteTransactionActors;
130         private final AtomicBoolean remoteTransactionActorsMB;
131         private final ActorContext actorContext;
132         private final TransactionIdentifier identifier;
133
134         protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
135             super(referent, phantomReferenceQueue);
136
137             // Note we need to cache the relevant fields from the TransactionProxy as we can't
138             // have a hard reference to the TransactionProxy instance itself.
139
140             remoteTransactionActors = referent.remoteTransactionActors;
141             remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
142             actorContext = referent.actorContext;
143             identifier = referent.identifier;
144         }
145
146         @Override
147         public void finalizeReferent() {
148             LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
149                     remoteTransactionActors.size(), identifier);
150
151             phantomReferenceCache.remove(this);
152
153             // Access the memory barrier volatile to ensure all previous updates to the
154             // remoteTransactionActors list are visible to this thread.
155
156             if(remoteTransactionActorsMB.get()) {
157                 for(ActorSelection actor : remoteTransactionActors) {
158                     LOG.trace("Sending CloseTransaction to {}", actor);
159                     actorContext.sendOperationAsync(actor,
160                             new CloseTransaction().toSerializable());
161                 }
162             }
163         }
164     }
165
166     /**
167      * Stores the remote Tx actors for each requested data store path to be used by the
168      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
169      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
170      * remoteTransactionActors list so they will be visible to the thread accessing the
171      * PhantomReference.
172      */
173     private List<ActorSelection> remoteTransactionActors;
174     private AtomicBoolean remoteTransactionActorsMB;
175
176     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
177
178     private final TransactionType transactionType;
179     private final ActorContext actorContext;
180     private final TransactionIdentifier identifier;
181     private final SchemaContext schemaContext;
182     private boolean inReadyState;
183
184     public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
185         this(actorContext, transactionType, null);
186     }
187
188     @VisibleForTesting
189     List<Future<Object>> getRecordedOperationFutures() {
190         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
191         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
192             recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
193         }
194
195         return recordedOperationFutures;
196     }
197
198     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
199         this.actorContext = Preconditions.checkNotNull(actorContext,
200             "actorContext should not be null");
201         this.transactionType = Preconditions.checkNotNull(transactionType,
202             "transactionType should not be null");
203         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
204             "schemaContext should not be null");
205         this.transactionChainProxy = transactionChainProxy;
206
207         String memberName = actorContext.getCurrentMemberName();
208         if(memberName == null){
209             memberName = "UNKNOWN-MEMBER";
210         }
211
212         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
213             counter.getAndIncrement()).build();
214
215         if(transactionType == TransactionType.READ_ONLY) {
216             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
217             // to close the remote Tx's when this instance is no longer in use and is garbage
218             // collected.
219
220             remoteTransactionActors = Lists.newArrayList();
221             remoteTransactionActorsMB = new AtomicBoolean();
222
223             TransactionProxyCleanupPhantomReference cleanup =
224                 new TransactionProxyCleanupPhantomReference(this);
225             phantomReferenceCache.put(cleanup, cleanup);
226         }
227         if(LOG.isDebugEnabled()) {
228             LOG.debug("Created txn {} of type {}", identifier, transactionType);
229         }
230     }
231
232     @Override
233     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
234             final YangInstanceIdentifier path) {
235
236         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
237                 "Read operation on write-only transaction is not allowed");
238
239         if(LOG.isDebugEnabled()) {
240             LOG.debug("Tx {} read {}", identifier, path);
241         }
242         createTransactionIfMissing(actorContext, path);
243
244         return transactionContext(path).readData(path);
245     }
246
247     @Override
248     public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
249
250         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
251                 "Exists operation on write-only transaction is not allowed");
252
253         if(LOG.isDebugEnabled()) {
254             LOG.debug("Tx {} exists {}", identifier, path);
255         }
256         createTransactionIfMissing(actorContext, path);
257
258         return transactionContext(path).dataExists(path);
259     }
260
261     private void checkModificationState() {
262         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
263                 "Modification operation on read-only transaction is not allowed");
264         Preconditions.checkState(!inReadyState,
265                 "Transaction is sealed - further modifications are not allowed");
266     }
267
268     @Override
269     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
270
271         checkModificationState();
272
273         if(LOG.isDebugEnabled()) {
274             LOG.debug("Tx {} write {}", identifier, path);
275         }
276         createTransactionIfMissing(actorContext, path);
277
278         transactionContext(path).writeData(path, data);
279     }
280
281     @Override
282     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
283
284         checkModificationState();
285
286         if(LOG.isDebugEnabled()) {
287             LOG.debug("Tx {} merge {}", identifier, path);
288         }
289         createTransactionIfMissing(actorContext, path);
290
291         transactionContext(path).mergeData(path, data);
292     }
293
294     @Override
295     public void delete(YangInstanceIdentifier path) {
296
297         checkModificationState();
298         if(LOG.isDebugEnabled()) {
299             LOG.debug("Tx {} delete {}", identifier, path);
300         }
301         createTransactionIfMissing(actorContext, path);
302
303         transactionContext(path).deleteData(path);
304     }
305
306     @Override
307     public DOMStoreThreePhaseCommitCohort ready() {
308
309         checkModificationState();
310
311         inReadyState = true;
312
313         if(LOG.isDebugEnabled()) {
314             LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
315                 remoteTransactionPaths.size());
316         }
317         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
318
319         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
320
321             if(LOG.isDebugEnabled()) {
322                 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
323                     transactionContext.getShardName());
324             }
325             cohortFutures.add(transactionContext.readyTransaction());
326         }
327
328         if(transactionChainProxy != null){
329             transactionChainProxy.onTransactionReady(cohortFutures);
330         }
331
332         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
333                 identifier.toString());
334     }
335
336     @Override
337     public Object getIdentifier() {
338         return this.identifier;
339     }
340
341     @Override
342     public void close() {
343         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
344             transactionContext.closeTransaction();
345         }
346
347         remoteTransactionPaths.clear();
348
349         if(transactionType == TransactionType.READ_ONLY) {
350             remoteTransactionActors.clear();
351             remoteTransactionActorsMB.set(true);
352         }
353     }
354
355     private TransactionContext transactionContext(YangInstanceIdentifier path){
356         String shardName = shardNameFromIdentifier(path);
357         return remoteTransactionPaths.get(shardName);
358     }
359
360     private String shardNameFromIdentifier(YangInstanceIdentifier path){
361         return ShardStrategyFactory.getStrategy(path).findShard(path);
362     }
363
364     private void createTransactionIfMissing(ActorContext actorContext,
365         YangInstanceIdentifier path) {
366
367         if(transactionChainProxy != null){
368             transactionChainProxy.waitTillCurrentTransactionReady();
369         }
370
371         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
372
373         TransactionContext transactionContext =
374             remoteTransactionPaths.get(shardName);
375
376         if (transactionContext != null) {
377             // A transaction already exists with that shard
378             return;
379         }
380
381         try {
382             Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
383             if (!primaryShard.isPresent()) {
384                 throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
385             }
386
387             Object response = actorContext.executeOperation(primaryShard.get(),
388                     new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
389                             getTransactionChainId()).toSerializable());
390             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
391                 CreateTransactionReply reply =
392                     CreateTransactionReply.fromSerializable(response);
393
394                 String transactionPath = reply.getTransactionPath();
395
396                 if(LOG.isDebugEnabled()) {
397                     LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
398                 }
399                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
400
401                 if (transactionType == TransactionType.READ_ONLY) {
402                     // Add the actor to the remoteTransactionActors list for access by the
403                     // cleanup PhantonReference.
404                     remoteTransactionActors.add(transactionActor);
405
406                     // Write to the memory barrier volatile to publish the above update to the
407                     // remoteTransactionActors list for thread visibility.
408                     remoteTransactionActorsMB.set(true);
409                 }
410
411                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
412                     transactionActor, identifier, actorContext, schemaContext);
413
414                 remoteTransactionPaths.put(shardName, transactionContext);
415             } else {
416                 throw new IllegalArgumentException(String.format(
417                     "Invalid reply type {} for CreateTransaction", response.getClass()));
418             }
419         } catch (Exception e) {
420             if(LOG.isDebugEnabled()) {
421                 LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
422             }
423             remoteTransactionPaths
424                 .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
425         }
426     }
427
428     public String getTransactionChainId() {
429         if(transactionChainProxy == null){
430             return "";
431         }
432         return transactionChainProxy.getTransactionChainId();
433     }
434
435
436     private interface TransactionContext {
437         String getShardName();
438
439         void closeTransaction();
440
441         Future<ActorSelection> readyTransaction();
442
443         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
444
445         void deleteData(YangInstanceIdentifier path);
446
447         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
448
449         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
450                 final YangInstanceIdentifier path);
451
452         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
453
454         List<Future<Object>> getRecordedOperationFutures();
455     }
456
457     private static abstract class AbstractTransactionContext implements TransactionContext {
458
459         protected final TransactionIdentifier identifier;
460         protected final String shardName;
461         protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
462
463         AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
464             this.shardName = shardName;
465             this.identifier = identifier;
466         }
467
468         @Override
469         public String getShardName() {
470             return shardName;
471         }
472
473         @Override
474         public List<Future<Object>> getRecordedOperationFutures() {
475             return recordedOperationFutures;
476         }
477     }
478
479     private static class TransactionContextImpl extends AbstractTransactionContext {
480         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
481
482         private final ActorContext actorContext;
483         private final SchemaContext schemaContext;
484         private final String actorPath;
485         private final ActorSelection actor;
486
487         private TransactionContextImpl(String shardName, String actorPath,
488                 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
489                 SchemaContext schemaContext) {
490             super(shardName, identifier);
491             this.actorPath = actorPath;
492             this.actor = actor;
493             this.actorContext = actorContext;
494             this.schemaContext = schemaContext;
495         }
496
497         private ActorSelection getActor() {
498             return actor;
499         }
500
501         @Override
502         public void closeTransaction() {
503             if(LOG.isDebugEnabled()) {
504                 LOG.debug("Tx {} closeTransaction called", identifier);
505             }
506             actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
507         }
508
509         @Override
510         public Future<ActorSelection> readyTransaction() {
511             if(LOG.isDebugEnabled()) {
512                 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
513                     identifier, recordedOperationFutures.size());
514             }
515             // Send the ReadyTransaction message to the Tx actor.
516
517             final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
518                     new ReadyTransaction().toSerializable());
519
520             // Combine all the previously recorded put/merge/delete operation reply Futures and the
521             // ReadyTransactionReply Future into one Future. If any one fails then the combined
522             // Future will fail. We need all prior operations and the ready operation to succeed
523             // in order to attempt commit.
524
525             List<Future<Object>> futureList =
526                     Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
527             futureList.addAll(recordedOperationFutures);
528             futureList.add(replyFuture);
529
530             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
531                     actorContext.getActorSystem().dispatcher());
532
533             // Transform the combined Future into a Future that returns the cohort actor path from
534             // the ReadyTransactionReply. That's the end result of the ready operation.
535
536             return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorSelection>() {
537                 @Override
538                 public ActorSelection apply(Iterable<Object> notUsed) {
539                     if(LOG.isDebugEnabled()) {
540                         LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
541                             identifier);
542                     }
543                     // At this point all the Futures succeeded and we need to extract the cohort
544                     // actor path from the ReadyTransactionReply. For the recorded operations, they
545                     // don't return any data so we're only interested that they completed
546                     // successfully. We could be paranoid and verify the correct reply types but
547                     // that really should never happen so it's not worth the overhead of
548                     // de-serializing each reply.
549
550                     // Note the Future get call here won't block as it's complete.
551                     Object serializedReadyReply = replyFuture.value().get().get();
552                     if(serializedReadyReply.getClass().equals(
553                                                      ReadyTransactionReply.SERIALIZABLE_CLASS)) {
554                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
555                                serializedReadyReply);
556
557                         return actorContext.actorSelection(reply.getCohortPath());
558                     } else {
559                         // Throwing an exception here will fail the Future.
560
561                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
562                                 serializedReadyReply.getClass()));
563                     }
564                 }
565             }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
566         }
567
568         @Override
569         public void deleteData(YangInstanceIdentifier path) {
570             if(LOG.isDebugEnabled()) {
571                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
572             }
573             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
574                     new DeleteData(path).toSerializable()));
575         }
576
577         @Override
578         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
579             if(LOG.isDebugEnabled()) {
580                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
581             }
582             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
583                     new MergeData(path, data, schemaContext).toSerializable()));
584         }
585
586         @Override
587         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
588             if(LOG.isDebugEnabled()) {
589                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
590             }
591             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
592                     new WriteData(path, data, schemaContext).toSerializable()));
593         }
594
595         @Override
596         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
597                 final YangInstanceIdentifier path) {
598
599             if(LOG.isDebugEnabled()) {
600                 LOG.debug("Tx {} readData called path = {}", identifier, path);
601             }
602             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
603
604             // If there were any previous recorded put/merge/delete operation reply Futures then we
605             // must wait for them to successfully complete. This is necessary to honor the read
606             // uncommitted semantics of the public API contract. If any one fails then fail the read.
607
608             if(recordedOperationFutures.isEmpty()) {
609                 finishReadData(path, returnFuture);
610             } else {
611                 if(LOG.isDebugEnabled()) {
612                     LOG.debug("Tx {} readData: verifying {} previous recorded operations",
613                         identifier, recordedOperationFutures.size());
614                 }
615                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
616                 // Futures#sequence accesses the passed List on a different thread, as
617                 // recordedOperationFutures is not synchronized.
618
619                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
620                         Lists.newArrayList(recordedOperationFutures),
621                         actorContext.getActorSystem().dispatcher());
622                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
623                     @Override
624                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
625                             throws Throwable {
626                         if(failure != null) {
627                             if(LOG.isDebugEnabled()) {
628                                 LOG.debug("Tx {} readData: a recorded operation failed: {}",
629                                     identifier, failure);
630                             }
631                             returnFuture.setException(new ReadFailedException(
632                                     "The read could not be performed because a previous put, merge,"
633                                     + "or delete operation failed", failure));
634                         } else {
635                             finishReadData(path, returnFuture);
636                         }
637                     }
638                 };
639
640                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
641             }
642
643             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
644         }
645
646         private void finishReadData(final YangInstanceIdentifier path,
647                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
648
649             if(LOG.isDebugEnabled()) {
650                 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
651             }
652             OnComplete<Object> onComplete = new OnComplete<Object>() {
653                 @Override
654                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
655                     if(failure != null) {
656                         if(LOG.isDebugEnabled()) {
657                             LOG.debug("Tx {} read operation failed: {}", identifier, failure);
658                         }
659                         returnFuture.setException(new ReadFailedException(
660                                 "Error reading data for path " + path, failure));
661
662                     } else {
663                         if(LOG.isDebugEnabled()) {
664                             LOG.debug("Tx {} read operation succeeded", identifier, failure);
665                         }
666                         if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
667                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
668                                     path, readResponse);
669                             if (reply.getNormalizedNode() == null) {
670                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
671                             } else {
672                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
673                                         reply.getNormalizedNode()));
674                             }
675                         } else {
676                             returnFuture.setException(new ReadFailedException(
677                                     "Invalid response reading data for path " + path));
678                         }
679                     }
680                 }
681             };
682
683             Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
684                     new ReadData(path).toSerializable());
685             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
686         }
687
688         @Override
689         public CheckedFuture<Boolean, ReadFailedException> dataExists(
690                 final YangInstanceIdentifier path) {
691
692             if(LOG.isDebugEnabled()) {
693                 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
694             }
695             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
696
697             // If there were any previous recorded put/merge/delete operation reply Futures then we
698             // must wait for them to successfully complete. This is necessary to honor the read
699             // uncommitted semantics of the public API contract. If any one fails then fail this
700             // request.
701
702             if(recordedOperationFutures.isEmpty()) {
703                 finishDataExists(path, returnFuture);
704             } else {
705                 if(LOG.isDebugEnabled()) {
706                     LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
707                         identifier, recordedOperationFutures.size());
708                 }
709                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
710                 // Futures#sequence accesses the passed List on a different thread, as
711                 // recordedOperationFutures is not synchronized.
712
713                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
714                         Lists.newArrayList(recordedOperationFutures),
715                         actorContext.getActorSystem().dispatcher());
716                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
717                     @Override
718                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
719                             throws Throwable {
720                         if(failure != null) {
721                             if(LOG.isDebugEnabled()) {
722                                 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
723                                     identifier, failure);
724                             }
725                             returnFuture.setException(new ReadFailedException(
726                                     "The data exists could not be performed because a previous "
727                                     + "put, merge, or delete operation failed", failure));
728                         } else {
729                             finishDataExists(path, returnFuture);
730                         }
731                     }
732                 };
733
734                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
735             }
736
737             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
738         }
739
740         private void finishDataExists(final YangInstanceIdentifier path,
741                 final SettableFuture<Boolean> returnFuture) {
742
743             if(LOG.isDebugEnabled()) {
744                 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
745             }
746             OnComplete<Object> onComplete = new OnComplete<Object>() {
747                 @Override
748                 public void onComplete(Throwable failure, Object response) throws Throwable {
749                     if(failure != null) {
750                         if(LOG.isDebugEnabled()) {
751                             LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
752                         }
753                         returnFuture.setException(new ReadFailedException(
754                                 "Error checking data exists for path " + path, failure));
755                     } else {
756                         if(LOG.isDebugEnabled()) {
757                             LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
758                         }
759                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
760                             returnFuture.set(Boolean.valueOf(DataExistsReply.
761                                         fromSerializable(response).exists()));
762                         } else {
763                             returnFuture.setException(new ReadFailedException(
764                                     "Invalid response checking exists for path " + path));
765                         }
766                     }
767                 }
768             };
769
770             Future<Object> future = actorContext.executeOperationAsync(getActor(),
771                     new DataExists(path).toSerializable());
772             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
773         }
774     }
775
776     private static class NoOpTransactionContext extends AbstractTransactionContext {
777
778         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
779
780         private final Exception failure;
781
782         public NoOpTransactionContext(String shardName, Exception failure,
783                 TransactionIdentifier identifier){
784             super(shardName, identifier);
785             this.failure = failure;
786         }
787
788         @Override
789         public void closeTransaction() {
790             if(LOG.isDebugEnabled()) {
791                 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
792             }
793         }
794
795         @Override
796         public Future<ActorSelection> readyTransaction() {
797             if(LOG.isDebugEnabled()) {
798                 LOG.debug("Tx {} readyTransaction called", identifier);
799             }
800             return akka.dispatch.Futures.failed(failure);
801         }
802
803         @Override
804         public void deleteData(YangInstanceIdentifier path) {
805             if(LOG.isDebugEnabled()) {
806                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
807             }
808         }
809
810         @Override
811         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
812             if(LOG.isDebugEnabled()) {
813                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
814             }
815         }
816
817         @Override
818         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
819             if(LOG.isDebugEnabled()) {
820                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
821             }
822         }
823
824         @Override
825         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
826             YangInstanceIdentifier path) {
827             if(LOG.isDebugEnabled()) {
828                 LOG.debug("Tx {} readData called path = {}", identifier, path);
829             }
830             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
831                     "Error reading data for path " + path, failure));
832         }
833
834         @Override
835         public CheckedFuture<Boolean, ReadFailedException> dataExists(
836             YangInstanceIdentifier path) {
837             if(LOG.isDebugEnabled()) {
838                 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
839             }
840             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
841                     "Error checking exists for path " + path, failure));
842         }
843     }
844 }