Merge "Bug 1576: Handle remote failures for write Tx async"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import akka.dispatch.OnComplete;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import com.google.common.collect.Lists;
19 import com.google.common.util.concurrent.CheckedFuture;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.SettableFuture;
22
23 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
24 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
28 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
30 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
36 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 import scala.Function1;
49 import scala.concurrent.Future;
50 import scala.runtime.AbstractFunction1;
51
52 import java.util.HashMap;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.concurrent.atomic.AtomicLong;
56
57 /**
58  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
59  * <p>
60  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
61  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
62  * be created on each of those shards by the TransactionProxy
63  *</p>
64  * <p>
65  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
66  * shards will be executed.
67  * </p>
68  */
69 public class TransactionProxy implements DOMStoreReadWriteTransaction {
70     public enum TransactionType {
71         READ_ONLY,
72         WRITE_ONLY,
73         READ_WRITE
74     }
75
76     static Function1<Throwable, Throwable> SAME_FAILURE_TRANSFORMER = new AbstractFunction1<
77                                                                           Throwable, Throwable>() {
78         @Override
79         public Throwable apply(Throwable failure) {
80             return failure;
81         }
82     };
83
84     private static final AtomicLong counter = new AtomicLong();
85
86     private static final Logger
87         LOG = LoggerFactory.getLogger(TransactionProxy.class);
88
89
90     private final TransactionType transactionType;
91     private final ActorContext actorContext;
92     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
93     private final TransactionIdentifier identifier;
94     private final SchemaContext schemaContext;
95     private boolean inReadyState;
96
97     public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
98             SchemaContext schemaContext) {
99         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
100         this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
101         this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
102
103         String memberName = actorContext.getCurrentMemberName();
104         if(memberName == null){
105             memberName = "UNKNOWN-MEMBER";
106         }
107
108         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
109                 counter.getAndIncrement()).build();
110
111         LOG.debug("Created txn {} of type {}", identifier, transactionType);
112
113     }
114
115     @VisibleForTesting
116     List<Future<Object>> getRecordedOperationFutures() {
117         List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
118         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
119             recordedOperationFutures.addAll(transactionContext.getRecordedOperationFutures());
120         }
121
122         return recordedOperationFutures;
123     }
124
125     @Override
126     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
127             final YangInstanceIdentifier path) {
128
129         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
130                 "Read operation on write-only transaction is not allowed");
131
132         LOG.debug("Tx {} read {}", identifier, path);
133
134         createTransactionIfMissing(actorContext, path);
135
136         return transactionContext(path).readData(path);
137     }
138
139     @Override
140     public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
141
142         Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
143                 "Exists operation on write-only transaction is not allowed");
144
145         LOG.debug("Tx {} exists {}", identifier, path);
146
147         createTransactionIfMissing(actorContext, path);
148
149         return transactionContext(path).dataExists(path);
150     }
151
152     private void checkModificationState() {
153         Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
154                 "Modification operation on read-only transaction is not allowed");
155         Preconditions.checkState(!inReadyState,
156                 "Transaction is sealed - further modifications are allowed");
157     }
158
159     @Override
160     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
161
162         checkModificationState();
163
164         LOG.debug("Tx {} write {}", identifier, path);
165
166         createTransactionIfMissing(actorContext, path);
167
168         transactionContext(path).writeData(path, data);
169     }
170
171     @Override
172     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
173
174         checkModificationState();
175
176         LOG.debug("Tx {} merge {}", identifier, path);
177
178         createTransactionIfMissing(actorContext, path);
179
180         transactionContext(path).mergeData(path, data);
181     }
182
183     @Override
184     public void delete(YangInstanceIdentifier path) {
185
186         checkModificationState();
187
188         LOG.debug("Tx {} delete {}", identifier, path);
189
190         createTransactionIfMissing(actorContext, path);
191
192         transactionContext(path).deleteData(path);
193     }
194
195     @Override
196     public DOMStoreThreePhaseCommitCohort ready() {
197
198         checkModificationState();
199
200         inReadyState = true;
201
202         LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
203                 remoteTransactionPaths.size());
204
205         List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
206
207         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
208
209             LOG.debug("Tx {} Readying transaction for shard {}", identifier,
210                     transactionContext.getShardName());
211
212             cohortPathFutures.add(transactionContext.readyTransaction());
213         }
214
215         return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
216                 identifier.toString());
217     }
218
219     @Override
220     public Object getIdentifier() {
221         return this.identifier;
222     }
223
224     @Override
225     public void close() {
226         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
227             transactionContext.closeTransaction();
228         }
229     }
230
231     private TransactionContext transactionContext(YangInstanceIdentifier path){
232         String shardName = shardNameFromIdentifier(path);
233         return remoteTransactionPaths.get(shardName);
234     }
235
236     private String shardNameFromIdentifier(YangInstanceIdentifier path){
237         return ShardStrategyFactory.getStrategy(path).findShard(path);
238     }
239
240     private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
241         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
242
243         TransactionContext transactionContext =
244             remoteTransactionPaths.get(shardName);
245
246         if(transactionContext != null){
247             // A transaction already exists with that shard
248             return;
249         }
250
251         try {
252             Object response = actorContext.executeShardOperation(shardName,
253                 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
254                 ActorContext.ASK_DURATION);
255             if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
256                 CreateTransactionReply reply =
257                     CreateTransactionReply.fromSerializable(response);
258
259                 String transactionPath = reply.getTransactionPath();
260
261                 LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
262
263                 ActorSelection transactionActor =
264                     actorContext.actorSelection(transactionPath);
265                 transactionContext =
266                     new TransactionContextImpl(shardName, transactionPath,
267                         transactionActor);
268
269                 remoteTransactionPaths.put(shardName, transactionContext);
270             } else {
271                 throw new IllegalArgumentException(String.format(
272                         "Invalid reply type {} for CreateTransaction", response.getClass()));
273             }
274         } catch(Exception e){
275             LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
276             remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
277         }
278     }
279
280     private interface TransactionContext {
281         String getShardName();
282
283         void closeTransaction();
284
285         Future<ActorPath> readyTransaction();
286
287         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
288
289         void deleteData(YangInstanceIdentifier path);
290
291         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
292
293         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
294                 final YangInstanceIdentifier path);
295
296         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
297
298         List<Future<Object>> getRecordedOperationFutures();
299     }
300
301     private abstract class AbstractTransactionContext implements TransactionContext {
302
303         protected final String shardName;
304         protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
305
306         AbstractTransactionContext(String shardName) {
307             this.shardName = shardName;
308         }
309
310         @Override
311         public String getShardName() {
312             return shardName;
313         }
314
315         @Override
316         public List<Future<Object>> getRecordedOperationFutures() {
317             return recordedOperationFutures;
318         }
319     }
320
321     private class TransactionContextImpl extends AbstractTransactionContext {
322         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
323
324         private final String actorPath;
325         private final ActorSelection actor;
326
327         private TransactionContextImpl(String shardName, String actorPath,
328             ActorSelection actor) {
329             super(shardName);
330             this.actorPath = actorPath;
331             this.actor = actor;
332         }
333
334         private ActorSelection getActor() {
335             return actor;
336         }
337
338         private String getResolvedCohortPath(String cohortPath) {
339             return actorContext.resolvePath(actorPath, cohortPath);
340         }
341
342         @Override
343         public void closeTransaction() {
344             LOG.debug("Tx {} closeTransaction called", identifier);
345             actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
346         }
347
348         @Override
349         public Future<ActorPath> readyTransaction() {
350             LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
351                     identifier, recordedOperationFutures.size());
352
353             // Send the ReadyTransaction message to the Tx actor.
354
355             final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
356                     new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
357
358             // Combine all the previously recorded put/merge/delete operation reply Futures and the
359             // ReadyTransactionReply Future into one Future. If any one fails then the combined
360             // Future will fail. We need all prior operations and the ready operation to succeed
361             // in order to attempt commit.
362
363             List<Future<Object>> futureList =
364                     Lists.newArrayListWithCapacity(recordedOperationFutures.size() + 1);
365             futureList.addAll(recordedOperationFutures);
366             futureList.add(replyFuture);
367
368             Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(futureList,
369                     actorContext.getActorSystem().dispatcher());
370
371             // Transform the combined Future into a Future that returns the cohort actor path from
372             // the ReadyTransactionReply. That's the end result of the ready operation.
373
374             return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
375                 @Override
376                 public ActorPath apply(Iterable<Object> notUsed) {
377
378                     LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
379                             identifier);
380
381                     // At this point all the Futures succeeded and we need to extract the cohort
382                     // actor path from the ReadyTransactionReply. For the recorded operations, they
383                     // don't return any data so we're only interested that they completed
384                     // successfully. We could be paranoid and verify the correct reply types but
385                     // that really should never happen so it's not worth the overhead of
386                     // de-serializing each reply.
387
388                     // Note the Future get call here won't block as it's complete.
389                     Object serializedReadyReply = replyFuture.value().get().get();
390                     if(serializedReadyReply.getClass().equals(
391                                                      ReadyTransactionReply.SERIALIZABLE_CLASS)) {
392                         ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
393                                 actorContext.getActorSystem(), serializedReadyReply);
394
395                         String resolvedCohortPath = getResolvedCohortPath(
396                                 reply.getCohortPath().toString());
397
398                         LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
399                                 identifier, resolvedCohortPath);
400
401                         return actorContext.actorFor(resolvedCohortPath);
402                     } else {
403                         // Throwing an exception here will fail the Future.
404
405                         throw new IllegalArgumentException(String.format("Invalid reply type {}",
406                                 serializedReadyReply.getClass()));
407                     }
408                 }
409             }, SAME_FAILURE_TRANSFORMER, actorContext.getActorSystem().dispatcher());
410         }
411
412         @Override
413         public void deleteData(YangInstanceIdentifier path) {
414             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
415             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
416                     new DeleteData(path).toSerializable(), ActorContext.ASK_DURATION ));
417         }
418
419         @Override
420         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
421             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
422             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
423                     new MergeData(path, data, schemaContext).toSerializable(),
424                     ActorContext.ASK_DURATION));
425         }
426
427         @Override
428         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
429             LOG.debug("Tx {} writeData called path = {}", identifier, path);
430             recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
431                     new WriteData(path, data, schemaContext).toSerializable(),
432                     ActorContext.ASK_DURATION));
433         }
434
435         @Override
436         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
437                 final YangInstanceIdentifier path) {
438
439             LOG.debug("Tx {} readData called path = {}", identifier, path);
440
441             final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
442
443             // If there were any previous recorded put/merge/delete operation reply Futures then we
444             // must wait for them to successfully complete. This is necessary to honor the read
445             // uncommitted semantics of the public API contract. If any one fails then fail the read.
446
447             if(recordedOperationFutures.isEmpty()) {
448                 finishReadData(path, returnFuture);
449             } else {
450                 LOG.debug("Tx {} readData: verifying {} previous recorded operations",
451                         identifier, recordedOperationFutures.size());
452
453                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
454                 // Futures#sequence accesses the passed List on a different thread, as
455                 // recordedOperationFutures is not synchronized.
456
457                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
458                         Lists.newArrayList(recordedOperationFutures),
459                         actorContext.getActorSystem().dispatcher());
460                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
461                     @Override
462                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
463                             throws Throwable {
464                         if(failure != null) {
465                             LOG.debug("Tx {} readData: a recorded operation failed: {}",
466                                     identifier, failure);
467
468                             returnFuture.setException(new ReadFailedException(
469                                     "The read could not be performed because a previous put, merge,"
470                                     + "or delete operation failed", failure));
471                         } else {
472                             finishReadData(path, returnFuture);
473                         }
474                     }
475                 };
476
477                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
478             }
479
480             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
481         }
482
483         private void finishReadData(final YangInstanceIdentifier path,
484                 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
485
486             LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
487
488             OnComplete<Object> onComplete = new OnComplete<Object>() {
489                 @Override
490                 public void onComplete(Throwable failure, Object readResponse) throws Throwable {
491                     if(failure != null) {
492                         LOG.debug("Tx {} read operation failed: {}", identifier, failure);
493
494                         returnFuture.setException(new ReadFailedException(
495                                 "Error reading data for path " + path, failure));
496                     } else {
497                         LOG.debug("Tx {} read operation succeeded", identifier, failure);
498
499                         if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
500                             ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
501                                     path, readResponse);
502                             if (reply.getNormalizedNode() == null) {
503                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
504                             } else {
505                                 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
506                                         reply.getNormalizedNode()));
507                             }
508                         } else {
509                             returnFuture.setException(new ReadFailedException(
510                                     "Invalid response reading data for path " + path));
511                         }
512                     }
513                 }
514             };
515
516             Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
517                     new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
518             readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
519         }
520
521         @Override
522         public CheckedFuture<Boolean, ReadFailedException> dataExists(
523                 final YangInstanceIdentifier path) {
524
525             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
526
527             final SettableFuture<Boolean> returnFuture = SettableFuture.create();
528
529             // If there were any previous recorded put/merge/delete operation reply Futures then we
530             // must wait for them to successfully complete. This is necessary to honor the read
531             // uncommitted semantics of the public API contract. If any one fails then fail this
532             // request.
533
534             if(recordedOperationFutures.isEmpty()) {
535                 finishDataExists(path, returnFuture);
536             } else {
537                 LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
538                         identifier, recordedOperationFutures.size());
539
540                 // Note: we make a copy of recordedOperationFutures to be on the safe side in case
541                 // Futures#sequence accesses the passed List on a different thread, as
542                 // recordedOperationFutures is not synchronized.
543
544                 Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
545                         Lists.newArrayList(recordedOperationFutures),
546                         actorContext.getActorSystem().dispatcher());
547                 OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
548                     @Override
549                     public void onComplete(Throwable failure, Iterable<Object> notUsed)
550                             throws Throwable {
551                         if(failure != null) {
552                             LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
553                                     identifier, failure);
554
555                             returnFuture.setException(new ReadFailedException(
556                                     "The data exists could not be performed because a previous "
557                                     + "put, merge, or delete operation failed", failure));
558                         } else {
559                             finishDataExists(path, returnFuture);
560                         }
561                     }
562                 };
563
564                 combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
565             }
566
567             return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
568         }
569
570         private void finishDataExists(final YangInstanceIdentifier path,
571                 final SettableFuture<Boolean> returnFuture) {
572
573             LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
574
575             OnComplete<Object> onComplete = new OnComplete<Object>() {
576                 @Override
577                 public void onComplete(Throwable failure, Object response) throws Throwable {
578                     if(failure != null) {
579                         LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
580
581                         returnFuture.setException(new ReadFailedException(
582                                 "Error checking data exists for path " + path, failure));
583                     } else {
584                         LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
585
586                         if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
587                             returnFuture.set(Boolean.valueOf(DataExistsReply.
588                                         fromSerializable(response).exists()));
589                         } else {
590                             returnFuture.setException(new ReadFailedException(
591                                     "Invalid response checking exists for path " + path));
592                         }
593                     }
594                 }
595             };
596
597             Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
598                     new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
599             future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
600         }
601     }
602
603     private class NoOpTransactionContext extends AbstractTransactionContext {
604
605         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
606
607         private final Exception failure;
608
609         public NoOpTransactionContext(String shardName, Exception failure){
610             super(shardName);
611             this.failure = failure;
612         }
613
614         @Override
615         public void closeTransaction() {
616             LOG.debug("NoOpTransactionContext {} closeTransaction called", identifier);
617         }
618
619         @Override
620         public Future<ActorPath> readyTransaction() {
621             LOG.debug("Tx {} readyTransaction called", identifier);
622             return akka.dispatch.Futures.failed(failure);
623         }
624
625         @Override
626         public void deleteData(YangInstanceIdentifier path) {
627             LOG.debug("Tx {} deleteData called path = {}", identifier, path);
628         }
629
630         @Override
631         public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
632             LOG.debug("Tx {} mergeData called path = {}", identifier, path);
633         }
634
635         @Override
636         public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
637             LOG.debug("Tx {} writeData called path = {}", identifier, path);
638         }
639
640         @Override
641         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
642             YangInstanceIdentifier path) {
643             LOG.debug("Tx {} readData called path = {}", identifier, path);
644             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
645                     "Error reading data for path " + path, failure));
646         }
647
648         @Override
649         public CheckedFuture<Boolean, ReadFailedException> dataExists(
650             YangInstanceIdentifier path) {
651             LOG.debug("Tx {} dataExists called path = {}", identifier, path);
652             return Futures.immediateFailedCheckedFuture(new ReadFailedException(
653                     "Error checking exists for path " + path, failure));
654         }
655     }
656 }