Merge "BUG 2221 : Add metering to ShardTransaction actor"
authorMoiz Raja <moraja@cisco.com>
Wed, 29 Oct 2014 03:05:09 +0000 (03:05 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 29 Oct 2014 03:05:10 +0000 (03:05 +0000)
1  2 
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java

index 5289ad33bfb2b2bc3a5008c0211ffccc3b077721,bb676e3757e5038d49dcbfb56a745e8e27d11fa5..32de47f451d9ff53f301339975357440e651bea5
@@@ -15,7 -15,7 +15,7 @@@ import akka.actor.ReceiveTimeout
  import akka.japi.Creator;
  import com.google.common.base.Optional;
  import com.google.common.util.concurrent.CheckedFuture;
- import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+ import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
  import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
  import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
  import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
@@@ -55,16 -55,16 +55,17 @@@ import org.opendaylight.yangtools.yang.
   * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
   * </p>
   */
- public abstract class ShardTransaction extends AbstractUntypedActor {
+ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
  
      private final ActorRef shardActor;
      private final SchemaContext schemaContext;
      private final ShardStats shardStats;
      private final String transactionID;
 +    protected static final boolean SERIALIZED_REPLY = true;
  
      protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
              ShardStats shardStats, String transactionID) {
+         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
          this.shardActor = shardActor;
          this.schemaContext = schemaContext;
          this.shardStats = shardStats;
          getSelf().tell(PoisonPill.getInstance(), getSelf());
      }
  
 -    protected void readData(DOMStoreReadTransaction transaction,ReadData message) {
 +    protected void readData(DOMStoreReadTransaction transaction, ReadData message, final boolean returnSerialized) {
          final ActorRef sender = getSender();
          final ActorRef self = getSelf();
          final YangInstanceIdentifier path = message.getPath();
          final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
                  transaction.read(path);
  
 +
          future.addListener(new Runnable() {
              @Override
              public void run() {
                  try {
                      Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
 -                    if (optional.isPresent()) {
 -                        sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
 -                    } else {
 -                        sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
 -                    }
 +                    ReadDataReply readDataReply = new ReadDataReply(schemaContext, optional.orNull());
 +
 +                    sender.tell((returnSerialized ? readDataReply.toSerializable():
 +                        readDataReply), self);
 +
                  } catch (Exception e) {
                      shardStats.incrementFailedReadTransactionsCount();
                      sender.tell(new akka.actor.Status.Failure(e), self);
          }, getContext().dispatcher());
      }
  
 -    protected void dataExists(DOMStoreReadTransaction transaction, DataExists message) {
 +    protected void dataExists(DOMStoreReadTransaction transaction, DataExists message,
 +        final boolean returnSerialized) {
          final YangInstanceIdentifier path = message.getPath();
  
          try {
              Boolean exists = transaction.exists(path).checkedGet();
 -            getSender().tell(new DataExistsReply(exists).toSerializable(), getSelf());
 +            DataExistsReply dataExistsReply = new DataExistsReply(exists);
 +            getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
 +                dataExistsReply, getSelf());
          } catch (ReadFailedException e) {
              getSender().tell(new akka.actor.Status.Failure(e),getSelf());
          }