package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.ActorSelection;
+import akka.actor.Props;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
private static final AtomicLong counter = new AtomicLong();
+ private static final Logger
+ LOG = LoggerFactory.getLogger(TransactionProxy.class);
+
+
private final TransactionType transactionType;
private final ActorContext actorContext;
private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
private final String identifier;
- private final ExecutorService executor;
+ private final ListeningExecutorService executor;
private final SchemaContext schemaContext;
public TransactionProxy(
ActorContext actorContext,
TransactionType transactionType,
- ExecutorService executor,
+ ListeningExecutorService executor,
SchemaContext schemaContext
- ) {
+ ) {
this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
this.transactionType = transactionType;
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final YangInstanceIdentifier path) {
createTransactionIfMissing(actorContext, path);
- final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
-
- Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
-
- @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
- Object response = actorContext
- .executeRemoteOperation(remoteTransaction, new ReadData(path).toSerializable(),
- ActorContext.ASK_DURATION);
- if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
- ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
- if(reply.getNormalizedNode() == null){
- return Optional.absent();
- }
- //FIXME : A cast should not be required here ???
- return (Optional<NormalizedNode<?, ?>>) Optional.of(reply.getNormalizedNode());
- }
-
- return Optional.absent();
- }
- };
-
- ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
- future = ListenableFutureTask.create(call);
-
- executor.submit(future);
-
- return future;
+ return transactionContext(path).readData(path);
}
@Override
createTransactionIfMissing(actorContext, path);
- final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
- remoteTransaction.tell(new WriteData(path, data, schemaContext).toSerializable(), null);
+ transactionContext(path).writeData(path, data);
}
@Override
createTransactionIfMissing(actorContext, path);
- final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
- remoteTransaction.tell(new MergeData(path, data, schemaContext).toSerializable(), null);
+ transactionContext(path).mergeData(path, data);
}
@Override
createTransactionIfMissing(actorContext, path);
- final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
- remoteTransaction.tell(new DeleteData(path).toSerializable(), null);
+ transactionContext(path).deleteData(path);
}
@Override
List<ActorPath> cohortPaths = new ArrayList<>();
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- Object result = actorContext.executeRemoteOperation(transactionContext.getActor(),
- new ReadyTransaction().toSerializable(),
- ActorContext.ASK_DURATION
- );
+ Object result = transactionContext.readyTransaction();
if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
@Override
public void close() {
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
- transactionContext.getActor().tell(
- new CloseTransaction().toSerializable(), null);
+ transactionContext.closeTransaction();
}
}
- private ActorSelection remoteTransactionFromIdentifier(YangInstanceIdentifier path){
+ private TransactionContext transactionContext(YangInstanceIdentifier path){
String shardName = shardNameFromIdentifier(path);
- return remoteTransactionPaths.get(shardName).getActor();
+ return remoteTransactionPaths.get(shardName);
}
private String shardNameFromIdentifier(YangInstanceIdentifier path){
return;
}
- Object response = actorContext.executeShardOperation(shardName, new CreateTransaction(identifier).toSerializable(), ActorContext.ASK_DURATION);
- if(response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)){
- CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response);
- String transactionPath = actorContext.getRemoteActorPath(shardName, reply.getTransactionPath());
+ try {
+ Object response = actorContext.executeShardOperation(shardName,
+ new CreateTransaction(identifier,this.transactionType.ordinal() ).toSerializable(),
+ ActorContext.ASK_DURATION);
+ if (response.getClass()
+ .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+ CreateTransactionReply reply =
+ CreateTransactionReply.fromSerializable(response);
- ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
- transactionContext = new TransactionContext(shardName, transactionPath, transactionActor);
+ String transactionPath = reply.getTransactionPath();
- remoteTransactionPaths.put(shardName, transactionContext);
+ LOG.info("Received transaction path = {}" , transactionPath );
+
+ ActorSelection transactionActor =
+ actorContext.actorSelection(transactionPath);
+ transactionContext =
+ new TransactionContextImpl(shardName, transactionPath,
+ transactionActor);
+
+ remoteTransactionPaths.put(shardName, transactionContext);
+ }
+ } catch(TimeoutException | PrimaryNotFoundException e){
+ LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
+ remoteTransactionPaths.put(shardName,
+ new NoOpTransactionContext(shardName));
}
}
+ private interface TransactionContext {
+ String getShardName();
+
+ String getResolvedCohortPath(String cohortPath);
+
+ public void closeTransaction();
+
+ public Object readyTransaction();
+
+ void deleteData(YangInstanceIdentifier path);
+
+ void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
+
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path);
+
+ void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
+ }
+
- private class TransactionContext {
+ private class TransactionContextImpl implements TransactionContext{
private final String shardName;
private final String actorPath;
private final ActorSelection actor;
- private TransactionContext(String shardName, String actorPath,
+ private TransactionContextImpl(String shardName, String actorPath,
ActorSelection actor) {
this.shardName = shardName;
this.actorPath = actorPath;
this.actor = actor;
}
-
- public String getShardName() {
+ @Override public String getShardName() {
return shardName;
}
- public String getActorPath() {
- return actorPath;
- }
-
- public ActorSelection getActor() {
+ private ActorSelection getActor() {
return actor;
}
- public String getResolvedCohortPath(String cohortPath){
+ @Override public String getResolvedCohortPath(String cohortPath){
return actorContext.resolvePath(actorPath, cohortPath);
}
+
+ @Override public void closeTransaction() {
+ getActor().tell(
+ new CloseTransaction().toSerializable(), null);
+ }
+
+ @Override public Object readyTransaction() {
+ return actorContext.executeRemoteOperation(getActor(),
+ new ReadyTransaction().toSerializable(),
+ ActorContext.ASK_DURATION
+ );
+
+ }
+
+ @Override public void deleteData(YangInstanceIdentifier path) {
+ getActor().tell(new DeleteData(path).toSerializable(), null);
+ }
+
+ @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data){
+ getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
+ }
+
+ @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path) {
+
+ Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
+
+ @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
+ Object response = actorContext
+ .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(),
+ ActorContext.ASK_DURATION);
+ if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
+ ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
+ if(reply.getNormalizedNode() == null){
+ return Optional.absent();
+ }
+ return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
+ }
+
+ return Optional.absent();
+ }
+ };
+
+ return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
+ }
+
+ @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null);
+ }
+
}
+ private class NoOpTransactionContext implements TransactionContext {
+
+ private final Logger
+ LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
+
+ private final String shardName;
+
+ private ActorRef cohort;
+
+ public NoOpTransactionContext(String shardName){
+ this.shardName = shardName;
+ }
+ @Override public String getShardName() {
+ return shardName;
+
+ }
+
+ @Override public String getResolvedCohortPath(String cohortPath) {
+ return cohort.path().toString();
+ }
+
+ @Override public void closeTransaction() {
+ LOG.error("closeTransaction called");
+ }
+
+ @Override public Object readyTransaction() {
+ LOG.error("readyTransaction called");
+ cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
+ return new ReadyTransactionReply(cohort.path()).toSerializable();
+ }
+
+ @Override public void deleteData(YangInstanceIdentifier path) {
+ LOG.error("deleteData called path = {}", path);
+ }
+
+ @Override public void mergeData(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ LOG.error("mergeData called path = {}", path);
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ YangInstanceIdentifier path) {
+ LOG.error("readData called path = {}", path);
+ return Futures.immediateCheckedFuture(
+ Optional.<NormalizedNode<?, ?>>absent());
+ }
+
+ @Override public void writeData(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ LOG.error("writeData called path = {}", path);
+ }
+ }
+
+
+
}