import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
-
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
-
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
private final TransactionType transactionType;
private final ActorContext actorContext;
private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
- private final String identifier;
+ private final TransactionIdentifier identifier;
private final ListeningExecutorService executor;
private final SchemaContext schemaContext;
ListeningExecutorService executor,
SchemaContext schemaContext
) {
+ this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+ this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
+ this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
+ this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+
+ String memberName = actorContext.getCurrentMemberName();
+ if(memberName == null){
+ memberName = "UNKNOWN-MEMBER";
+ }
+ this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
- this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
- this.transactionType = transactionType;
- this.actorContext = actorContext;
- this.executor = executor;
- this.schemaContext = schemaContext;
-
+ LOG.debug("Created txn {}", identifier);
}
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
final YangInstanceIdentifier path) {
+ LOG.debug("txn {} read {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
return transactionContext(path).readData(path);
@Override
public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("txn {} write {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).writeData(path, data);
@Override
public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ LOG.debug("txn {} merge {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).mergeData(path, data);
@Override
public void delete(YangInstanceIdentifier path) {
+ LOG.debug("txn {} delete {}", identifier, path);
+
createTransactionIfMissing(actorContext, path);
transactionContext(path).deleteData(path);
public DOMStoreThreePhaseCommitCohort ready() {
List<ActorPath> cohortPaths = new ArrayList<>();
+ LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
+
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
+
+ LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
+
Object result = transactionContext.readyTransaction();
if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
}
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
}
@Override
try {
Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier).toSerializable(),
+ new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
ActorContext.ASK_DURATION);
if (response.getClass()
.equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
String transactionPath = reply.getTransactionPath();
- LOG.info("Received transaction path = {}" , transactionPath );
+ LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
ActorSelection transactionActor =
actorContext.actorSelection(transactionPath);
remoteTransactionPaths.put(shardName, transactionContext);
}
- } catch(TimeoutException e){
- LOG.warn("Timed out trying to create transaction on shard {}: {}", shardName, e);
- remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName));
+ } catch(TimeoutException | PrimaryNotFoundException e){
+ LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
+ remoteTransactionPaths.put(shardName,
+ new NoOpTransactionContext(shardName));
}
}
}
@Override public void closeTransaction() {
- LOG.error("closeTransaction called");
+ LOG.warn("txn {} closeTransaction called", identifier);
}
@Override public Object readyTransaction() {
- LOG.error("readyTransaction called");
+ LOG.warn("txn {} readyTransaction called", identifier);
cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
return new ReadyTransactionReply(cohort.path()).toSerializable();
}
@Override public void deleteData(YangInstanceIdentifier path) {
- LOG.error("deleteData called path = {}", path);
+ LOG.warn("txt {} deleteData called path = {}", identifier, path);
}
@Override public void mergeData(YangInstanceIdentifier path,
NormalizedNode<?, ?> data) {
- LOG.error("mergeData called path = {}", path);
+ LOG.warn("txn {} mergeData called path = {}", identifier, path);
}
@Override
public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
- LOG.error("readData called path = {}", path);
+ LOG.warn("txn {} readData called path = {}", identifier, path);
return Futures.immediateCheckedFuture(
Optional.<NormalizedNode<?, ?>>absent());
}
@Override public void writeData(YangInstanceIdentifier path,
NormalizedNode<?, ?> data) {
- LOG.error("writeData called path = {}", path);
+ LOG.warn("txn {} writeData called path = {}", identifier, path);
}
}