public static enum TransactionType {
READ_ONLY,
WRITE_ONLY,
- READ_WRITE
+ READ_WRITE;
+
+ public static TransactionType fromInt(int type) {
+ if(type == WRITE_ONLY.ordinal()) {
+ return WRITE_ONLY;
+ } else if(type == READ_WRITE.ordinal()) {
+ return READ_WRITE;
+ } else if(type == READ_ONLY.ordinal()) {
+ return READ_ONLY;
+ } else {
+ throw new IllegalArgumentException("In TransactionType enum value" + type);
+ }
+ }
}
static final Mapper<Throwable, Throwable> SAME_FAILURE_TRANSFORMER =
* Sets the target primary shard and initiates a CreateTransaction try.
*/
void setPrimaryShard(ActorSelection primaryShard) {
- LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
-
this.primaryShard = primaryShard;
- tryCreateTransaction();
+
+ if(transactionType == TransactionType.WRITE_ONLY &&
+ actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ LOG.debug("Tx {} Primary shard found - creating WRITE_ONLY transaction context", identifier);
+
+ // For write-only Tx's we prepare the transaction modifications directly on the shard actor
+ // to avoid the overhead of creating a separate transaction actor.
+ // FIXME: can't assume the shard version is LITHIUM_VERSION - need to obtain it somehow.
+ executeTxOperatonsOnComplete(createValidTransactionContext(this.primaryShard,
+ this.primaryShard.path().toString(), DataStoreVersions.LITHIUM_VERSION));
+ } else {
+ tryCreateTransaction();
+ }
}
/**
boolean invokeOperation = true;
synchronized(txOperationsOnComplete) {
if(transactionContext == null) {
- LOG.debug("Tx {} Adding operation on complete {}", identifier);
+ LOG.debug("Tx {} Adding operation on complete", identifier);
invokeOperation = false;
txOperationsOnComplete.add(operation);
* Performs a CreateTransaction try async.
*/
private void tryCreateTransaction() {
+ LOG.debug("Tx {} Primary shard found - trying create transaction", identifier);
+
Object serializedCreateMessage = new CreateTransaction(identifier.toString(),
TransactionProxy.this.transactionType.ordinal(),
getTransactionChainId()).toSerializable();
// TransactionContext until after we've executed all cached TransactionOperations.
TransactionContext localTransactionContext;
if(failure != null) {
- LOG.debug("Tx {} Creating NoOpTransaction because of error: {}", identifier,
- failure.getMessage());
+ LOG.debug("Tx {} Creating NoOpTransaction because of error", identifier, failure);
localTransactionContext = new NoOpTransactionContext(failure, identifier, operationLimiter);
} else if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
}
private TransactionContext createValidTransactionContext(CreateTransactionReply reply) {
- String transactionPath = reply.getTransactionPath();
-
LOG.debug("Tx {} Received {}", identifier, reply);
- ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+ return createValidTransactionContext(actorContext.actorSelection(reply.getTransactionPath()),
+ reply.getTransactionPath(), reply.getVersion());
+ }
+
+ private TransactionContext createValidTransactionContext(ActorSelection transactionActor,
+ String transactionPath, short remoteTransactionVersion) {
if (transactionType == TransactionType.READ_ONLY) {
// Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
// Check if TxActor is created in the same node
boolean isTxActorLocal = actorContext.isPathLocal(transactionPath);
- if(reply.getVersion() >= DataStoreVersions.LITHIUM_VERSION) {
- return new TransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
- } else {
+ if(remoteTransactionVersion < DataStoreVersions.LITHIUM_VERSION) {
return new PreLithiumTransactionContextImpl(transactionPath, transactionActor, identifier,
- actorContext, schemaContext, isTxActorLocal, reply.getVersion(), operationCompleter);
+ transactionChainId, actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion,
+ operationCompleter);
+ } else if (transactionType == TransactionType.WRITE_ONLY &&
+ actorContext.getDatastoreContext().isWriteOnlyTransactionOptimizationsEnabled()) {
+ return new WriteOnlyTransactionContextImpl(transactionActor, identifier, transactionChainId,
+ actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
+ } else {
+ return new TransactionContextImpl(transactionActor, identifier, transactionChainId,
+ actorContext, schemaContext, isTxActorLocal, remoteTransactionVersion, operationCompleter);
}
}
}