Merge "Issue fix for config subsystem"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardTransaction.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import akka.actor.Props;
14 import akka.actor.ReceiveTimeout;
15 import akka.japi.Creator;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
19 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
20 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
23 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
24 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
25 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
27 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35
36 /**
37  * The ShardTransaction Actor represents a remote transaction
38  * <p>
39  * The ShardTransaction Actor delegates all actions to DOMDataReadWriteTransaction
40  * </p>
41  * <p>
42  * Even though the DOMStore and the DOMStoreTransactionChain implement multiple types of transactions
43  * the ShardTransaction Actor only works with read-write transactions. This is just to keep the logic simple. At this
44  * time there are no known advantages for creating a read-only or write-only transaction which may change over time
45  * at which point we can optimize things in the distributed store as well.
46  * </p>
47  * <p>
48  * Handles Messages <br/>
49  * ---------------- <br/>
50  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadData}
51  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.WriteData}
52  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.MergeData}
53  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.DeleteData}
54  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction}
55  * <li> {@link org.opendaylight.controller.cluster.datastore.messages.CloseTransaction}
56  * </p>
57  */
58 public abstract class ShardTransaction extends AbstractUntypedActor {
59
60     private final ActorRef shardActor;
61     private final SchemaContext schemaContext;
62     private final ShardStats shardStats;
63     private final String transactionID;
64
65     protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext,
66             ShardStats shardStats, String transactionID) {
67         this.shardActor = shardActor;
68         this.schemaContext = schemaContext;
69         this.shardStats = shardStats;
70         this.transactionID = transactionID;
71     }
72
73     public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
74             SchemaContext schemaContext,DatastoreContext datastoreContext, ShardStats shardStats,
75             String transactionID) {
76         return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
77            datastoreContext, shardStats, transactionID));
78     }
79
80     protected abstract DOMStoreTransaction getDOMStoreTransaction();
81
82     protected ActorRef getShardActor() {
83         return shardActor;
84     }
85
86     protected String getTransactionID() {
87         return transactionID;
88     }
89
90     protected SchemaContext getSchemaContext() {
91         return schemaContext;
92     }
93
94     @Override
95     public void handleReceive(Object message) throws Exception {
96         if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
97             closeTransaction(true);
98         } else if (message instanceof ReceiveTimeout) {
99             if(LOG.isDebugEnabled()) {
100                 LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
101             }
102             closeTransaction(false);
103         } else {
104             throw new UnknownMessageException(message);
105         }
106     }
107
108     private void closeTransaction(boolean sendReply) {
109         getDOMStoreTransaction().close();
110
111         if(sendReply) {
112             getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
113         }
114
115         getSelf().tell(PoisonPill.getInstance(), getSelf());
116     }
117
118     protected void readData(DOMStoreReadTransaction transaction,ReadData message) {
119         final ActorRef sender = getSender();
120         final ActorRef self = getSelf();
121         final YangInstanceIdentifier path = message.getPath();
122         final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
123                 transaction.read(path);
124
125         future.addListener(new Runnable() {
126             @Override
127             public void run() {
128                 try {
129                     Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
130                     if (optional.isPresent()) {
131                         sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
132                     } else {
133                         sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
134                     }
135                 } catch (Exception e) {
136                     shardStats.incrementFailedReadTransactionsCount();
137                     sender.tell(new akka.actor.Status.Failure(e), self);
138                 }
139
140             }
141         }, getContext().dispatcher());
142     }
143
144     protected void dataExists(DOMStoreReadTransaction transaction, DataExists message) {
145         final YangInstanceIdentifier path = message.getPath();
146
147         try {
148             Boolean exists = transaction.exists(path).checkedGet();
149             getSender().tell(new DataExistsReply(exists).toSerializable(), getSelf());
150         } catch (ReadFailedException e) {
151             getSender().tell(new akka.actor.Status.Failure(e),getSelf());
152         }
153
154     }
155
156     private static class ShardTransactionCreator implements Creator<ShardTransaction> {
157
158         private static final long serialVersionUID = 1L;
159
160         final DOMStoreTransaction transaction;
161         final ActorRef shardActor;
162         final SchemaContext schemaContext;
163         final DatastoreContext datastoreContext;
164         final ShardStats shardStats;
165         final String transactionID;
166
167         ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
168                 SchemaContext schemaContext, DatastoreContext datastoreContext,
169                 ShardStats shardStats, String transactionID) {
170             this.transaction = transaction;
171             this.shardActor = shardActor;
172             this.shardStats = shardStats;
173             this.schemaContext = schemaContext;
174             this.datastoreContext = datastoreContext;
175             this.transactionID = transactionID;
176         }
177
178         @Override
179         public ShardTransaction create() throws Exception {
180             ShardTransaction tx;
181             if(transaction instanceof DOMStoreReadWriteTransaction) {
182                 tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
183                         shardActor, schemaContext, shardStats, transactionID);
184             } else if(transaction instanceof DOMStoreReadTransaction) {
185                 tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
186                         schemaContext, shardStats, transactionID);
187             } else {
188                 tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
189                         shardActor, schemaContext, shardStats, transactionID);
190             }
191
192             tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
193             return tx;
194         }
195     }
196 }