Merge "BUG 1966 - change message logging level (info -> trace)"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.OnComplete;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.FinalizablePhantomReference;
17 import com.google.common.base.FinalizableReferenceQueue;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import com.google.common.collect.Lists;
21 import com.google.common.util.concurrent.CheckedFuture;
22 import com.google.common.util.concurrent.Futures;
23 import com.google.common.util.concurrent.SettableFuture;
24
25 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
26 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
27 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
31 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
32 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
33 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
39 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
40 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
41 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
43 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
44 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
45 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
46 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
47 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import scala.Function1;
52 import scala.concurrent.Future;
53 import scala.runtime.AbstractFunction1;
54
55 import java.util.HashMap;
56 import java.util.List;
57 import java.util.Map;
58 import java.util.concurrent.ConcurrentHashMap;
59 import java.util.concurrent.atomic.AtomicBoolean;
60 import java.util.concurrent.atomic.AtomicLong;
61
62 /**
63  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
64  * <p>
65  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
66  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
67  * be created on each of those shards by the TransactionProxy
68  *</p>
69  * <p>
70  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
71  * shards will be executed.
72  * </p>
73  */
74 public class TransactionProxy implements DOMStoreReadWriteTransaction {
75
    // Transaction chain this proxy belongs to; null when the proxy was created without a chain
    // (the two-argument constructor passes null).
    private final TransactionChainProxy transactionChainProxy;
77
78
79
    /**
     * The kinds of transaction a TransactionProxy can represent.
     * <p>
     * NOTE: the ordinal of the constant is passed in the CreateTransaction message built by
     * createTransactionIfMissing, so the declaration order must not be changed.
     */
    public enum TransactionType {
        // read/exists are allowed; write/merge/delete/ready are rejected.
        READ_ONLY,
        // write/merge/delete/ready are allowed; read/exists are rejected.
        WRITE_ONLY,
        // Neither the read checks nor the modification checks reject this type.
        READ_WRITE
    }
85
86     static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
87                                                                           Throwable, Throwable>() {
88         @Override
89         public Throwable apply(Throwable failure) {
90             return failure;
91         }
92     };
93
    // Sequence shared by all TransactionProxy instances; each constructor call takes the next
    // value as the counter part of its TransactionIdentifier.
    private static final AtomicLong counter = new AtomicLong();

    // Logger shared by the proxy and its cleanup PhantomReference.
    private static final Logger
        LOG = LoggerFactory.getLogger(TransactionProxy.class);
98
99
    /**
     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
     * trickery to clean up its internal thread when the bundle is unloaded.
     */
    private static final FinalizableReferenceQueue phantomReferenceQueue =
                                                                  new FinalizableReferenceQueue();

    /**
     * This stores the TransactionProxyCleanupPhantomReference instances statically. This is
     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
     * and thus becomes eligible for garbage collection.
     */
    private static final Map<TransactionProxyCleanupPhantomReference,
                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
                                                                        new ConcurrentHashMap<>();
117
118     /**
119      * A PhantomReference that closes remote transactions for a TransactionProxy when it's
120      * garbage collected. This is used for read-only transactions as they're not explicitly closed
121      * by clients. So the only way to detect that a transaction is no longer in use and it's safe
122      * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
123      * but TransactionProxy instances should generally be short-lived enough to avoid being moved
124      * to the old generation space and thus should be cleaned up in a timely manner as the GC
125      * runs on the young generation (eden, swap1...) space much more frequently.
126      */
127     private static class TransactionProxyCleanupPhantomReference
128                                            extends FinalizablePhantomReference<TransactionProxy> {
129
130         private final List<ActorSelection> remoteTransactionActors;
131         private final AtomicBoolean remoteTransactionActorsMB;
132         private final ActorContext actorContext;
133         private final TransactionIdentifier identifier;
134
135         protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
136             super(referent, phantomReferenceQueue);
137
138             // Note we need to cache the relevant fields from the TransactionProxy as we can't
139             // have a hard reference to the TransactionProxy instance itself.
140
141             remoteTransactionActors = referent.remoteTransactionActors;
142             remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
143             actorContext = referent.actorContext;
144             identifier = referent.identifier;
145         }
146
147         @Override
148         public void finalizeReferent() {
149             LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
150                     remoteTransactionActors.size(), identifier);
151
152             phantomReferenceCache.remove(this);
153
154             // Access the memory barrier volatile to ensure all previous updates to the
155             // remoteTransactionActors list are visible to this thread.
156
157             if(remoteTransactionActorsMB.get()) {
158                 for(ActorSelection actor : remoteTransactionActors) {
159                     LOG.trace("Sending CloseTransaction to {}", actor);
160                     actorContext.sendOperationAsync(actor,
161                             new CloseTransaction().toSerializable());
162                 }
163             }
164         }
165     }
166
    /**
     * Stores the remote Tx actors for each requested data store path to be used by the
     * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
     * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
     * remoteTransactionActors list so they will be visible to the thread accessing the
     * PhantomReference.
     */
    private List<ActorSelection> remoteTransactionActors;
    private AtomicBoolean remoteTransactionActorsMB;

    // Per-shard transaction contexts keyed by shard name; populated lazily by
    // createTransactionIfMissing and emptied by close().
    private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();

    private final TransactionType transactionType;
    private final ActorContext actorContext;
    // Built in the constructor from the current member name and the static counter.
    private final TransactionIdentifier identifier;
    // Obtained from actorContext.getSchemaContext() at construction time.
    private final SchemaContext schemaContext;
    // Set to true by ready(); once set, checkModificationState() rejects further modifications.
    private boolean inReadyState;
