import akka.actor.ActorSelection;
import akka.actor.Props;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
private final TransactionType transactionType;
private final ActorContext actorContext;
private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
- private final String identifier;
+ private final TransactionIdentifier identifier;
private final ListeningExecutorService executor;
private final SchemaContext schemaContext;
ListeningExecutorService executor,
SchemaContext schemaContext
) {
+ this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+ this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
+ this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
+ this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+
+ String memberName = actorContext.getCurrentMemberName();
+ if(memberName == null){
+ memberName = "UNKNOWN-MEMBER";
+ }
+ this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
- this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
- this.transactionType = transactionType;
- this.actorContext = actorContext;
- this.executor = executor;
- this.schemaContext = schemaContext;
-
+ LOG.debug("Created txn {}", identifier);
}
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
+ LOG.debug("txn {} read {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
return transactionContext(path).readData(path);
}
+ @Override public CheckedFuture<Boolean, ReadFailedException> exists(
+ YangInstanceIdentifier path) {
+ LOG.debug("txn {} exists {}", identifier, path);
+
+ createTransactionIfMissing(actorContext, path);
+
+ return transactionContext(path).dataExists(path);
+ }
+
@Override
public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("txn {} write {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).writeData(path, data);
@Override
public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("txn {} merge {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).mergeData(path, data);
@Override
public void delete(YangInstanceIdentifier path) {
+ LOG.debug("txn {} delete {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).deleteData(path);
public DOMStoreThreePhaseCommitCohort ready() {
List<ActorPath> cohortPaths = new ArrayList<>();
+ LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
+
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+
+ LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
+
Object result = transactionContext.readyTransaction();
if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
}
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
}
@Override
try {
Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier,this.transactionType.ordinal() ).toSerializable(),
+ new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
ActorContext.ASK_DURATION);
if (response.getClass()
.equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
String transactionPath = reply.getTransactionPath();
- LOG.info("Received transaction path = {}" , transactionPath );
+ LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
ActorSelection transactionActor =
actorContext.actorSelection(transactionPath);
remoteTransactionPaths.put(shardName, transactionContext);
}
} catch(TimeoutException | PrimaryNotFoundException e){
- LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
+ LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
remoteTransactionPaths.put(shardName,
new NoOpTransactionContext(shardName));
}
final YangInstanceIdentifier path);
void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
+
+ CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
}
- private class TransactionContextImpl implements TransactionContext{
+ private class TransactionContextImpl implements TransactionContext {
private final String shardName;
private final String actorPath;
- private final ActorSelection actor;
+ private final ActorSelection actor;
private TransactionContextImpl(String shardName, String actorPath,
return actor;
}
- @Override public String getResolvedCohortPath(String cohortPath){
+ @Override public String getResolvedCohortPath(String cohortPath) {
return actorContext.resolvePath(actorPath, cohortPath);
}
getActor().tell(new DeleteData(path).toSerializable(), null);
}
- @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data){
- getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
+ @Override public void mergeData(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ getActor()
+ .tell(new MergeData(path, data, schemaContext).toSerializable(),
+ null);
}
- @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
- final YangInstanceIdentifier path) {
-
- Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path) {
- @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
- Object response = actorContext
- .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(),
- ActorContext.ASK_DURATION);
- if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
- ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
- if(reply.getNormalizedNode() == null){
- return Optional.absent();
+ Callable<Optional<NormalizedNode<?, ?>>> call =
+ new Callable<Optional<NormalizedNode<?, ?>>>() {
+
+ @Override public Optional<NormalizedNode<?, ?>> call()
+ throws Exception {
+ Object response = actorContext
+ .executeRemoteOperation(getActor(),
+ new ReadData(path).toSerializable(),
+ ActorContext.ASK_DURATION);
+ if (response.getClass()
+ .equals(ReadDataReply.SERIALIZABLE_CLASS)) {
+ ReadDataReply reply = ReadDataReply
+ .fromSerializable(schemaContext, path,
+ response);
+ if (reply.getNormalizedNode() == null) {
+ return Optional.absent();
+ }
+ return Optional.<NormalizedNode<?, ?>>of(
+ reply.getNormalizedNode());
}
- return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
- }
- return Optional.absent();
- }
- };
+ throw new ReadFailedException("Read Failed " + path);
+ }
+ };
- return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
+ return MappingCheckedFuture
+ .create(executor.submit(call), ReadFailedException.MAPPER);
}
- @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null);
+ @Override public void writeData(YangInstanceIdentifier path,
+ NormalizedNode<?, ?> data) {
+ getActor()
+ .tell(new WriteData(path, data, schemaContext).toSerializable(),
+ null);
}
+ @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
+ final YangInstanceIdentifier path) {
+
+ Callable<Boolean> call = new Callable<Boolean>() {
+
+ @Override public Boolean call() throws Exception {
+ Object o = actorContext.executeRemoteOperation(getActor(),
+ new DataExists(path).toSerializable(),
+ ActorContext.ASK_DURATION
+ );
+
+
+ if (DataExistsReply.SERIALIZABLE_CLASS
+ .equals(o.getClass())) {
+ return DataExistsReply.fromSerializable(o).exists();
+ }
+
+ throw new ReadFailedException("Exists Failed " + path);
+ }
+ };
+ return MappingCheckedFuture
+ .create(executor.submit(call), ReadFailedException.MAPPER);
+ }
}
private class NoOpTransactionContext implements TransactionContext {
}
@Override public void closeTransaction() {
- LOG.error("closeTransaction called");
+ LOG.warn("txn {} closeTransaction called", identifier);
}
@Override public Object readyTransaction() {
- LOG.error("readyTransaction called");
+ LOG.warn("txn {} readyTransaction called", identifier);
cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
return new ReadyTransactionReply(cohort.path()).toSerializable();
}
@Override public void deleteData(YangInstanceIdentifier path) {
- LOG.error("deleteData called path = {}", path);
+ LOG.warn("txt {} deleteData called path = {}", identifier, path);
}
@Override public void mergeData(YangInstanceIdentifier path,
NormalizedNode<?, ?> data) {
- LOG.error("mergeData called path = {}", path);
+ LOG.warn("txn {} mergeData called path = {}", identifier, path);
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
- LOG.error("readData called path = {}", path);
+ LOG.warn("txn {} readData called path = {}", identifier, path);
return Futures.immediateCheckedFuture(
Optional.<NormalizedNode<?, ?>>absent());
}
@Override public void writeData(YangInstanceIdentifier path,
NormalizedNode<?, ?> data) {
- LOG.error("writeData called path = {}", path);
+ LOG.warn("txn {} writeData called path = {}", identifier, path);
+ }
+
+ @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
+ YangInstanceIdentifier path) {
+ LOG.warn("txn {} dataExists called path = {}", identifier, path);
+
+ // Returning false instead of an exception to keep this aligned with
+ // read
+ return Futures.immediateCheckedFuture(false);
}
}