5f9f1f83c4dbf15b38ea5a3d13d534354b4527e2
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
27 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
33 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
38 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import java.util.ArrayList;
46 import java.util.HashMap;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.concurrent.Callable;
50 import java.util.concurrent.atomic.AtomicLong;
51
52 /**
53  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
54  * <p>
55  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
56  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
57  * be created on each of those shards by the TransactionProxy
58  *</p>
59  * <p>
60  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
61  * shards will be executed.
62  * </p>
63  */
64 public class TransactionProxy implements DOMStoreReadWriteTransaction {
65     public enum TransactionType {
66         READ_ONLY,
67         WRITE_ONLY,
68         READ_WRITE
69     }
70
71     private static final AtomicLong counter = new AtomicLong();
72
73     private static final Logger
74         LOG = LoggerFactory.getLogger(TransactionProxy.class);
75
76
77     private final TransactionType transactionType;
78     private final ActorContext actorContext;
79     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
80     private final TransactionIdentifier identifier;
81     private final ListeningExecutorService executor;
82     private final SchemaContext schemaContext;
83
84     public TransactionProxy(
85         ActorContext actorContext,
86         TransactionType transactionType,
87         ListeningExecutorService executor,
88         SchemaContext schemaContext
89     ) {
90         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
91         this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
92         this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
93         this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
94
95         String memberName = actorContext.getCurrentMemberName();
96         if(memberName == null){
97             memberName = "UNKNOWN-MEMBER";
98         }
99         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
100
101         LOG.debug("Created txn {}", identifier);
102
103     }
104
105     @Override
106     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
107             final YangInstanceIdentifier path) {
108
109         LOG.debug("txn {} read {}", identifier, path);
110
111         createTransactionIfMissing(actorContext, path);
112
113         return transactionContext(path).readData(path);
114     }
115
116     @Override
117     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
118
119         LOG.debug("txn {} write {}", identifier, path);
120
121         createTransactionIfMissing(actorContext, path);
122
123         transactionContext(path).writeData(path, data);
124     }
125
126     @Override
127     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
128
129         LOG.debug("txn {} merge {}", identifier, path);
130
131         createTransactionIfMissing(actorContext, path);
132
133         transactionContext(path).mergeData(path, data);
134     }
135
136     @Override
137     public void delete(YangInstanceIdentifier path) {
138
139         LOG.debug("txn {} delete {}", identifier, path);
140
141         createTransactionIfMissing(actorContext, path);
142
143         transactionContext(path).deleteData(path);
144     }
145
146     @Override
147     public DOMStoreThreePhaseCommitCohort ready() {
148         List<ActorPath> cohortPaths = new ArrayList<>();
149
150         LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
151
152         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
153
154             LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
155
156             Object result = transactionContext.readyTransaction();
157
158             if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
159                 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
160                 String resolvedCohortPath = transactionContext
161                     .getResolvedCohortPath(reply.getCohortPath().toString());
162                 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
163             }
164         }
165
166         return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
167     }
168
169     @Override
170     public Object getIdentifier() {
171         return this.identifier;
172     }
173
174     @Override
175     public void close() {
176         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
177             transactionContext.closeTransaction();
178         }
179     }
180
181     private TransactionContext transactionContext(YangInstanceIdentifier path){
182         String shardName = shardNameFromIdentifier(path);
183         return remoteTransactionPaths.get(shardName);
184     }
185
186     private String shardNameFromIdentifier(YangInstanceIdentifier path){
187         return ShardStrategyFactory.getStrategy(path).findShard(path);
188     }
189
190     private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
191         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
192
193         TransactionContext transactionContext =
194             remoteTransactionPaths.get(shardName);
195
196         if(transactionContext != null){
197             // A transaction already exists with that shard
198             return;
199         }
200
201         try {
202             Object response = actorContext.executeShardOperation(shardName,
203                 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
204                 ActorContext.ASK_DURATION);
205             if (response.getClass()
206                 .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
207                 CreateTransactionReply reply =
208                     CreateTransactionReply.fromSerializable(response);
209
210                 String transactionPath = reply.getTransactionPath();
211
212                 LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
213
214                 ActorSelection transactionActor =
215                     actorContext.actorSelection(transactionPath);
216                 transactionContext =
217                     new TransactionContextImpl(shardName, transactionPath,
218                         transactionActor);
219
220                 remoteTransactionPaths.put(shardName, transactionContext);
221             }
222         } catch(TimeoutException | PrimaryNotFoundException e){
223             LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
224             remoteTransactionPaths.put(shardName,
225                 new NoOpTransactionContext(shardName));
226         }
227     }
228
229     private interface TransactionContext {
230         String getShardName();
231
232         String getResolvedCohortPath(String cohortPath);
233
234         public void closeTransaction();
235
236         public Object readyTransaction();
237
238         void deleteData(YangInstanceIdentifier path);
239
240         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
241
242         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
243                 final YangInstanceIdentifier path);
244
245         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
246     }
247
248
249     private class TransactionContextImpl implements TransactionContext{
250         private final String shardName;
251         private final String actorPath;
252         private final ActorSelection  actor;
253
254
255         private TransactionContextImpl(String shardName, String actorPath,
256             ActorSelection actor) {
257             this.shardName = shardName;
258             this.actorPath = actorPath;
259             this.actor = actor;
260         }
261
262         @Override public String getShardName() {
263             return shardName;
264         }
265
266         private ActorSelection getActor() {
267             return actor;
268         }
269
270         @Override public String getResolvedCohortPath(String cohortPath){
271             return actorContext.resolvePath(actorPath, cohortPath);
272         }
273
274         @Override public void closeTransaction() {
275             getActor().tell(
276                 new CloseTransaction().toSerializable(), null);
277         }
278
279         @Override public Object readyTransaction() {
280             return actorContext.executeRemoteOperation(getActor(),
281                 new ReadyTransaction().toSerializable(),
282                 ActorContext.ASK_DURATION
283             );
284
285         }
286
287         @Override public void deleteData(YangInstanceIdentifier path) {
288             getActor().tell(new DeleteData(path).toSerializable(), null);
289         }
290
291         @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data){
292             getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
293         }
294
295         @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
296                 final YangInstanceIdentifier path) {
297
298             Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
299
300                 @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
301                     Object response = actorContext
302                         .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(),
303                             ActorContext.ASK_DURATION);
304                     if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
305                         ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
306                         if(reply.getNormalizedNode() == null){
307                             return Optional.absent();
308                         }
309                         return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
310                     }
311
312                     return Optional.absent();
313                 }
314             };
315
316             return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
317         }
318
319         @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
320             getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null);
321         }
322
323     }
324
325     private class NoOpTransactionContext implements TransactionContext {
326
327         private final Logger
328             LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
329
330         private final String shardName;
331
332         private ActorRef cohort;
333
334         public NoOpTransactionContext(String shardName){
335             this.shardName = shardName;
336         }
337         @Override public String getShardName() {
338             return  shardName;
339
340         }
341
342         @Override public String getResolvedCohortPath(String cohortPath) {
343             return cohort.path().toString();
344         }
345
346         @Override public void closeTransaction() {
347             LOG.warn("txn {} closeTransaction called", identifier);
348         }
349
350         @Override public Object readyTransaction() {
351             LOG.warn("txn {} readyTransaction called", identifier);
352             cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
353             return new ReadyTransactionReply(cohort.path()).toSerializable();
354         }
355
356         @Override public void deleteData(YangInstanceIdentifier path) {
357             LOG.warn("txt {} deleteData called path = {}", identifier, path);
358         }
359
360         @Override public void mergeData(YangInstanceIdentifier path,
361             NormalizedNode<?, ?> data) {
362             LOG.warn("txn {} mergeData called path = {}", identifier, path);
363         }
364
365         @Override
366         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
367             YangInstanceIdentifier path) {
368             LOG.warn("txn {} readData called path = {}", identifier, path);
369             return Futures.immediateCheckedFuture(
370                 Optional.<NormalizedNode<?, ?>>absent());
371         }
372
373         @Override public void writeData(YangInstanceIdentifier path,
374             NormalizedNode<?, ?> data) {
375             LOG.warn("txn {} writeData called path = {}", identifier, path);
376         }
377     }
378
379
380
381 }