184
    /**
     * Creates a proxy that is not associated with a transaction chain; delegates to the
     * three-argument constructor with a null TransactionChainProxy.
     *
     * @param actorContext the actor context, must not be null
     * @param transactionType the type of transaction, must not be null
     */
    public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
        this(actorContext, transactionType, null);
    }
188
189     @VisibleForTesting
190     List<Future<Object>> getRecordedOperationFutures() {
191         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
192         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
193             recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
194         }
195
196         return recordedOperationFutures;
197     }
198
199     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
200         this.actorContext = Preconditions.checkNotNull(actorContext,
201             "actorContext should not be null");
202         this.transactionType = Preconditions.checkNotNull(transactionType,
203             "transactionType should not be null");
204         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
205             "schemaContext should not be null");
206         this.transactionChainProxy = transactionChainProxy;
207
208         String memberName = actorContext.getCurrentMemberName();
209         if(memberName == null){
210             memberName = "UNKNOWN-MEMBER";
211         }
212
213         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
214             counter.getAndIncrement()).build();
215
216         if(transactionType == TransactionType.READ_ONLY) {
217             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
218             // to close the remote Tx's when this instance is no longer in use and is garbage
219             // collected.
220
221             remoteTransactionActors = Lists.newArrayList();
222             remoteTransactionActorsMB = new AtomicBoolean();
223
224             TransactionProxyCleanupPhantomReference cleanup =
225                 new TransactionProxyCleanupPhantomReference(this);
226             phantomReferenceCache.put(cleanup, cleanup);
227         }
228         if(LOG.isDebugEnabled()) {
229             LOG.debug("Created txn {} of type {}", identifier, transactionType);
230         }
231     }
232
233     @Override
234     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
235             final YangInstanceIdentifier path) {
236
237         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
238                 "Read operation on write-only transaction is not allowed");
239
240         if(LOG.isDebugEnabled()) {
241             LOG.debug("Tx {} read {}", identifier, path);
242         }
243         createTransactionIfMissing(actorContext, path);
244
245         return transactionContext(path).readData(path);
246     }
247
248     @Override
249     public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
250
251         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
252                 "Exists operation on write-only transaction is not allowed");
253
254         if(LOG.isDebugEnabled()) {
255             LOG.debug("Tx {} exists {}", identifier, path);
256         }
257         createTransactionIfMissing(actorContext, path);
258
259         return transactionContext(path).dataExists(path);
260     }
261
262     private void checkModificationState() {
263         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
264                 "Modification operation on read-only transaction is not allowed");
265         Preconditions.checkState(!inReadyState,
266                 "Transaction is sealed - further modifications are not allowed");
267     }
268
269     @Override
270     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
271
272         checkModificationState();
273
274         if(LOG.isDebugEnabled()) {
275             LOG.debug("Tx {} write {}", identifier, path);
276         }
277         createTransactionIfMissing(actorContext, path);
278
279         transactionContext(path).writeData(path, data);
280     }
281
282     @Override
283     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
284
285         checkModificationState();
286
287         if(LOG.isDebugEnabled()) {
288             LOG.debug("Tx {} merge {}", identifier, path);
289         }
290         createTransactionIfMissing(actorContext, path);
291
292         transactionContext(path).mergeData(path, data);
293     }
294
295     @Override
296     public void delete(YangInstanceIdentifier path) {
297
298         checkModificationState();
299         if(LOG.isDebugEnabled()) {
300             LOG.debug("Tx {} delete {}", identifier, path);
301         }
302         createTransactionIfMissing(actorContext, path);
303
304         transactionContext(path).deleteData(path);
305     }
306
307     @Override
308     public DOMStoreThreePhaseCommitCohort ready() {
309
310         checkModificationState();
311
312         inReadyState = true;
313
314         if(LOG.isDebugEnabled()) {
315             LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
316                 remoteTransactionPaths.size());
317         }
318         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
319
320         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
321
322             if(LOG.isDebugEnabled()) {
323                 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
324                     transactionContext.getShardName());
325             }
326             cohortPathFutures.add(transactionContext.readyTransaction());
327         }
328
329         if(transactionChainProxy != null){
330             transactionChainProxy.onTransactionReady(cohortPathFutures);
331         }
332
333         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
334                 identifier.toString());
335     }
336
    /**
     * Returns the TransactionIdentifier that was assigned to this proxy in its constructor.
     */
    @Override
    public Object getIdentifier() {
        return this.identifier;
    }
