2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
16 import com.google.common.base.Optional;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListeningExecutorService;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
23 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
25 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
26 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
27 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
31 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
32 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
34 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
37 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
38 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import java.util.ArrayList;
45 import java.util.HashMap;
46 import java.util.List;
48 import java.util.concurrent.Callable;
49 import java.util.concurrent.atomic.AtomicLong;
52 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
54 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
55 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
56 * be created on each of those shards by the TransactionProxy
59 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
60 * shards will be executed.
63 public class TransactionProxy implements DOMStoreReadWriteTransaction {
64 public enum TransactionType {
70 private static final AtomicLong counter = new AtomicLong();
72 private static final Logger
73 LOG = LoggerFactory.getLogger(TransactionProxy.class);
76 private final TransactionType transactionType;
77 private final ActorContext actorContext;
78 private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
79 private final String identifier;
80 private final ListeningExecutorService executor;
81 private final SchemaContext schemaContext;
83 public TransactionProxy(
84 ActorContext actorContext,
85 TransactionType transactionType,
86 ListeningExecutorService executor,
87 SchemaContext schemaContext
90 this.identifier = actorContext.getCurrentMemberName() + "-txn-" + counter.getAndIncrement();
91 this.transactionType = transactionType;
92 this.actorContext = actorContext;
93 this.executor = executor;
94 this.schemaContext = schemaContext;
100 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
101 final YangInstanceIdentifier path) {
103 createTransactionIfMissing(actorContext, path);
105 return transactionContext(path).readData(path);
109 public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
111 createTransactionIfMissing(actorContext, path);
113 transactionContext(path).writeData(path, data);
117 public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
119 createTransactionIfMissing(actorContext, path);
121 transactionContext(path).mergeData(path, data);
125 public void delete(YangInstanceIdentifier path) {
127 createTransactionIfMissing(actorContext, path);
129 transactionContext(path).deleteData(path);
133 public DOMStoreThreePhaseCommitCohort ready() {
134 List<ActorPath> cohortPaths = new ArrayList<>();
136 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
137 Object result = transactionContext.readyTransaction();
139 if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
140 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
141 String resolvedCohortPath = transactionContext
142 .getResolvedCohortPath(reply.getCohortPath().toString());
143 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
147 return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
151 public Object getIdentifier() {
152 return this.identifier;
156 public void close() {
157 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
158 transactionContext.closeTransaction();
162 private TransactionContext transactionContext(YangInstanceIdentifier path){
163 String shardName = shardNameFromIdentifier(path);
164 return remoteTransactionPaths.get(shardName);
167 private String shardNameFromIdentifier(YangInstanceIdentifier path){
168 return ShardStrategyFactory.getStrategy(path).findShard(path);
171 private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
172 String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
174 TransactionContext transactionContext =
175 remoteTransactionPaths.get(shardName);
177 if(transactionContext != null){
178 // A transaction already exists with that shard
183 Object response = actorContext.executeShardOperation(shardName,
184 new CreateTransaction(identifier).toSerializable(),
185 ActorContext.ASK_DURATION);
186 if (response.getClass()
187 .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
188 CreateTransactionReply reply =
189 CreateTransactionReply.fromSerializable(response);
191 String transactionPath = reply.getTransactionPath();
193 LOG.info("Received transaction path = {}" , transactionPath );
195 ActorSelection transactionActor =
196 actorContext.actorSelection(transactionPath);
198 new TransactionContextImpl(shardName, transactionPath,
201 remoteTransactionPaths.put(shardName, transactionContext);
203 } catch(TimeoutException e){
204 LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
205 remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName));
209 private interface TransactionContext {
210 String getShardName();
212 String getResolvedCohortPath(String cohortPath);
214 public void closeTransaction();
216 public Object readyTransaction();
218 void deleteData(YangInstanceIdentifier path);
220 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
222 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
223 final YangInstanceIdentifier path);
225 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
229 private class TransactionContextImpl implements TransactionContext{
230 private final String shardName;
231 private final String actorPath;
232 private final ActorSelection actor;
235 private TransactionContextImpl(String shardName, String actorPath,
236 ActorSelection actor) {
237 this.shardName = shardName;
238 this.actorPath = actorPath;
242 @Override public String getShardName() {
246 private ActorSelection getActor() {
250 @Override public String getResolvedCohortPath(String cohortPath){
251 return actorContext.resolvePath(actorPath, cohortPath);
254 @Override public void closeTransaction() {
256 new CloseTransaction().toSerializable(), null);
259 @Override public Object readyTransaction() {
260 return actorContext.executeRemoteOperation(getActor(),
261 new ReadyTransaction().toSerializable(),
262 ActorContext.ASK_DURATION
267 @Override public void deleteData(YangInstanceIdentifier path) {
268 getActor().tell(new DeleteData(path).toSerializable(), null);
271 @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data){
272 getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
275 @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
276 final YangInstanceIdentifier path) {
278 Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
280 @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
281 Object response = actorContext
282 .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(),
283 ActorContext.ASK_DURATION);
284 if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
285 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
286 if(reply.getNormalizedNode() == null){
287 return Optional.absent();
289 return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
292 return Optional.absent();
296 return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
299 @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
300 getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null);
305 private class NoOpTransactionContext implements TransactionContext {
308 LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
310 private final String shardName;
312 private ActorRef cohort;
314 public NoOpTransactionContext(String shardName){
315 this.shardName = shardName;
317 @Override public String getShardName() {
322 @Override public String getResolvedCohortPath(String cohortPath) {
323 return cohort.path().toString();
326 @Override public void closeTransaction() {
327 LOG.error("closeTransaction called");
330 @Override public Object readyTransaction() {
331 LOG.error("readyTransaction called");
332 cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
333 return new ReadyTransactionReply(cohort.path()).toSerializable();
336 @Override public void deleteData(YangInstanceIdentifier path) {
337 LOG.error("deleteData called path = {}", path);
340 @Override public void mergeData(YangInstanceIdentifier path,
341 NormalizedNode<?, ?> data) {
342 LOG.error("mergeData called path = {}", path);
346 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
347 YangInstanceIdentifier path) {
348 LOG.error("readData called path = {}", path);
349 return Futures.immediateCheckedFuture(
350 Optional.<NormalizedNode<?, ?>>absent());
353 @Override public void writeData(YangInstanceIdentifier path,
354 NormalizedNode<?, ?> data) {
355 LOG.error("writeData called path = {}", path);