Deprecate ReadData/DataExists protobuf messages
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
30
/**
 * The ShardTransaction actor represents a remote transaction.
 * <p>
 * The ShardTransaction actor delegates all actions to the
 * {@link AbstractShardDataTreeTransaction} returned by {@link #getDOMStoreTransaction()}.
 * </p>
 * <p>
 * Handles messages:
 * </p>
 * <ul>
 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
 * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
 * </ul>
 */
43 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
44     private final ActorRef shardActor;
45     private final ShardStats shardStats;
46     private final String transactionID;
47     private final short clientTxVersion;
48
49     protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
50             short clientTxVersion) {
51         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
52         this.shardActor = shardActor;
53         this.shardStats = shardStats;
54         this.transactionID = Preconditions.checkNotNull(transactionID);
55         this.clientTxVersion = clientTxVersion;
56     }
57
58     public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
59             DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
60         return Props.create(new ShardTransactionCreator(type, transaction, shardActor,
61            datastoreContext, shardStats, transactionID, txnClientVersion));
62     }
63
64     protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
65
66     protected ActorRef getShardActor() {
67         return shardActor;
68     }
69
70     protected String getTransactionID() {
71         return transactionID;
72     }
73
74     protected short getClientTxVersion() {
75         return clientTxVersion;
76     }
77
78     @Override
79     public void handleReceive(Object message) throws Exception {
80         if (CloseTransaction.isSerializedType(message)) {
81             closeTransaction(true);
82         } else if (message instanceof ReceiveTimeout) {
83             if(LOG.isDebugEnabled()) {
84                 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
85             }
86             closeTransaction(false);
87         } else {
88             throw new UnknownMessageException(message);
89         }
90     }
91
92     protected boolean returnCloseTransactionReply() {
93         return true;
94     }
95
96     private void closeTransaction(boolean sendReply) {
97         getDOMStoreTransaction().abort();
98
99         if(sendReply && returnCloseTransactionReply()) {
100             getSender().tell(new CloseTransactionReply(), getSelf());
101         }
102
103         getSelf().tell(PoisonPill.getInstance(), getSelf());
104     }
105
106     private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
107         final boolean ret = transaction.isClosed();
108         if (ret) {
109             shardStats.incrementFailedReadTransactionsCount();
110             getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf());
111         }
112         return ret;
113     }
114
115     protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message) {
116         if (checkClosed(transaction)) {
117             return;
118         }
119
120         final YangInstanceIdentifier path = message.getPath();
121         Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
122         ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), message.getVersion());
123         sender().tell(readDataReply.toSerializable(), self());
124     }
125
126     protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message) {
127         if (checkClosed(transaction)) {
128             return;
129         }
130
131         final YangInstanceIdentifier path = message.getPath();
132         boolean exists = transaction.getSnapshot().readNode(path).isPresent();
133         getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
134     }
135
136     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
137
138         private static final long serialVersionUID = 1L;
139
140         final AbstractShardDataTreeTransaction<?> transaction;
141         final ActorRef shardActor;
142         final DatastoreContext datastoreContext;
143         final ShardStats shardStats;
144         final String transactionID;
145         final short txnClientVersion;
146         final TransactionType type;
147
148         ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
149                 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
150             this.transaction = Preconditions.checkNotNull(transaction);
151             this.shardActor = shardActor;
152             this.shardStats = shardStats;
153             this.datastoreContext = datastoreContext;
154             this.transactionID = Preconditions.checkNotNull(transactionID);
155             this.txnClientVersion = txnClientVersion;
156             this.type = type;
157         }
158
159         @Override
160         public ShardTransaction create() throws Exception {
161             final ShardTransaction tx;
162             switch (type) {
163             case READ_ONLY:
164                 tx = new ShardReadTransaction(transaction, shardActor,
165                     shardStats, transactionID, txnClientVersion);
166                 break;
167             case READ_WRITE:
168                 tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
169                     shardActor, shardStats, transactionID, txnClientVersion);
170                 break;
171             case WRITE_ONLY:
172                 tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
173                     shardActor, shardStats, transactionID, txnClientVersion);
174                 break;
175             default:
176                 throw new IllegalArgumentException("Unhandled transaction type " + type);
177             }
178
179             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
180             return tx;
181         }
182     }
183 }