341
342     @Override
343     public void close() {
344         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
345             transactionContext.closeTransaction();
346         }
347
348         remoteTransactionPaths.clear();
349
350         if(transactionType == TransactionType.READ_ONLY) {
351             remoteTransactionActors.clear();
352             remoteTransactionActorsMB.set(true);
353         }
354     }
355
356     private TransactionContext transactionContext(YangInstanceIdentifier path){
357         String shardName = shardNameFromIdentifier(path);
358         return remoteTransactionPaths.get(shardName);
359     }
360
    /**
     * Determines the name of the shard for the given path by asking the strategy that
     * ShardStrategyFactory returns for that path.
     *
     * @param path the data store path
     * @return the shard name reported by the strategy
     */
    private String shardNameFromIdentifier(YangInstanceIdentifier path){
        return ShardStrategyFactory.getStrategy(path).findShard(path);
    }
364
365     private void createTransactionIfMissing(ActorContext actorContext,
366         YangInstanceIdentifier path) {
367
368         if(transactionChainProxy != null){
369             transactionChainProxy.waitTillCurrentTransactionReady();
370         }
371
372         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
373
374         TransactionContext transactionContext =
375             remoteTransactionPaths.get(shardName);
376
377         if (transactionContext != null) {
378             // A transaction already exists with that shard
379             return;
380         }
381
382         try {
383             Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
384             if (!primaryShard.isPresent()) {
385                 throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
386             }
387
388             Object response = actorContext.executeOperation(primaryShard.get(),
389                     new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
390                             getTransactionChainId()).toSerializable());
391             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
392                 CreateTransactionReply reply =
393                     CreateTransactionReply.fromSerializable(response);
394
395                 String transactionPath = reply.getTransactionPath();
396
397                 if(LOG.isDebugEnabled()) {
398                     LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
399                 }
400                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
401
402                 if (transactionType == TransactionType.READ_ONLY) {
403                     // Add the actor to the remoteTransactionActors list for access by the
404                     // cleanup PhantonReference.
405                     remoteTransactionActors.add(transactionActor);
406
407                     // Write to the memory barrier volatile to publish the above update to the
408                     // remoteTransactionActors list for thread visibility.
409                     remoteTransactionActorsMB.set(true);
410                 }
411
412                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
413                     transactionActor, identifier, actorContext, schemaContext);
414
415                 remoteTransactionPaths.put(shardName, transactionContext);
416             } else {
417                 throw new IllegalArgumentException(String.format(
418                     "Invalid reply type {} for CreateTransaction", response.getClass()));
419             }
420         } catch (Exception e) {
421             if(LOG.isDebugEnabled()) {
422                 LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
423             }
424             remoteTransactionPaths
425                 .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
426         }
427     }
428
429     public String getTransactionChainId() {
430         if(transactionChainProxy == null){
431             return "";
432         }
433         return transactionChainProxy.getTransactionChainId();
434     }
435
436
    /**
     * Operations the TransactionProxy performs against the transaction of a single shard. One
     * instance is stored per shard name in remoteTransactionPaths.
     */
    private interface TransactionContext {
        /** Returns the name of the shard this context is associated with. */
        String getShardName();

        /** Closes the transaction represented by this context. */
        void closeTransaction();

        /**
         * Readies the transaction for commit.
         *
         * @return a Future yielding the ActorPath of the commit cohort
         */
        Future<ActorPath> readyTransaction();

        /** Writes the given data at the given path. */
        void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);

        /** Deletes the data at the given path. */
        void deleteData(YangInstanceIdentifier path);

        /** Merges the given data at the given path. */
        void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);

        /** Reads the data at the given path. */
        CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
                final YangInstanceIdentifier path);

        /** Checks whether data exists at the given path. */
        CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);

        /** Returns the Futures recorded for operations issued through this context. */
        List<Future<Object>> getRecordedOperationFutures();
    }
