+public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
+
+ protected static final boolean SERIALIZED_REPLY = true;
+
+ private final ActorRef shardActor;
+ private final SchemaContext schemaContext;
+ private final ShardStats shardStats;
+ private final String transactionID;
+ private final short clientTxVersion;
+
+ protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
+ ShardStats shardStats, String transactionID, short clientTxVersion) {
+ super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
+ this.shardActor = shardActor;
+ this.schemaContext = schemaContext;
+ this.shardStats = shardStats;
+ this.transactionID = transactionID;
+ this.clientTxVersion = clientTxVersion;
+ }
+
+ public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
+ SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
+ String transactionID, short txnClientVersion) {
+ return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
+ datastoreContext, shardStats, transactionID, txnClientVersion));
+ }
+
+ protected abstract DOMStoreTransaction getDOMStoreTransaction();
+
+ protected ActorRef getShardActor() {
+ return shardActor;
+ }
+
+ protected String getTransactionID() {
+ return transactionID;
+ }
+
+ protected SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
+
+ protected short getClientTxVersion() {
+ return clientTxVersion;
+ }
+
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
+ closeTransaction(true);
+ } else if (message instanceof ReceiveTimeout) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
+ }
+ closeTransaction(false);
+ } else {
+ throw new UnknownMessageException(message);
+ }
+ }
+
+ protected boolean returnCloseTransactionReply() {
+ return true;
+ }
+
+ private void closeTransaction(boolean sendReply) {
+ getDOMStoreTransaction().close();
+
+ if(sendReply && returnCloseTransactionReply()) {
+ getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
+ }
+
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
+
+ protected void readData(DOMStoreReadTransaction transaction, ReadData message,
+ final boolean returnSerialized) {
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+ final YangInstanceIdentifier path = message.getPath();
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
+ transaction.read(path);
+
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
+ ReadDataReply readDataReply = new ReadDataReply(optional.orNull());
+
+ sender.tell((returnSerialized ? readDataReply.toSerializable(clientTxVersion):
+ readDataReply), self);
+
+ } catch (Exception e) {
+ shardStats.incrementFailedReadTransactionsCount();
+ sender.tell(new akka.actor.Status.Failure(e), self);
+ }
+
+ }
+ }, getContext().dispatcher());
+ }
+
+ protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
+ final boolean returnSerialized) {
+ final YangInstanceIdentifier path = message.getPath();
+
+ try {
+ Boolean exists = transaction.exists(path).checkedGet();
+ DataExistsReply dataExistsReply = new DataExistsReply(exists);
+ getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
+ dataExistsReply, getSelf());
+ } catch (ReadFailedException e) {
+ getSender().tell(new akka.actor.Status.Failure(e),getSelf());
+ }
+
+ }