+
+ private interface TransactionContext {
+ String getShardName();
+
+ String getResolvedCohortPath(String cohortPath);
+
+ public void closeTransaction();
+
+ public Object readyTransaction();
+
+ void deleteData(YangInstanceIdentifier path);
+
+ void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
+
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(final YangInstanceIdentifier path);
+
+ void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
+ }
+
+
+ private class TransactionContextImpl implements TransactionContext{
+ private final String shardName;
+ private final String actorPath;
+ private final ActorSelection actor;
+
+
+ private TransactionContextImpl(String shardName, String actorPath,
+ ActorSelection actor) {
+ this.shardName = shardName;
+ this.actorPath = actorPath;
+ this.actor = actor;
+ }
+
+ @Override public String getShardName() {
+ return shardName;
+ }
+
+ private ActorSelection getActor() {
+ return actor;
+ }
+
+ @Override public String getResolvedCohortPath(String cohortPath){
+ return actorContext.resolvePath(actorPath, cohortPath);
+ }
+
+ @Override public void closeTransaction() {
+ getActor().tell(
+ new CloseTransaction().toSerializable(), null);
+ }
+
+ @Override public Object readyTransaction() {
+ return actorContext.executeRemoteOperation(getActor(),
+ new ReadyTransaction().toSerializable(),
+ ActorContext.ASK_DURATION
+ );
+
+ }
+
+ @Override public void deleteData(YangInstanceIdentifier path) {
+ getActor().tell(new DeleteData(path).toSerializable(), null);
+ }
+
+ @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data){
+ getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
+ }
+
+ @Override public ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(final YangInstanceIdentifier path) {
+
+ Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
+
+ @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
+ Object response = actorContext
+ .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(),
+ ActorContext.ASK_DURATION);
+ if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
+ ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
+ if(reply.getNormalizedNode() == null){
+ return Optional.absent();
+ }
+ //FIXME : A cast should not be required here ???
+ return (Optional<NormalizedNode<?, ?>>) Optional.of(reply.getNormalizedNode());
+ }
+
+ return Optional.absent();
+ }
+ };
+
+ ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
+ future = ListenableFutureTask.create(call);
+
+ executor.submit(future);
+
+ return future;
+ }
+
+ @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null);
+ }
+
+ }
+
+ private class NoOpTransactionContext implements TransactionContext {
+
+ private final Logger
+ LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
+
+ private final String shardName;
+
+ private ActorRef cohort;
+
+ public NoOpTransactionContext(String shardName){
+ this.shardName = shardName;
+ }
+ @Override public String getShardName() {
+ return shardName;
+
+ }
+
+ @Override public String getResolvedCohortPath(String cohortPath) {
+ return cohort.path().toString();
+ }
+
+ @Override public void closeTransaction() {
+ LOG.error("closeTransaction called");
+ }
+
+ @Override public Object readyTransaction() {
+ LOG.error("readyTransaction called");
+ cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
+ return new ReadyTransactionReply(cohort.path()).toSerializable();
+ }
+
+ @Override public void deleteData(YangInstanceIdentifier path) {
+ LOG.error("deleteData called path = {}", path);
+ }
+
+ @Override public void mergeData(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ LOG.error("mergeData called path = {}", path);
+ }
+
+ @Override
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(
+ YangInstanceIdentifier path) {
+ LOG.error("readData called path = {}", path);
+ return Futures.immediateFuture(
+ Optional.<NormalizedNode<?, ?>>absent());
+ }
+
+ @Override public void writeData(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ LOG.error("writeData called path = {}", path);
+ }
+ }
+
+
+