Merge changes I442a0ee9,I11825b90
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / TransactionProxy.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorSelection;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListenableFutureTask;
16 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
17 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
18 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
19 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
20 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
21 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
22 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
25 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
26 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
29 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
31 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
32
33 import java.util.ArrayList;
34 import java.util.HashMap;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.ExecutorService;
39 import java.util.concurrent.atomic.AtomicLong;
40
41 /**
42  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
43  * <p>
44  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
45  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
46  * be created on each of those shards by the TransactionProxy
47  *</p>
48  * <p>
49  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
50  * shards will be executed.
51  * </p>
52  */
53 public class TransactionProxy implements DOMStoreReadWriteTransaction {
54     public enum TransactionType {
55         READ_ONLY,
56         WRITE_ONLY,
57         READ_WRITE
58     }
59
60     private static final AtomicLong counter = new AtomicLong();
61
62     private final TransactionType transactionType;
63     private final ActorContext actorContext;
64     private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
65     private final String identifier;
66     private final ExecutorService executor;
67     private final SchemaContext schemaContext;
68
69     public TransactionProxy(
70         ActorContext actorContext,
71         TransactionType transactionType,
72         ExecutorService executor,
73         SchemaContext schemaContext
74         ) {
75
76         this.identifier = "txn-" + counter.getAndIncrement();
77         this.transactionType = transactionType;
78         this.actorContext = actorContext;
79         this.executor = executor;
80         this.schemaContext = schemaContext;
81
82         Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION);
83         if(response instanceof CreateTransactionReply){
84             CreateTransactionReply reply = (CreateTransactionReply) response;
85             remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionActorPath()));
86         }
87     }
88
89     @Override
90     public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
91         final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
92
93         Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
94
95             @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
96                 Object response = actorContext
97                     .executeRemoteOperation(remoteTransaction, new ReadData(path).toSerializable(),
98                         ActorContext.ASK_DURATION);
99                 if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
100                     ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
101                     if(reply.getNormalizedNode() == null){
102                         return Optional.absent();
103                     }
104                     //FIXME : A cast should not be required here ???
105                     return (Optional<NormalizedNode<?, ?>>) Optional.of(reply.getNormalizedNode());
106                 }
107
108                 return Optional.absent();
109             }
110         };
111
112         ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
113             future = ListenableFutureTask.create(call);
114
115         executor.submit(future);
116
117         return future;
118     }
119
120     @Override
121     public void write(InstanceIdentifier path, NormalizedNode<?, ?> data) {
122         final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
123         remoteTransaction.tell(new WriteData(path, data, schemaContext).toSerializable(), null);
124     }
125
126     @Override
127     public void merge(InstanceIdentifier path, NormalizedNode<?, ?> data) {
128         final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
129         remoteTransaction.tell(new MergeData(path, data, schemaContext).toSerializable(), null);
130     }
131
132     @Override
133     public void delete(InstanceIdentifier path) {
134         final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
135         remoteTransaction.tell(new DeleteData(path).toSerializable(), null);
136     }
137
138     @Override
139     public DOMStoreThreePhaseCommitCohort ready() {
140         List<ActorPath> cohortPaths = new ArrayList<>();
141
142         for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) {
143             Object result = actorContext.executeRemoteOperation(remoteTransaction,
144                 new ReadyTransaction(),
145                 ActorContext.ASK_DURATION
146             );
147
148             if(result instanceof ReadyTransactionReply){
149                 ReadyTransactionReply reply = (ReadyTransactionReply) result;
150                 cohortPaths.add(reply.getCohortPath());
151             }
152         }
153
154         return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
155     }
156
157     @Override
158     public Object getIdentifier() {
159         return this.identifier;
160     }
161
162     @Override
163     public void close() {
164         for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) {
165             remoteTransaction.tell(new CloseTransaction(), null);
166         }
167     }
168
169     private ActorSelection remoteTransactionFromIdentifier(InstanceIdentifier path){
170         String shardName = shardNameFromIdentifier(path);
171         return remoteTransactionPaths.get(shardName);
172     }
173
174     private String shardNameFromIdentifier(InstanceIdentifier path){
175         return Shard.DEFAULT_NAME;
176     }
177 }