import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
if(remoteTransactionActorsMB.get()) {
for(ActorSelection actor : remoteTransactionActors) {
LOG.trace("Sending CloseTransaction to {}", actor);
- actorContext.sendRemoteOperationAsync(actor,
+ actorContext.sendOperationAsync(actor,
new CloseTransaction().toSerializable());
}
}
}
try {
- Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
- getTransactionChainId()).toSerializable());
+ Optional<ActorSelection> primaryShard = actorContext.findPrimaryShard(shardName);
+ if (!primaryShard.isPresent()) {
+ throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName);
+ }
+
+ Object response = actorContext.executeOperation(primaryShard.get(),
+ new CreateTransaction(identifier.toString(), this.transactionType.ordinal(),
+ getTransactionChainId()).toSerializable());
if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
CreateTransactionReply reply =
CreateTransactionReply.fromSerializable(response);
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} closeTransaction called", identifier);
}
- actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
+ actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable());
}
@Override
}
// Send the ReadyTransaction message to the Tx actor.
- final Future<Object> replyFuture = actorContext.executeRemoteOperationAsync(getActor(),
+ final Future<Object> replyFuture = actorContext.executeOperationAsync(getActor(),
new ReadyTransaction().toSerializable());
// Combine all the previously recorded put/merge/delete operation reply Futures and the
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} deleteData called path = {}", identifier, path);
}
- recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
- new DeleteData(path).toSerializable() ));
+ recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
+ new DeleteData(path).toSerializable()));
}
@Override
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} mergeData called path = {}", identifier, path);
}
- recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+ recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
new MergeData(path, data, schemaContext).toSerializable()));
}
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} writeData called path = {}", identifier, path);
}
- recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(),
+ recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(),
new WriteData(path, data, schemaContext).toSerializable()));
}
}
};
- Future<Object> readFuture = actorContext.executeRemoteOperationAsync(getActor(),
+ Future<Object> readFuture = actorContext.executeOperationAsync(getActor(),
new ReadData(path).toSerializable());
readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}
}
};
- Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
+ Future<Object> future = actorContext.executeOperationAsync(getActor(),
new DataExists(path).toSerializable());
future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
}