import akka.actor.ActorSelection;
import akka.actor.Props;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
private final TransactionType transactionType;
private final ActorContext actorContext;
private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
- private final String identifier;
- private final ExecutorService executor;
+ private final TransactionIdentifier identifier;
+ private final ListeningExecutorService executor;
private final SchemaContext schemaContext;
public TransactionProxy(
ActorContext actorContext,
TransactionType transactionType,
- ExecutorService executor,
+ ListeningExecutorService executor,
SchemaContext schemaContext
) {
+ this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+ this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
+ this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
+ this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+
+ String memberName = actorContext.getCurrentMemberName();
+ if(memberName == null){
+ memberName = "UNKNOWN-MEMBER";
+ }
+ this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
- this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
- this.transactionType = transactionType;
- this.actorContext = actorContext;
- this.executor = executor;
- this.schemaContext = schemaContext;
-
+ LOG.debug("Created txn {}", identifier);
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final YangInstanceIdentifier path) {
+
+ LOG.debug("txn {} read {}", identifier, path);
createTransactionIfMissing(actorContext, path);
@Override
public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("txn {} write {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).writeData(path, data);
@Override
public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("txn {} merge {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).mergeData(path, data);
@Override
public void delete(YangInstanceIdentifier path) {
+ LOG.debug("txn {} delete {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).deleteData(path);
public DOMStoreThreePhaseCommitCohort ready() {
List<ActorPath> cohortPaths = new ArrayList<>();
+ LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
+
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+
+ LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
+
Object result = transactionContext.readyTransaction();
if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
}
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
}
@Override
try {
Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier).toSerializable(),
+ new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
ActorContext.ASK_DURATION);
if (response.getClass()
.equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
String transactionPath = reply.getTransactionPath();
- LOG.info("Received transaction path = {}" , transactionPath );
+ LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
ActorSelection transactionActor =
actorContext.actorSelection(transactionPath);
remoteTransactionPaths.put(shardName, transactionContext);
}
- } catch(TimeoutException e){
- remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName));
+ } catch(TimeoutException | PrimaryNotFoundException e){
+ LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+ remoteTransactionPaths.put(shardName,
+ new NoOpTransactionContext(shardName));
}
}
void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
- ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(final YangInstanceIdentifier path);
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path);
void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
}
getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
}
- @Override public ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(final YangInstanceIdentifier path) {
+ @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path) {
- Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
+ Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
@Override public Optional<NormalizedNode<?,?>> call() throws Exception {
Object response = actorContext
if(reply.getNormalizedNode() == null){
return Optional.absent();
}
- //FIXME : A cast should not be required here ???
- return (Optional<NormalizedNode<?, ?>>) Optional.of(reply.getNormalizedNode());
+ return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
}
return Optional.absent();
}
};
- ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
- future = ListenableFutureTask.create(call);
-
- executor.submit(future);
-
- return future;
+ return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
}
@Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
}
@Override public void closeTransaction() {
- LOG.error("closeTransaction called");
+ LOG.warn("txn {} closeTransaction called", identifier);
}
@Override public Object readyTransaction() {
- LOG.error("readyTransaction called");
+ LOG.warn("txn {} readyTransaction called", identifier);
cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
return new ReadyTransactionReply(cohort.path()).toSerializable();
}
@Override public void deleteData(YangInstanceIdentifier path) {
- LOG.error("deleteData called path = {}", path);
+ LOG.warn("txt {} deleteData called path = {}", identifier, path);
}
@Override public void mergeData(YangInstanceIdentifier path,
NormalizedNode<?, ?> data) {
- LOG.error("mergeData called path = {}", path);
+ LOG.warn("txn {} mergeData called path = {}", identifier, path);
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
- LOG.error("readData called path = {}", path);
- return Futures.immediateFuture(
+ LOG.warn("txn {} readData called path = {}", identifier, path);
+ return Futures.immediateCheckedFuture(
Optional.<NormalizedNode<?, ?>>absent());
}
@Override public void writeData(YangInstanceIdentifier path,
NormalizedNode<?, ?> data) {
- LOG.error("writeData called path = {}", path);
+ LOG.warn("txn {} writeData called path = {}", identifier, path);
}
}