Merge "BUG-2184 Fix config.yang module(add type as a key for modules list)"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorSelection;
12 import akka.dispatch.OnComplete;
13
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.base.FinalizablePhantomReference;
16 import com.google.common.base.FinalizableReferenceQueue;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.collect.Lists;
20 import com.google.common.util.concurrent.CheckedFuture;
21 import com.google.common.util.concurrent.Futures;
22 import com.google.common.util.concurrent.SettableFuture;
23
24 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
25 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
26 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
27 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
28 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
30 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
31 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
32 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
35 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
36 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
38 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
41 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
42 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
43 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import scala.Function1;
51 import scala.concurrent.Future;
52 import scala.runtime.AbstractFunction1;
53
54 import java.util.HashMap;
55 import java.util.List;
56 import java.util.Map;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.atomic.AtomicBoolean;
59 import java.util.concurrent.atomic.AtomicLong;
60
61 /**
62  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
63  * <p>
64  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
65  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
66  * be created on each of those shards by the TransactionProxy
67  *</p>
68  * <p>
69  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
70  * shards will be executed.
71  * </p>
72  */
73 public class TransactionProxy implements DOMStoreReadWriteTransaction {
74
75     private final TransactionChainProxy transactionChainProxy;
76
77
78
/**
 * The kinds of transaction a TransactionProxy can stand for.
 * <p>
 * The declaration order matters: the ordinal of the constant is what is put into the
 * CreateTransaction message sent to the shard, so the constants must not be reordered.
 * </p>
 */
public enum TransactionType {
    /** Read and exists operations only; modifications are rejected. */
    READ_ONLY,

    /** Write, merge and delete operations only; reads are rejected. */
    WRITE_ONLY,

    /** Both reads and modifications are allowed. */
    READ_WRITE;
}
84
85     static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
86                                                                           Throwable, Throwable>() {
87         @Override
88         public Throwable apply(Throwable failure) {
89             return failure;
90         }
91     };
92
93     private static final AtomicLong counter = new AtomicLong();
94
95     private static final Logger
96         LOG = LoggerFactory.getLogger(TransactionProxy.class);
97
98
99     /**
100      * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
101      * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
102      * trickery to clean up its internal thread when the bundle is unloaded.
103      */
104     private static final FinalizableReferenceQueue phantomReferenceQueue =
105                                                                   new FinalizableReferenceQueue();
106
107     /**
108      * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
109      * necessary because PhantomReferences need a hard reference so they're not garbage collected.
110      * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
111      * and thus becomes eligible for garbage collection.
112      */
113     private static final Map<TransactionProxyCleanupPhantomReference,
114                              TransactionProxyCleanupPhantomReference> phantomReferenceCache =
115                                                                         new ConcurrentHashMap<>();
116
117     /**
118      * A PhantomReference that closes remote transactions for a TransactionProxy when it's
119      * garbage collected. This is used for read-only transactions as they're not explicitly closed
120      * by clients. So the only way to detect that a transaction is no longer in use and it's safe
121      * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
122      * but TransactionProxy instances should generally be short-lived enough to avoid being moved
123      * to the old generation space and thus should be cleaned up in a timely manner as the GC
124      * runs on the young generation (eden, swap1...) space much more frequently.
125      */
126     private static class TransactionProxyCleanupPhantomReference
127                                            extends FinalizablePhantomReference<TransactionProxy> {
128
129         private final List<ActorSelection> remoteTransactionActors;
130         private final AtomicBoolean remoteTransactionActorsMB;
131         private final ActorContext actorContext;
132         private final TransactionIdentifier identifier;
133
134         protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
135             super(referent, phantomReferenceQueue);
136
137             // Note we need to cache the relevant fields from the TransactionProxy as we can't
138             // have a hard reference to the TransactionProxy instance itself.
139
140             remoteTransactionActors = referent.remoteTransactionActors;
141             remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
142             actorContext = referent.actorContext;
143             identifier = referent.identifier;
144         }
145
146         @Override
147         public void finalizeReferent() {
148             LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
149                     remoteTransactionActors.size(), identifier);
150
151             phantomReferenceCache.remove(this);
152
153             // Access the memory barrier volatile to ensure all previous updates to the
154             // remoteTransactionActors list are visible to this thread.
155
156             if(remoteTransactionActorsMB.get()) {
157                 for(ActorSelection actor : remoteTransactionActors) {
158                     LOG.trace("Sending CloseTransaction to {}", actor);
159                     actorContext.sendOperationAsync(actor,
160                         new CloseTransaction().toSerializable());
161                 }
162             }
163         }
164     }
165
166     /**
167      * Stores the remote Tx actors for each requested data store path to be used by the
168      * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
169      * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
170      * remoteTransactionActors list so they will be visible to the thread accessing the
171      * PhantomReference.
172      */
173     private List<ActorSelection> remoteTransactionActors;
174     private AtomicBoolean remoteTransactionActorsMB;
175
176     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
177
178     private final TransactionType transactionType;
179     private final ActorContext actorContext;
180     private final TransactionIdentifier identifier;
181     private final SchemaContext schemaContext;
182     private boolean inReadyState;
183
/**
 * Creates a stand-alone transaction proxy, i.e. one that is not part of a transaction chain.
 *
 * @param actorContext    the context used to talk to the shard actors, must not be null
 * @param transactionType the type of transaction to create, must not be null
 */