457
    /**
     * Base class for TransactionContext implementations that holds the state common to them:
     * the transaction identifier, the shard name and the list of recorded operation Futures.
     */
    private static abstract class AbstractTransactionContext implements TransactionContext {

        protected final TransactionIdentifier identifier;
        protected final String shardName;
        // Not synchronized; returned directly (not copied) by getRecordedOperationFutures().
        protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();

        AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
            this.shardName = shardName;
            this.identifier = identifier;
        }

        @Override
        public String getShardName() {
            return shardName;
        }

        @Override
        public List<Future<Object>> getRecordedOperationFutures() {
            return recordedOperationFutures;
        }
    }
479
480     private static class TransactionContextImpl extends AbstractTransactionContext {
481         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
482
483         private final ActorContext actorContext;
484         private final SchemaContext schemaContext;
485         private final String actorPath;
486         private final ActorSelection actor;
487
488         private TransactionContextImpl(String shardName, String actorPath,
489                 ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
490                 SchemaContext schemaContext) {
491             super(shardName, identifier);
492             this.actorPath = actorPath;
493             this.actor = actor;
494             this.actorContext = actorContext;
495             this.schemaContext = schemaContext;
496         }
497
        /** Returns the selection of the remote Tx actor that operations are sent to. */
        private ActorSelection getActor() {
            return actor;
        }
501
        /**
         * Resolves the given cohort path against the path of the remote Tx actor by delegating
         * to ActorContext.resolvePath.
         *
         * @param cohortPath the cohort path taken from the ReadyTransactionReply
         * @return the path returned by the actor context
         */
        private String getResolvedCohortPath(String cohortPath) {
            return actorContext.resolvePath(actorPath, cohortPath);
        }
505
506         @Override
507         public void closeTransaction() {
508             if(LOG.isDebugEnabled()) {
509                 LOG.debug("Tx {} closeTransaction called", identifier);
510             }
511             actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
512         }
513
514         @Override
515         public Future<ActorPath> readyTransaction() {
516             if(LOG.isDebugEnabled()) {
517                 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
518                     identifier, recordedOperationFutures.size());
519             }
520             // Send the ReadyTransaction message to the Tx actor.
521
522             final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
523                     new ReadyTransaction().toSerializable());
524
525             // Combine all the previously recorded put/merge/delete operation reply Futures and the
526             // ReadyTransactionReply Future into one Future. If any one fails then the combined
527             // Future will fail. We need all prior operations and the ready operation to succeed
528             // in order to attempt commit.
529
530             List<Future<Object>> futureList =
531                     Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
532             futureList.addAll(recordedOperationFutures);
533             futureList.add(replyFuture);
534
535             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
536                     actorContext.getActorSystem().dispatcher());
537
538             // Transform the combined Future into a Future that returns the cohort actor path from
539             // the ReadyTransactionReply. That's the end result of the ready operation.
540
541             return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
542                 @Override
543                 public ActorPath apply(Iterable<Object> notUsed) {
544                     if(LOG.isDebugEnabled()) {
545                         LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
546                             identifier);
547                     }
548                     // At this point all the Futures succeeded and we need to extract the cohort
549                     // actor path from the ReadyTransactionReply. For the recorded operations, they
550                     // don't return any data so we're only interested that they completed
551                     // successfully. We could be paranoid and verify the correct reply types but
552                     // that really should never happen so it's not worth the overhead of
553                     // de-serializing each reply.
554
555                     // Note the Future get call here won't block as it's complete.
556                     Object serializedReadyReply = replyFuture.value().get().get();
557                     if(serializedReadyReply.getClass().equals(
558                                                      ReadyTransactionReply.SERIALIZABLE_CLASS)) {
559                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
560                                 actorContext.getActorSystem(), serializedReadyReply);
561
562                         String resolvedCohortPath = getResolvedCohortPath(
563                                 reply.getCohortPath().toString());
564
565                         if(LOG.isDebugEnabled()) {
566                             LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
567                                 identifier, resolvedCohortPath);
568                         }
569                         return actorContext.actorFor(resolvedCohortPath);
570                     } else {
571                         // Throwing an exception here will fail the Future.
572
573                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
574                                 serializedReadyReply.getClass()));
575                     }
576                 }
577             }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
578         }
579
580         @Override
581         public void deleteData(YangInstanceIdentifier path) {
582             if(LOG.isDebugEnabled()) {
583                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
584             }
585             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
586                     new DeleteData(path).toSerializable()));
587         }
588
589         @Override
590         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
591             if(LOG.isDebugEnabled()) {
592                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
593             }
594             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
595                     new MergeData(path, data, schemaContext).toSerializable()));
596         }
597
598         @Override
599         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
600             if(LOG.isDebugEnabled()) {
601                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
602             }
603             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
604                     new WriteData(path, data, schemaContext).toSerializable()));
605         }
606
607         @Override
608         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
609                 final YangInstanceIdentifier path) {
610
611             if(LOG.isDebugEnabled()) {
612                 LOG.debug("Tx {} readData called path = {}", identifier, path);
613             }
614             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
615
616             // If there were any previous recorded put/merge/delete operation reply Futures then we
617             // must wait for them to successfully complete. This is necessary to honor the read
618             // uncommitted semantics of the public API contract. If any one fails then fail the read.
619
620             if(recordedOperationFutures.isEmpty()) {
621                 finishReadData(path, returnFuture);
622             } else {
623                 if(LOG.isDebugEnabled()) {
624                     LOG.debug("Tx {} readData: verifying {} previous recorded operations",
625                         identifier, recordedOperationFutures.size());
626                 }
627                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
628                 // Futures#sequence accesses the passed List on a different thread, as
629                 // recordedOperationFutures is not synchronized.
630
631                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
632                         Lists.newArrayList(recordedOperationFutures),
633                         actorContext.getActorSystem().dispatcher());
634                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
635                     @Override
636                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
637                             throws Throwable {
638                         if(failure != null) {
639                             if(LOG.isDebugEnabled()) {
640                                 LOG.debug("Tx {} readData: a recorded operation failed: {}",
641                                     identifier, failure);
642                             }
643                             returnFuture.setException(new ReadFailedException(
644                                     "The read could not be performed because a previous put, merge,"
645                                     + "or delete operation failed", failure));
646                         } else {
647                             finishReadData(path, returnFuture);
648                         }
649                     }
650                 };
651
652                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
653             }
654
655             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
656         }
657
658         private void finishReadData(final YangInstanceIdentifier path,
659                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
660
661             if(LOG.isDebugEnabled()) {
662                 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
663             }
664             OnComplete<Object> onComplete = new OnComplete<Object>() {
665                 @Override
666                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
667                     if(failure != null) {
668                         if(LOG.isDebugEnabled()) {
669                             LOG.debug("Tx {} read operation failed: {}", identifier, failure);
670                         }
671                         returnFuture.setException(new ReadFailedException(
672                                 "Error reading data for path " + path, failure));
673
674                     } else {
675                         if(LOG.isDebugEnabled()) {
676                             LOG.debug("Tx {} read operation succeeded", identifier, failure);
677                         }
678                         if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
679                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
680                                     path, readResponse);
681                             if (reply.getNormalizedNode() == null) {
682                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
683                             } else {
684                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
685                                         reply.getNormalizedNode()));
686                             }
687                         } else {
688                             returnFuture.setException(new ReadFailedException(
689                                     "Invalid response reading data for path " + path));
690                         }
691                     }
692                 }
693             };
694
695             Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
696                     new ReadData(path).toSerializable());
697             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
698         }
699
700         @Override
701         public CheckedFuture<Boolean, ReadFailedException> dataExists(
702                 final YangInstanceIdentifier path) {
703
704             if(LOG.isDebugEnabled()) {
705                 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
706             }
707             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
708
709             // If there were any previous recorded put/merge/delete operation reply Futures then we
710             // must wait for them to successfully complete. This is necessary to honor the read
711             // uncommitted semantics of the public API contract. If any one fails then fail this
712             // request.
713
714             if(recordedOperationFutures.isEmpty()) {
715                 finishDataExists(path, returnFuture);
716             } else {
717                 if(LOG.isDebugEnabled()) {
718                     LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
719                         identifier, recordedOperationFutures.size());
720                 }
721                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
722                 // Futures#sequence accesses the passed List on a different thread, as
723                 // recordedOperationFutures is not synchronized.
724
725                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
726                         Lists.newArrayList(recordedOperationFutures),
727                         actorContext.getActorSystem().dispatcher());
728                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
729                     @Override
730                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
731                             throws Throwable {
732                         if(failure != null) {
733                             if(LOG.isDebugEnabled()) {
734                                 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
735                                     identifier, failure);
736                             }
737                             returnFuture.setException(new ReadFailedException(
738                                     "The data exists could not be performed because a previous "
739                                     + "put, merge, or delete operation failed", failure));
740                         } else {
741                             finishDataExists(path, returnFuture);
742                         }
743                     }
744                 };
745
746                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
747             }
748
749             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
750         }
751
752         private void finishDataExists(final YangInstanceIdentifier path,
753                 final SettableFuture<Boolean> returnFuture) {
754
755             if(LOG.isDebugEnabled()) {
756                 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
757             }
758             OnComplete<Object> onComplete = new OnComplete<Object>() {
759                 @Override
760                 public void onComplete(Throwable failure, Object response) throws Throwable {
761                     if(failure != null) {
762                         if(LOG.isDebugEnabled()) {
763                             LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
764                         }
765                         returnFuture.setException(new ReadFailedException(
766                                 "Error checking data exists for path " + path, failure));
767                     } else {
768                         if(LOG.isDebugEnabled()) {
769                             LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
770                         }
771                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
772                             returnFuture.set(Boolean.valueOf(DataExistsReply.
773                                         fromSerializable(response).exists()));
774                         } else {
775                             returnFuture.setException(new ReadFailedException(
776                                     "Invalid response checking exists for path " + path));
777                         }
778                     }
779                 }
780             };
781
782             Future<Object> future = actorContext.executeOperationAsync(getActor(),
783                     new DataExists(path).toSerializable());
784             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
785         }
786     }
787
788     private static class NoOpTransactionContext extends AbstractTransactionContext {
789
790         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
791
792         private final Exception failure;
793
794         public NoOpTransactionContext(String shardName, Exception failure,
795                 TransactionIdentifier identifier){
796             super(shardName, identifier);
797             this.failure = failure;
798         }
799
800         @Override
801         public void closeTransaction() {
802             if(LOG.isDebugEnabled()) {
803                 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
804             }
805         }
806
807         @Override
808         public Future<ActorPath> readyTransaction() {
809             if(LOG.isDebugEnabled()) {
810                 LOG.debug("Tx {} readyTransaction called", identifier);
811             }
812             return akka.dispatch.Futures.failed(failure);
813         }
814
815         @Override
816         public void deleteData(YangInstanceIdentifier path) {
817             if(LOG.isDebugEnabled()) {
818                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
819             }
820         }
821
822         @Override
823         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
824             if(LOG.isDebugEnabled()) {
825                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
826             }
827         }
828
829         @Override
830         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
831             if(LOG.isDebugEnabled()) {
832                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
833             }
834         }
835
836         @Override
837         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
838             YangInstanceIdentifier path) {
839             if(LOG.isDebugEnabled()) {
840                 LOG.debug("Tx {} readData called path = {}", identifier, path);
841             }
842             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
843                     "Error reading data for path " + path, failure));
844         }
845
846         @Override
847         public CheckedFuture<Boolean, ReadFailedException> dataExists(
848             YangInstanceIdentifier path) {
849             if(LOG.isDebugEnabled()) {
850                 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
851             }
852             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
853                     "Error checking exists for path " + path, failure));
854         }
855     }
856 }