Merge "BUG 2820 - LLDP refactor"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.base.Preconditions;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.cluster.datastore.TransactionProxy.TransactionType;
20 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
21 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
25 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
28 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
31
32 /**
33  * The ShardTransaction Actor represents a remote transaction
34  * <p>
35  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
36  * </p>
37  * <p>
38  * Handles Messages <br/>
39  * ---------------- <br/>
40  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
41  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
42  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
43  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
44  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
45  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
46  * </p>
47  */
48 public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
49
50     protected static final boolean SERIALIZED_REPLY = true;
51
52     private final ActorRef shardActor;
53     private final ShardStats shardStats;
54     private final String transactionID;
55     private final short clientTxVersion;
56
57     protected ShardTransaction(ActorRef shardActor, ShardStats shardStats, String transactionID,
58             short clientTxVersion) {
59         super("shard-tx"); //actor name override used for metering. This does not change the "real" actor name
60         this.shardActor = shardActor;
61         this.shardStats = shardStats;
62         this.transactionID = transactionID;
63         this.clientTxVersion = clientTxVersion;
64     }
65
66     public static Props props(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
67             DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
68         return Props.create(new ShardTransactionCreator(type, transaction, shardActor,
69            datastoreContext, shardStats, transactionID, txnClientVersion));
70     }
71
72     protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
73
74     protected ActorRef getShardActor() {
75         return shardActor;
76     }
77
78     protected String getTransactionID() {
79         return transactionID;
80     }
81
82     protected short getClientTxVersion() {
83         return clientTxVersion;
84     }
85
86     @Override
87     public void handleReceive(Object message) throws Exception {
88         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
89             closeTransaction(true);
90         } else if (message instanceof ReceiveTimeout) {
91             if(LOG.isDebugEnabled()) {
92                 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
93             }
94             closeTransaction(false);
95         } else {
96             throw new UnknownMessageException(message);
97         }
98     }
99
100     protected boolean returnCloseTransactionReply() {
101         return true;
102     }
103
104     private void closeTransaction(boolean sendReply) {
105         getDOMStoreTransaction().abort();
106
107         if(sendReply && returnCloseTransactionReply()) {
108             getSender().tell(CloseTransactionReply.INSTANCE.toSerializable(), getSelf());
109         }
110
111         getSelf().tell(PoisonPill.getInstance(), getSelf());
112     }
113
114     private boolean checkClosed(AbstractShardDataTreeTransaction<?> transaction) {
115         final boolean ret = transaction.isClosed();
116         if (ret) {
117             shardStats.incrementFailedReadTransactionsCount();
118             getSender().tell(new akka.actor.Status.Failure(new ReadFailedException("Transaction is closed")), getSelf());
119         }
120         return ret;
121     }
122
123     protected void readData(AbstractShardDataTreeTransaction<?> transaction, ReadData message,
124             final boolean returnSerialized) {
125
126         if (checkClosed(transaction)) {
127             return;
128         }
129
130         final YangInstanceIdentifier path = message.getPath();
131         Optional<NormalizedNode<?, ?>> optional = transaction.getSnapshot().readNode(path);
132         ReadDataReply readDataReply = new ReadDataReply(optional.orNull(), clientTxVersion);
133         sender().tell((returnSerialized ? readDataReply.toSerializable(): readDataReply), self());
134     }
135
136     protected void dataExists(AbstractShardDataTreeTransaction<?> transaction, DataExists message,
137         final boolean returnSerialized) {
138
139         if (checkClosed(transaction)) {
140             return;
141         }
142
143         final YangInstanceIdentifier path = message.getPath();
144         boolean exists = transaction.getSnapshot().readNode(path).isPresent();
145         DataExistsReply dataExistsReply = DataExistsReply.create(exists);
146         getSender().tell(returnSerialized ? dataExistsReply.toSerializable() :
147             dataExistsReply, getSelf());
148     }
149
150     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
151
152         private static final long serialVersionUID = 1L;
153
154         final AbstractShardDataTreeTransaction<?> transaction;
155         final ActorRef shardActor;
156         final DatastoreContext datastoreContext;
157         final ShardStats shardStats;
158         final String transactionID;
159         final short txnClientVersion;
160         final TransactionType type;
161
162         ShardTransactionCreator(TransactionType type, AbstractShardDataTreeTransaction<?> transaction, ActorRef shardActor,
163                 DatastoreContext datastoreContext, ShardStats shardStats, String transactionID, short txnClientVersion) {
164             this.transaction = Preconditions.checkNotNull(transaction);
165             this.shardActor = shardActor;
166             this.shardStats = shardStats;
167             this.datastoreContext = datastoreContext;
168             this.transactionID = transactionID;
169             this.txnClientVersion = txnClientVersion;
170             this.type = type;
171         }
172
173         @Override
174         public ShardTransaction create() throws Exception {
175             final ShardTransaction tx;
176             switch (type) {
177             case READ_ONLY:
178                 tx = new ShardReadTransaction(transaction, shardActor,
179                     shardStats, transactionID, txnClientVersion);
180                 break;
181             case READ_WRITE:
182                 tx = new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
183                     shardActor, shardStats, transactionID, txnClientVersion);
184                 break;
185             case WRITE_ONLY:
186                 tx = new ShardWriteTransaction((ReadWriteShardDataTreeTransaction)transaction,
187                     shardActor, shardStats, transactionID, txnClientVersion);
188                 break;
189             default:
190                 throw new IllegalArgumentException("Unhandled transaction type " + type);
191             }
192
193             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
194             return tx;
195         }
196     }
197 }