public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
    this(actorContext, transactionType, null);
}
187
188     @VisibleForTesting
189     List<Future<Object>> getRecordedOperationFutures() {
190         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
191         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
192             recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
193         }
194
195         return recordedOperationFutures;
196     }
197
198     public TransactionProxy(ActorContext actorContext, TransactionType transactionType, TransactionChainProxy transactionChainProxy) {
199         this.actorContext = Preconditions.checkNotNull(actorContext,
200             "actorContext should not be null");
201         this.transactionType = Preconditions.checkNotNull(transactionType,
202             "transactionType should not be null");
203         this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
204             "schemaContext should not be null");
205         this.transactionChainProxy = transactionChainProxy;
206
207         String memberName = actorContext.getCurrentMemberName();
208         if(memberName == null){
209             memberName = "UNKNOWN-MEMBER";
210         }
211
212         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
213             counter.getAndIncrement()).build();
214
215         if(transactionType == TransactionType.READ_ONLY) {
216             // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
217             // to close the remote Tx's when this instance is no longer in use and is garbage
218             // collected.
219
220             remoteTransactionActors = Lists.newArrayList();
221             remoteTransactionActorsMB = new AtomicBoolean();
222
223             TransactionProxyCleanupPhantomReference cleanup =
224                 new TransactionProxyCleanupPhantomReference(this);
225             phantomReferenceCache.put(cleanup, cleanup);
226         }
227         if(LOG.isDebugEnabled()) {
228             LOG.debug("Created txn {} of type {}", identifier, transactionType);
229         }
230     }
231
232     @Override
233     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
234             final YangInstanceIdentifier path) {
235
236         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
237                 "Read operation on write-only transaction is not allowed");
238
239         if(LOG.isDebugEnabled()) {
240             LOG.debug("Tx {} read {}", identifier, path);
241         }
242         createTransactionIfMissing(actorContext, path);
243
244         return transactionContext(path).readData(path);
245     }
246
247     @Override
248     public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
249
250         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
251                 "Exists operation on write-only transaction is not allowed");
252
253         if(LOG.isDebugEnabled()) {
254             LOG.debug("Tx {} exists {}", identifier, path);
255         }
256         createTransactionIfMissing(actorContext, path);
257
258         return transactionContext(path).dataExists(path);
259     }
260
261     private void checkModificationState() {
262         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
263                 "Modification operation on read-only transaction is not allowed");
264         Preconditions.checkState(!inReadyState,
265                 "Transaction is sealed - further modifications are not allowed");
266     }
267
268     @Override
269     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
270
271         checkModificationState();
272
273         if(LOG.isDebugEnabled()) {
274             LOG.debug("Tx {} write {}", identifier, path);
275         }
276         createTransactionIfMissing(actorContext, path);
277
278         transactionContext(path).writeData(path, data);
279     }
280
281     @Override
282     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
283
284         checkModificationState();
285
286         if(LOG.isDebugEnabled()) {
287             LOG.debug("Tx {} merge {}", identifier, path);
288         }
289         createTransactionIfMissing(actorContext, path);
290
291         transactionContext(path).mergeData(path, data);
292     }
293
294     @Override
295     public void delete(YangInstanceIdentifier path) {
296
297         checkModificationState();
298         if(LOG.isDebugEnabled()) {
299             LOG.debug("Tx {} delete {}", identifier, path);
300         }
301         createTransactionIfMissing(actorContext, path);
302
303         transactionContext(path).deleteData(path);
304     }
305
306     @Override
307     public DOMStoreThreePhaseCommitCohort ready() {
308
309         checkModificationState();
310
311         inReadyState = true;
312
313         if(LOG.isDebugEnabled()) {
314             LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
315                 remoteTransactionPaths.size());
316         }
317         List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
318
319         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
320
321             if(LOG.isDebugEnabled()) {
322                 LOG.debug("Tx {} Readying transaction for shard {}", identifier,
323                     transactionContext.getShardName());
324             }
325             cohortFutures.add(transactionContext.readyTransaction());
326         }
327
328         if(transactionChainProxy != null){
329             transactionChainProxy.onTransactionReady(cohortFutures);
330         }
331
332         return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
333                 identifier.toString());
334     }
335
336     @Override
337     public Object getIdentifier() {
338         return this.identifier;
339     }
340
341     @Override
342     public void close() {
343         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
344             transactionContext.closeTransaction();
345         }
346
347         remoteTransactionPaths.clear();
348
349         if(transactionType == TransactionType.READ_ONLY) {
350             remoteTransactionActors.clear();
351             remoteTransactionActorsMB.set(true);
352         }
353     }
354
355     private TransactionContext transactionContext(YangInstanceIdentifier path){
356         String shardName = shardNameFromIdentifier(path);
357         return remoteTransactionPaths.get(shardName);
358     }
359
360     private String shardNameFromIdentifier(YangInstanceIdentifier path){
361         return ShardStrategyFactory.getStrategy(path).findShard(path);
362     }
363
364     private void createTransactionIfMissing(ActorContext actorContext,
365         YangInstanceIdentifier path) {
366
367         if(transactionChainProxy != null){
368             transactionChainProxy.waitTillCurrentTransactionReady();
369         }
370
371         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
372
373         TransactionContext transactionContext =
374             remoteTransactionPaths.get(shardName);
375
376         if (transactionContext != null) {
377             // A transaction already exists with that shard
378             return;
379         }
380
381         try {
382             Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
383             if (!primaryShard.isPresent()) {
384                 throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
385             }
386
387             Object response = actorContext.executeOperation(primaryShard.get(),
388                 new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
389                     getTransactionChainId()).toSerializable());
390             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
391                 CreateTransactionReply reply =
392                     CreateTransactionReply.fromSerializable(response);
393
394                 String transactionPath = reply.getTransactionPath();
395
396                 if(LOG.isDebugEnabled()) {
397                     LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
398                 }
399                 ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
400
401                 if (transactionType == TransactionType.READ_ONLY) {
402                     // Add the actor to the remoteTransactionActors list for access by the
403                     // cleanup PhantonReference.
404                     remoteTransactionActors.add(transactionActor);
405
406                     // Write to the memory barrier volatile to publish the above update to the
407                     // remoteTransactionActors list for thread visibility.
408                     remoteTransactionActorsMB.set(true);
409                 }
410
411                 // TxActor is always created where the leader of the shard is.
412                 // Check if TxActor is created in the same node
413                 boolean isTxActorLocal = actorContext.isLocalPath(transactionPath);
414
415                 transactionContext = new TransactionContextImpl(shardName, transactionPath,
416                     transactionActor, identifier, actorContext, schemaContext, isTxActorLocal);
417
418                 remoteTransactionPaths.put(shardName, transactionContext);
419             } else {
420                 throw new IllegalArgumentException(String.format(
421                     "Invalid reply type {} for CreateTransaction", response.getClass()));
422             }
423         } catch (Exception e) {
424             if(LOG.isDebugEnabled()) {
425                 LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
426             }
427             remoteTransactionPaths
428                 .put(shardName, new NoOpTransactionContext(shardName, e, identifier));
429         }
430     }
431
432     public String getTransactionChainId() {
433         if(transactionChainProxy == null){
434             return "";
435         }
436         return transactionChainProxy.getTransactionChainId();
437     }
438
439
440     private interface TransactionContext {
441         String getShardName();
442
443         void closeTransaction();
444
445         Future<ActorSelection> readyTransaction();
446
447         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
448
449         void deleteData(YangInstanceIdentifier path);
450
451         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
452
453         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
454                 final YangInstanceIdentifier path);
455
456         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
457
458         List<Future<Object>> getRecordedOperationFutures();
459     }
460
461     private static abstract class AbstractTransactionContext implements TransactionContext {
462
463         protected final TransactionIdentifier identifier;
464         protected final String shardName;
465         protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
466
467         AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
468             this.shardName = shardName;
469             this.identifier = identifier;
470         }
471
472         @Override
473         public String getShardName() {
474             return shardName;
475         }
476
477         @Override
478         public List<Future<Object>> getRecordedOperationFutures() {
479             return recordedOperationFutures;
480         }
481     }
482
483     private static class TransactionContextImpl extends AbstractTransactionContext {
484         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
485
486         private final ActorContext actorContext;
487         private final SchemaContext schemaContext;
488         private final String actorPath;
489         private final ActorSelection actor;
490         private final boolean isTxActorLocal;
491
/**
 * Creates the context for a transaction that was created on a shard.
 *
 * @param shardName      name of the shard the transaction lives on
 * @param actorPath      path of the Tx actor
 * @param actor          selection of the Tx actor
 * @param identifier     identifier of the owning transaction
 * @param actorContext   context used to send messages to the Tx actor
 * @param schemaContext  schema context used when building and decoding messages
 * @param isTxActorLocal whether the Tx actor's path is local to this node
 */
private TransactionContextImpl(String shardName, String actorPath,
        ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
        SchemaContext schemaContext, boolean isTxActorLocal) {
    super(shardName, identifier);

    this.actorContext = actorContext;
    this.schemaContext = schemaContext;
    this.actorPath = actorPath;
    this.actor = actor;
    this.isTxActorLocal = isTxActorLocal;
}
502
503         private ActorSelection getActor() {
504             return actor;
505         }
506
507         @Override
508         public void closeTransaction() {
509             if(LOG.isDebugEnabled()) {
510                 LOG.debug("Tx {} closeTransaction called", identifier);
511             }
512             actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
513         }
514
515         @Override
516         public Future<ActorSelection> readyTransaction() {
517             if(LOG.isDebugEnabled()) {
518                 LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
519                     identifier, recordedOperationFutures.size());
520             }
521             // Send the ReadyTransaction message to the Tx actor.
522
523             ReadyTransaction readyTransaction = new ReadyTransaction();
524             final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
525                 isTxActorLocal ? readyTransaction : readyTransaction.toSerializable());
526
527             // Combine all the previously recorded put/merge/delete operation reply Futures and the
528             // ReadyTransactionReply Future into one Future. If any one fails then the combined
529             // Future will fail. We need all prior operations and the ready operation to succeed
530             // in order to attempt commit.
531
532             List<Future<Object>> futureList =
533                     Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
534             futureList.addAll(recordedOperationFutures);
535             futureList.add(replyFuture);
536
537             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
538                     actorContext.getActorSystem().dispatcher());
539
540             // Transform the combined Future into a Future that returns the cohort actor path from
541             // the ReadyTransactionReply. That's the end result of the ready operation.
542
543             return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorSelection>() {
544                 @Override
545                 public ActorSelection apply(Iterable<Object> notUsed) {
546                     if(LOG.isDebugEnabled()) {
547                         LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
548                             identifier);
549                     }
550                     // At this point all the Futures succeeded and we need to extract the cohort
551                     // actor path from the ReadyTransactionReply. For the recorded operations, they
552                     // don't return any data so we're only interested that they completed
553                     // successfully. We could be paranoid and verify the correct reply types but
554                     // that really should never happen so it's not worth the overhead of
555                     // de-serializing each reply.
556
557                     // Note the Future get call here won't block as it's complete.
558                     Object serializedReadyReply = replyFuture.value().get().get();
559                     if (serializedReadyReply instanceof ReadyTransactionReply) {
560                         return actorContext.actorSelection(((ReadyTransactionReply)serializedReadyReply).getCohortPath());
561
562                     } else if(serializedReadyReply.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
563                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(serializedReadyReply);
564                         return actorContext.actorSelection(reply.getCohortPath());
565
566                     } else {
567                         // Throwing an exception here will fail the Future.
568                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
569                                 serializedReadyReply.getClass()));
570                     }
571                 }
572             }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
573         }
574
575         @Override
576         public void deleteData(YangInstanceIdentifier path) {
577             if(LOG.isDebugEnabled()) {
578                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
579             }
580
581             DeleteData deleteData = new DeleteData(path);
582             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
583                 isTxActorLocal ? deleteData : deleteData.toSerializable()));
584         }
585
586         @Override
587         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
588             if(LOG.isDebugEnabled()) {
589                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
590             }
591
592             MergeData mergeData = new MergeData(path, data, schemaContext);
593             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
594                 isTxActorLocal ? mergeData : mergeData.toSerializable()));
595         }
596
597         @Override
598         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
599             if(LOG.isDebugEnabled()) {
600                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
601             }
602
603             WriteData writeData = new WriteData(path, data, schemaContext);
604             recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
605                 isTxActorLocal ? writeData : writeData.toSerializable()));
606         }
607
608         @Override
609         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
610                 final YangInstanceIdentifier path) {
611
612             if(LOG.isDebugEnabled()) {
613                 LOG.debug("Tx {} readData called path = {}", identifier, path);
614             }
615             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
616
617             // If there were any previous recorded put/merge/delete operation reply Futures then we
618             // must wait for them to successfully complete. This is necessary to honor the read
619             // uncommitted semantics of the public API contract. If any one fails then fail the read.
620
621             if(recordedOperationFutures.isEmpty()) {
622                 finishReadData(path, returnFuture);
623             } else {
624                 if(LOG.isDebugEnabled()) {
625                     LOG.debug("Tx {} readData: verifying {} previous recorded operations",
626                         identifier, recordedOperationFutures.size());
627                 }
628                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
629                 // Futures#sequence accesses the passed List on a different thread, as
630                 // recordedOperationFutures is not synchronized.
631
632                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
633                         Lists.newArrayList(recordedOperationFutures),
634                         actorContext.getActorSystem().dispatcher());
635
636                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
637                     @Override
638                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
639                             throws Throwable {
640                         if(failure != null) {
641                             if(LOG.isDebugEnabled()) {
642                                 LOG.debug("Tx {} readData: a recorded operation failed: {}",
643                                     identifier, failure);
644                             }
645                             returnFuture.setException(new ReadFailedException(
646                                     "The read could not be performed because a previous put, merge,"
647                                     + "or delete operation failed", failure));
648                         } else {
649                             finishReadData(path, returnFuture);
650                         }
651                     }
652                 };
653
654                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
655             }
656
657             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
658         }
659
660         private void finishReadData(final YangInstanceIdentifier path,
661                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
662
663             if(LOG.isDebugEnabled()) {
664                 LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
665             }
666             OnComplete<Object> onComplete = new OnComplete<Object>() {
667                 @Override
668                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
669                     if(failure != null) {
670                         if(LOG.isDebugEnabled()) {
671                             LOG.debug("Tx {} read operation failed: {}", identifier, failure);
672                         }
673                         returnFuture.setException(new ReadFailedException(
674                                 "Error reading data for path " + path, failure));
675
676                     } else {
677                         if(LOG.isDebugEnabled()) {
678                             LOG.debug("Tx {} read operation succeeded", identifier, failure);
679                         }
680
681                         if (readResponse instanceof ReadDataReply) {
682                             ReadDataReply reply = (ReadDataReply) readResponse;
683                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
684
685                         } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
686                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
687                             returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
688
689                         } else {
690                             returnFuture.setException(new ReadFailedException(
691                                 "Invalid response reading data for path " + path));
692                         }
693                     }
694                 }
695             };
696
697             ReadData readData = new ReadData(path);
698             Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
699                 isTxActorLocal ? readData : readData.toSerializable());
700
701             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
702         }
703
704         @Override
705         public CheckedFuture<Boolean, ReadFailedException> dataExists(
706                 final YangInstanceIdentifier path) {
707
708             if(LOG.isDebugEnabled()) {
709                 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
710             }
711             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
712
713             // If there were any previous recorded put/merge/delete operation reply Futures then we
714             // must wait for them to successfully complete. This is necessary to honor the read
715             // uncommitted semantics of the public API contract. If any one fails then fail this
716             // request.
717
718             if(recordedOperationFutures.isEmpty()) {
719                 finishDataExists(path, returnFuture);
720             } else {
721                 if(LOG.isDebugEnabled()) {
722                     LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
723                         identifier, recordedOperationFutures.size());
724                 }
725                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
726                 // Futures#sequence accesses the passed List on a different thread, as
727                 // recordedOperationFutures is not synchronized.
728
729                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
730                         Lists.newArrayList(recordedOperationFutures),
731                         actorContext.getActorSystem().dispatcher());
732                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
733                     @Override
734                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
735                             throws Throwable {
736                         if(failure != null) {
737                             if(LOG.isDebugEnabled()) {
738                                 LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
739                                     identifier, failure);
740                             }
741                             returnFuture.setException(new ReadFailedException(
742                                     "The data exists could not be performed because a previous "
743                                     + "put, merge, or delete operation failed", failure));
744                         } else {
745                             finishDataExists(path, returnFuture);
746                         }
747                     }
748                 };
749
750                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
751             }
752
753             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
754         }
755
756         private void finishDataExists(final YangInstanceIdentifier path,
757                 final SettableFuture<Boolean> returnFuture) {
758
759             if(LOG.isDebugEnabled()) {
760                 LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
761             }
762             OnComplete<Object> onComplete = new OnComplete<Object>() {
763                 @Override
764                 public void onComplete(Throwable failure, Object response) throws Throwable {
765                     if(failure != null) {
766                         if(LOG.isDebugEnabled()) {
767                             LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
768                         }
769                         returnFuture.setException(new ReadFailedException(
770                                 "Error checking data exists for path " + path, failure));
771                     } else {
772                         if(LOG.isDebugEnabled()) {
773                             LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
774                         }
775
776                         if (response instanceof DataExistsReply) {
777                             returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
778
779                         } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
780                             returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
781
782                         } else {
783                             returnFuture.setException(new ReadFailedException(
784                                     "Invalid response checking exists for path " + path));
785                         }
786                     }
787                 }
788             };
789
790             DataExists dataExists = new DataExists(path);
791             Future<Object> future = actorContext.executeOperationAsync(getActor(),
792                 isTxActorLocal ? dataExists : dataExists.toSerializable());
793
794             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
795         }
796     }
797
798     private static class NoOpTransactionContext extends AbstractTransactionContext {
799
800         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
801
802         private final Exception failure;
803
804         public NoOpTransactionContext(String shardName, Exception failure,
805                 TransactionIdentifier identifier){
806             super(shardName, identifier);
807             this.failure = failure;
808         }
809
810         @Override
811         public void closeTransaction() {
812             if(LOG.isDebugEnabled()) {
813                 LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
814             }
815         }
816
817         @Override
818         public Future<ActorSelection> readyTransaction() {
819             if(LOG.isDebugEnabled()) {
820                 LOG.debug("Tx {} readyTransaction called", identifier);
821             }
822             return akka.dispatch.Futures.failed(failure);
823         }
824
825         @Override
826         public void deleteData(YangInstanceIdentifier path) {
827             if(LOG.isDebugEnabled()) {
828                 LOG.debug("Tx {} deleteData called path = {}", identifier, path);
829             }
830         }
831
832         @Override
833         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
834             if(LOG.isDebugEnabled()) {
835                 LOG.debug("Tx {} mergeData called path = {}", identifier, path);
836             }
837         }
838
839         @Override
840         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
841             if(LOG.isDebugEnabled()) {
842                 LOG.debug("Tx {} writeData called path = {}", identifier, path);
843             }
844         }
845
846         @Override
847         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
848             YangInstanceIdentifier path) {
849             if(LOG.isDebugEnabled()) {
850                 LOG.debug("Tx {} readData called path = {}", identifier, path);
851             }
852             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
853                     "Error reading data for path " + path, failure));
854         }
855
856         @Override
857         public CheckedFuture<Boolean, ReadFailedException> dataExists(
858             YangInstanceIdentifier path) {
859             if(LOG.isDebugEnabled()) {
860                 LOG.debug("Tx {} dataExists called path = {}", identifier, path);
861             }
862             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
863                     "Error checking exists for path " + path, failure));
864         }
865     }
866 }