+
+ @Override public void closeTransaction() {
+ getActor().tell(
+ new CloseTransaction().toSerializable(), null);
+ }
+
+ @Override public Object readyTransaction() {
+ return actorContext.executeRemoteOperation(getActor(),
+ new ReadyTransaction().toSerializable(),
+ ActorContext.ASK_DURATION
+ );
+
+ }
+
+ @Override public void deleteData(YangInstanceIdentifier path) {
+ getActor().tell(new DeleteData(path).toSerializable(), null);
+ }
+
+ @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data){
+ getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
+ }
+
+ @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path) {
+
+ Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
+
+ @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
+ Object response = actorContext
+ .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(),
+ ActorContext.ASK_DURATION);
+ if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
+ ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
+ if(reply.getNormalizedNode() == null){
+ return Optional.absent();
+ }
+ return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
+ }
+
+ return Optional.absent();
+ }
+ };
+
+ return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
+ }
+
+ @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null);
+ }
+
+ }
+
+ private class NoOpTransactionContext implements TransactionContext {
+
+ private final Logger
+ LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
+
+ private final String shardName;
+
+ private ActorRef cohort;
+
+ public NoOpTransactionContext(String shardName){
+ this.shardName = shardName;
+ }
+ @Override public String getShardName() {
+ return shardName;
+
+ }
+
+ @Override public String getResolvedCohortPath(String cohortPath) {
+ return cohort.path().toString();
+ }
+
+ @Override public void closeTransaction() {
+ LOG.warn("txn {} closeTransaction called", identifier);
+ }
+
+ @Override public Object readyTransaction() {
+ LOG.warn("txn {} readyTransaction called", identifier);
+ cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
+ return new ReadyTransactionReply(cohort.path()).toSerializable();
+ }
+
+ @Override public void deleteData(YangInstanceIdentifier path) {
+ LOG.warn("txt {} deleteData called path = {}", identifier, path);
+ }
+
+ @Override public void mergeData(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ LOG.warn("txn {} mergeData called path = {}", identifier, path);
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ YangInstanceIdentifier path) {
+ LOG.warn("txn {} readData called path = {}", identifier, path);
+ return Futures.immediateCheckedFuture(
+ Optional.<NormalizedNode<?, ?>>absent());
+ }
+
+ @Override public void writeData(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ LOG.warn("txn {} writeData called path = {}", identifier, path);
+ }