Merge "Custom mailbox that is bounded and instrumented."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
27 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
28 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
29 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
35 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
40 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import java.util.ArrayList;
48 import java.util.HashMap;
49 import java.util.List;
50 import java.util.Map;
51 import java.util.concurrent.Callable;
52 import java.util.concurrent.atomic.AtomicLong;
53
54 /**
55  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
56  * <p>
57  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
58  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
59  * be created on each of those shards by the TransactionProxy
60  *</p>
61  * <p>
62  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
63  * shards will be executed.
64  * </p>
65  */
66 public class TransactionProxy implements DOMStoreReadWriteTransaction {
67     public enum TransactionType {
68         READ_ONLY,
69         WRITE_ONLY,
70         READ_WRITE
71     }
72
73     private static final AtomicLong counter = new AtomicLong();
74
75     private static final Logger
76         LOG = LoggerFactory.getLogger(TransactionProxy.class);
77
78
79     private final TransactionType transactionType;
80     private final ActorContext actorContext;
81     private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
82     private final TransactionIdentifier identifier;
83     private final ListeningExecutorService executor;
84     private final SchemaContext schemaContext;
85
86     public TransactionProxy(
87         ActorContext actorContext,
88         TransactionType transactionType,
89         ListeningExecutorService executor,
90         SchemaContext schemaContext
91     ) {
92         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
93         this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
94         this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
95         this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
96
97         String memberName = actorContext.getCurrentMemberName();
98         if(memberName == null){
99             memberName = "UNKNOWN-MEMBER";
100         }
101         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
102
103         LOG.debug("Created txn {}", identifier);
104
105     }
106
107     @Override
108     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
109             final YangInstanceIdentifier path) {
110
111         LOG.debug("txn {} read {}", identifier, path);
112
113         createTransactionIfMissing(actorContext, path);
114
115         return transactionContext(path).readData(path);
116     }
117
118     @Override public CheckedFuture<Boolean, ReadFailedException> exists(
119         YangInstanceIdentifier path) {
120         LOG.debug("txn {} exists {}", identifier, path);
121
122         createTransactionIfMissing(actorContext, path);
123
124         return transactionContext(path).dataExists(path);
125     }
126
127     @Override
128     public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
129
130         LOG.debug("txn {} write {}", identifier, path);
131
132         createTransactionIfMissing(actorContext, path);
133
134         transactionContext(path).writeData(path, data);
135     }
136
137     @Override
138     public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
139
140         LOG.debug("txn {} merge {}", identifier, path);
141
142         createTransactionIfMissing(actorContext, path);
143
144         transactionContext(path).mergeData(path, data);
145     }
146
147     @Override
148     public void delete(YangInstanceIdentifier path) {
149
150         LOG.debug("txn {} delete {}", identifier, path);
151
152         createTransactionIfMissing(actorContext, path);
153
154         transactionContext(path).deleteData(path);
155     }
156
157     @Override
158     public DOMStoreThreePhaseCommitCohort ready() {
159         List<ActorPath> cohortPaths = new ArrayList<>();
160
161         LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
162
163         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
164
165             LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
166
167             Object result = transactionContext.readyTransaction();
168
169             if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
170                 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
171                 String resolvedCohortPath = transactionContext
172                     .getResolvedCohortPath(reply.getCohortPath().toString());
173                 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
174             }
175         }
176
177         return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
178     }
179
180     @Override
181     public Object getIdentifier() {
182         return this.identifier;
183     }
184
185     @Override
186     public void close() {
187         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
188             transactionContext.closeTransaction();
189         }
190     }
191
192     private TransactionContext transactionContext(YangInstanceIdentifier path){
193         String shardName = shardNameFromIdentifier(path);
194         return remoteTransactionPaths.get(shardName);
195     }
196
197     private String shardNameFromIdentifier(YangInstanceIdentifier path){
198         return ShardStrategyFactory.getStrategy(path).findShard(path);
199     }
200
201     private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
202         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
203
204         TransactionContext transactionContext =
205             remoteTransactionPaths.get(shardName);
206
207         if(transactionContext != null){
208             // A transaction already exists with that shard
209             return;
210         }
211
212         try {
213             Object response = actorContext.executeShardOperation(shardName,
214                 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
215                 ActorContext.ASK_DURATION);
216             if (response.getClass()
217                 .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
218                 CreateTransactionReply reply =
219                     CreateTransactionReply.fromSerializable(response);
220
221                 String transactionPath = reply.getTransactionPath();
222
223                 LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
224
225                 ActorSelection transactionActor =
226                     actorContext.actorSelection(transactionPath);
227                 transactionContext =
228                     new TransactionContextImpl(shardName, transactionPath,
229                         transactionActor);
230
231                 remoteTransactionPaths.put(shardName, transactionContext);
232             }
233         } catch(TimeoutException | PrimaryNotFoundException e){
234             LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
235             remoteTransactionPaths.put(shardName,
236                 new NoOpTransactionContext(shardName));
237         }
238     }
239
240     private interface TransactionContext {
241         String getShardName();
242
243         String getResolvedCohortPath(String cohortPath);
244
245         public void closeTransaction();
246
247         public Object readyTransaction();
248
249         void deleteData(YangInstanceIdentifier path);
250
251         void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
252
253         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
254                 final YangInstanceIdentifier path);
255
256         void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
257
258         CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
259     }
260
261
262     private class TransactionContextImpl implements TransactionContext {
263         private final String shardName;
264         private final String actorPath;
265         private final ActorSelection actor;
266
267
268         private TransactionContextImpl(String shardName, String actorPath,
269             ActorSelection actor) {
270             this.shardName = shardName;
271             this.actorPath = actorPath;
272             this.actor = actor;
273         }
274
275         @Override public String getShardName() {
276             return shardName;
277         }
278
279         private ActorSelection getActor() {
280             return actor;
281         }
282
283         @Override public String getResolvedCohortPath(String cohortPath) {
284             return actorContext.resolvePath(actorPath, cohortPath);
285         }
286
287         @Override public void closeTransaction() {
288             getActor().tell(
289                 new CloseTransaction().toSerializable(), null);
290         }
291
292         @Override public Object readyTransaction() {
293             return actorContext.executeRemoteOperation(getActor(),
294                 new ReadyTransaction().toSerializable(),
295                 ActorContext.ASK_DURATION
296             );
297
298         }
299
300         @Override public void deleteData(YangInstanceIdentifier path) {
301             getActor().tell(new DeleteData(path).toSerializable(), null);
302         }
303
304         @Override public void mergeData(YangInstanceIdentifier path,
305             NormalizedNode<?, ?> data) {
306             getActor()
307                 .tell(new MergeData(path, data, schemaContext).toSerializable(),
308                     null);
309         }
310
311         @Override
312         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
313             final YangInstanceIdentifier path) {
314
315             Callable<Optional<NormalizedNode<?, ?>>> call =
316                 new Callable<Optional<NormalizedNode<?, ?>>>() {
317
318                     @Override public Optional<NormalizedNode<?, ?>> call()
319                         throws Exception {
320                         Object response = actorContext
321                             .executeRemoteOperation(getActor(),
322                                 new ReadData(path).toSerializable(),
323                                 ActorContext.ASK_DURATION);
324                         if (response.getClass()
325                             .equals(ReadDataReply.SERIALIZABLE_CLASS)) {
326                             ReadDataReply reply = ReadDataReply
327                                 .fromSerializable(schemaContext, path,
328                                     response);
329                             if (reply.getNormalizedNode() == null) {
330                                 return Optional.absent();
331                             }
332                             return Optional.<NormalizedNode<?, ?>>of(
333                                 reply.getNormalizedNode());
334                         }
335
336                         throw new ReadFailedException("Read Failed " + path);
337                     }
338                 };
339
340             return MappingCheckedFuture
341                 .create(executor.submit(call), ReadFailedException.MAPPER);
342         }
343
344         @Override public void writeData(YangInstanceIdentifier path,
345             NormalizedNode<?, ?> data) {
346             getActor()
347                 .tell(new WriteData(path, data, schemaContext).toSerializable(),
348                     null);
349         }
350
351         @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
352             final YangInstanceIdentifier path) {
353
354             Callable<Boolean> call = new Callable<Boolean>() {
355
356                 @Override public Boolean call() throws Exception {
357                     Object o = actorContext.executeRemoteOperation(getActor(),
358                         new DataExists(path).toSerializable(),
359                         ActorContext.ASK_DURATION
360                     );
361
362
363                     if (DataExistsReply.SERIALIZABLE_CLASS
364                         .equals(o.getClass())) {
365                         return DataExistsReply.fromSerializable(o).exists();
366                     }
367
368                     throw new ReadFailedException("Exists Failed " + path);
369                 }
370             };
371             return MappingCheckedFuture
372                 .create(executor.submit(call), ReadFailedException.MAPPER);
373         }
374     }
375
376     private class NoOpTransactionContext implements TransactionContext {
377
378         private final Logger
379             LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
380
381         private final String shardName;
382
383         private ActorRef cohort;
384
385         public NoOpTransactionContext(String shardName){
386             this.shardName = shardName;
387         }
388         @Override public String getShardName() {
389             return  shardName;
390
391         }
392
393         @Override public String getResolvedCohortPath(String cohortPath) {
394             return cohort.path().toString();
395         }
396
397         @Override public void closeTransaction() {
398             LOG.warn("txn {} closeTransaction called", identifier);
399         }
400
401         @Override public Object readyTransaction() {
402             LOG.warn("txn {} readyTransaction called", identifier);
403             cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
404             return new ReadyTransactionReply(cohort.path()).toSerializable();
405         }
406
407         @Override public void deleteData(YangInstanceIdentifier path) {
408             LOG.warn("txt {} deleteData called path = {}", identifier, path);
409         }
410
411         @Override public void mergeData(YangInstanceIdentifier path,
412             NormalizedNode<?, ?> data) {
413             LOG.warn("txn {} mergeData called path = {}", identifier, path);
414         }
415
416         @Override
417         public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
418             YangInstanceIdentifier path) {
419             LOG.warn("txn {} readData called path = {}", identifier, path);
420             return Futures.immediateCheckedFuture(
421                 Optional.<NormalizedNode<?, ?>>absent());
422         }
423
424         @Override public void writeData(YangInstanceIdentifier path,
425             NormalizedNode<?, ?> data) {
426             LOG.warn("txn {} writeData called path = {}", identifier, path);
427         }
428
429         @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
430             YangInstanceIdentifier path) {
431             LOG.warn("txn {} dataExists called path = {}", identifier, path);
432
433             // Returning false instead of an exception to keep this aligned with
434             // read
435             return Futures.immediateCheckedFuture(false);
436         }
437     }
438
439
440
441 }