+ @Override
+ public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("Tx {} writeData called path = {}", identifier, path);
+
+ WriteData writeData = new WriteData(path, data, schemaContext);
+ recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
+ isTxActorLocal ? writeData : writeData.toSerializable()));
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path) {
+
+ LOG.debug("Tx {} readData called path = {}", identifier, path);
+
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
+
+ // If there were any previous recorded put/merge/delete operation reply Futures then we
+ // must wait for them to successfully complete. This is necessary to honor the read
+ // uncommitted semantics of the public API contract. If any one fails then fail the read.
+
+ if(recordedOperationFutures.isEmpty()) {
+ finishReadData(path, returnFuture);
+ } else {
+ LOG.debug("Tx {} readData: verifying {} previous recorded operations",
+ identifier, recordedOperationFutures.size());
+
+ // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+ // Futures#sequence accesses the passed List on a different thread, as
+ // recordedOperationFutures is not synchronized.
+
+ Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+ Lists.newArrayList(recordedOperationFutures),
+ actorContext.getActorSystem().dispatcher());
+
+ OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Object> notUsed)
+ throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {} readData: a recorded operation failed: {}",
+ identifier, failure);
+ returnFuture.setException(new ReadFailedException(
+ "The read could not be performed because a previous put, merge,"
+ + "or delete operation failed", failure));
+ } else {
+ finishReadData(path, returnFuture);
+ }
+ }
+ };
+
+ combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ }
+
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+ }
+
+ private void finishReadData(final YangInstanceIdentifier path,
+ final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
+
+ LOG.debug("Tx {} finishReadData called path = {}", identifier, path);
+
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object readResponse) throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {} read operation failed: {}", identifier, failure);
+ returnFuture.setException(new ReadFailedException(
+ "Error reading data for path " + path, failure));
+
+ } else {
+ LOG.debug("Tx {} read operation succeeded", identifier, failure);
+
+ if (readResponse instanceof ReadDataReply) {
+ ReadDataReply reply = (ReadDataReply) readResponse;
+ returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
+
+ } else if (readResponse.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, readResponse);
+ returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
+
+ } else {
+ returnFuture.setException(new ReadFailedException(
+ "Invalid response reading data for path " + path));
+ }
+ }
+ }
+ };
+
+ ReadData readData = new ReadData(path);
+ Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
+ isTxActorLocal ? readData : readData.toSerializable());
+
+ readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> dataExists(
+ final YangInstanceIdentifier path) {
+
+ LOG.debug("Tx {} dataExists called path = {}", identifier, path);
+
+ final SettableFuture<Boolean> returnFuture = SettableFuture.create();
+
+ // If there were any previous recorded put/merge/delete operation reply Futures then we
+ // must wait for them to successfully complete. This is necessary to honor the read
+ // uncommitted semantics of the public API contract. If any one fails then fail this
+ // request.
+
+ if(recordedOperationFutures.isEmpty()) {
+ finishDataExists(path, returnFuture);
+ } else {
+ LOG.debug("Tx {} dataExists: verifying {} previous recorded operations",
+ identifier, recordedOperationFutures.size());
+
+ // Note: we make a copy of recordedOperationFutures to be on the safe side in case
+ // Futures#sequence accesses the passed List on a different thread, as
+ // recordedOperationFutures is not synchronized.
+
+ Future<Iterable<Object>> combinedFutures = akka.dispatch.Futures.sequence(
+ Lists.newArrayList(recordedOperationFutures),
+ actorContext.getActorSystem().dispatcher());
+ OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
+ @Override
+ public void onComplete(Throwable failure, Iterable<Object> notUsed)
+ throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {} dataExists: a recorded operation failed: {}",
+ identifier, failure);
+ returnFuture.setException(new ReadFailedException(
+ "The data exists could not be performed because a previous "
+ + "put, merge, or delete operation failed", failure));
+ } else {
+ finishDataExists(path, returnFuture);
+ }
+ }
+ };
+
+ combinedFutures.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
+ }
+
+ return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
+ }
+
+ private void finishDataExists(final YangInstanceIdentifier path,
+ final SettableFuture<Boolean> returnFuture) {
+
+ LOG.debug("Tx {} finishDataExists called path = {}", identifier, path);
+
+ OnComplete<Object> onComplete = new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) throws Throwable {
+ if(failure != null) {
+ LOG.debug("Tx {} dataExists operation failed: {}", identifier, failure);
+ returnFuture.setException(new ReadFailedException(
+ "Error checking data exists for path " + path, failure));
+ } else {
+ LOG.debug("Tx {} dataExists operation succeeded", identifier, failure);
+
+ if (response instanceof DataExistsReply) {
+ returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
+
+ } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+ returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
+
+ } else {
+ returnFuture.setException(new ReadFailedException(
+ "Invalid response checking exists for path " + path));
+ }
+ }
+ }
+ };
+
+ DataExists dataExists = new DataExists(path);
+ Future<Object> future = actorContext.executeOperationAsync(getActor(),
+ isTxActorLocal ? dataExists : dataExists.toSerializable());
+
+ future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());