2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.Futures;
18 import com.google.common.util.concurrent.ListeningExecutorService;
19 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
20 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
21 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
22 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
24 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
25 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
26 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
30 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
31 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
33 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
36 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
43 import java.util.ArrayList;
44 import java.util.HashMap;
45 import java.util.List;
47 import java.util.concurrent.Callable;
48 import java.util.concurrent.atomic.AtomicLong;
51 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
53 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
54 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
55 * be created on each of those shards by the TransactionProxy
58 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
59 * shards will be executed.
62 public class TransactionProxy implements DOMStoreReadWriteTransaction {
63 public enum TransactionType {
69 private static final AtomicLong counter = new AtomicLong();
71 private static final Logger
72 LOG = LoggerFactory.getLogger(TransactionProxy.class);
75 private final TransactionType transactionType;
76 private final ActorContext actorContext;
77 private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
78 private final String identifier;
79 private final ListeningExecutorService executor;
80 private final SchemaContext schemaContext;
82 public TransactionProxy(
83 ActorContext actorContext,
84 TransactionType transactionType,
85 ListeningExecutorService executor,
86 SchemaContext schemaContext
89 this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
90 this.transactionType = transactionType;
91 this.actorContext = actorContext;
92 this.executor = executor;
93 this.schemaContext = schemaContext;
99 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
100 final YangInstanceIdentifier path) {
102 createTransactionIfMissing(actorContext, path);
104 return transactionContext(path).readData(path);
108 public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
110 createTransactionIfMissing(actorContext, path);
112 transactionContext(path).writeData(path, data);
116 public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
118 createTransactionIfMissing(actorContext, path);
120 transactionContext(path).mergeData(path, data);
124 public void delete(YangInstanceIdentifier path) {
126 createTransactionIfMissing(actorContext, path);
128 transactionContext(path).deleteData(path);
132 public DOMStoreThreePhaseCommitCohort ready() {
133 List<ActorPath> cohortPaths = new ArrayList<>();
135 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
136 Object result = transactionContext.readyTransaction();
138 if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
139 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
140 String resolvedCohortPath = transactionContext
141 .getResolvedCohortPath(reply.getCohortPath().toString());
142 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
146 return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
150 public Object getIdentifier() {
151 return this.identifier;
155 public void close() {
156 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
157 transactionContext.closeTransaction();
161 private TransactionContext transactionContext(YangInstanceIdentifier path){
162 String shardName = shardNameFromIdentifier(path);
163 return remoteTransactionPaths.get(shardName);
166 private String shardNameFromIdentifier(YangInstanceIdentifier path){
167 return ShardStrategyFactory.getStrategy(path).findShard(path);
170 private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
171 String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
173 TransactionContext transactionContext =
174 remoteTransactionPaths.get(shardName);
176 if(transactionContext != null){
177 // A transaction already exists with that shard
182 Object response = actorContext.executeShardOperation(shardName,
183 new CreateTransaction(identifier).toSerializable(),
184 ActorContext.ASK_DURATION);
185 if (response.getClass()
186 .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
187 CreateTransactionReply reply =
188 CreateTransactionReply.fromSerializable(response);
190 String transactionPath = reply.getTransactionPath();
192 LOG.info("Received transaction path = {}" , transactionPath );
194 ActorSelection transactionActor =
195 actorContext.actorSelection(transactionPath);
197 new TransactionContextImpl(shardName, transactionPath,
200 remoteTransactionPaths.put(shardName, transactionContext);
202 } catch(TimeoutException | PrimaryNotFoundException e){
203 LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
204 remoteTransactionPaths.put(shardName,
205 new NoOpTransactionContext(shardName));
209 private interface TransactionContext {
210 String getShardName();
212 String getResolvedCohortPath(String cohortPath);
214 public void closeTransaction();
216 public Object readyTransaction();
218 void deleteData(YangInstanceIdentifier path);
220 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
222 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
223 final YangInstanceIdentifier path);
225 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
229 private class TransactionContextImpl implements TransactionContext{
230 private final String shardName;
231 private final String actorPath;
232 private final ActorSelection actor;
235 private TransactionContextImpl(String shardName, String actorPath,
236 ActorSelection actor) {
237 this.shardName = shardName;
238 this.actorPath = actorPath;
242 @Override public String getShardName() {
246 private ActorSelection getActor() {
250 @Override public String getResolvedCohortPath(String cohortPath){
251 return actorContext.resolvePath(actorPath, cohortPath);
254 @Override public void closeTransaction() {
256 new CloseTransaction().toSerializable(), null);
259 @Override public Object readyTransaction() {
260 return actorContext.executeRemoteOperation(getActor(),
261 new ReadyTransaction().toSerializable(),
262 ActorContext.ASK_DURATION
267 @Override public void deleteData(YangInstanceIdentifier path) {
268 getActor().tell(new DeleteData(path).toSerializable(), null);
271 @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data){
272 getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
275 @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
276 final YangInstanceIdentifier path) {
278 Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
280 @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
281 Object response = actorContext
282 .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(),
283 ActorContext.ASK_DURATION);
284 if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
285 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
286 if(reply.getNormalizedNode() == null){
287 return Optional.absent();
289 return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
292 return Optional.absent();
296 return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
299 @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
300 getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null);
305 private class NoOpTransactionContext implements TransactionContext {
308 LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
310 private final String shardName;
312 private ActorRef cohort;
314 public NoOpTransactionContext(String shardName){
315 this.shardName = shardName;
317 @Override public String getShardName() {
322 @Override public String getResolvedCohortPath(String cohortPath) {
323 return cohort.path().toString();
326 @Override public void closeTransaction() {
327 LOG.error("closeTransaction called");
330 @Override public Object readyTransaction() {
331 LOG.error("readyTransaction called");
332 cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
333 return new ReadyTransactionReply(cohort.path()).toSerializable();
336 @Override public void deleteData(YangInstanceIdentifier path) {
337 LOG.error("deleteData called path = {}", path);
340 @Override public void mergeData(YangInstanceIdentifier path,
341 NormalizedNode<?, ?> data) {
342 LOG.error("mergeData called path = {}", path);
346 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
347 YangInstanceIdentifier path) {
348 LOG.error("readData called path = {}", path);
349 return Futures.immediateCheckedFuture(
350 Optional.<NormalizedNode<?, ?>>absent());
353 @Override public void writeData(YangInstanceIdentifier path,
354 NormalizedNode<?, ?> data) {
355 LOG.error("writeData called path = {}", path);