2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
27 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
28 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
29 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
32 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
33 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
34 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
35 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
38 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
39 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 import java.util.ArrayList;
46 import java.util.HashMap;
47 import java.util.List;
49 import java.util.concurrent.Callable;
50 import java.util.concurrent.atomic.AtomicLong;
53 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
55 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
56 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
57 * be created on each of those shards by the TransactionProxy
60 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
61 * shards will be executed.
64 public class TransactionProxy implements DOMStoreReadWriteTransaction {
65 public enum TransactionType {
71 private static final AtomicLong counter = new AtomicLong();
73 private static final Logger
74 LOG = LoggerFactory.getLogger(TransactionProxy.class);
77 private final TransactionType transactionType;
78 private final ActorContext actorContext;
79 private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
80 private final TransactionIdentifier identifier;
81 private final ListeningExecutorService executor;
82 private final SchemaContext schemaContext;
84 public TransactionProxy(
85 ActorContext actorContext,
86 TransactionType transactionType,
87 ListeningExecutorService executor,
88 SchemaContext schemaContext
90 this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
91 this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
92 this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
93 this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
95 String memberName = actorContext.getCurrentMemberName();
96 if(memberName == null){
97 memberName = "UNKNOWN-MEMBER";
99 this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
101 LOG.debug("Created txn {}", identifier);
106 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
107 final YangInstanceIdentifier path) {
109 LOG.debug("txn {} read {}", identifier, path);
111 createTransactionIfMissing(actorContext, path);
113 return transactionContext(path).readData(path);
117 public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
119 LOG.debug("txn {} write {}", identifier, path);
121 createTransactionIfMissing(actorContext, path);
123 transactionContext(path).writeData(path, data);
127 public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
129 LOG.debug("txn {} merge {}", identifier, path);
131 createTransactionIfMissing(actorContext, path);
133 transactionContext(path).mergeData(path, data);
137 public void delete(YangInstanceIdentifier path) {
139 LOG.debug("txn {} delete {}", identifier, path);
141 createTransactionIfMissing(actorContext, path);
143 transactionContext(path).deleteData(path);
147 public DOMStoreThreePhaseCommitCohort ready() {
148 List<ActorPath> cohortPaths = new ArrayList<>();
150 LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
152 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
154 LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
156 Object result = transactionContext.readyTransaction();
158 if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
159 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
160 String resolvedCohortPath = transactionContext
161 .getResolvedCohortPath(reply.getCohortPath().toString());
162 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
166 return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
170 public Object getIdentifier() {
171 return this.identifier;
175 public void close() {
176 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
177 transactionContext.closeTransaction();
181 private TransactionContext transactionContext(YangInstanceIdentifier path){
182 String shardName = shardNameFromIdentifier(path);
183 return remoteTransactionPaths.get(shardName);
186 private String shardNameFromIdentifier(YangInstanceIdentifier path){
187 return ShardStrategyFactory.getStrategy(path).findShard(path);
190 private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
191 String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
193 TransactionContext transactionContext =
194 remoteTransactionPaths.get(shardName);
196 if(transactionContext != null){
197 // A transaction already exists with that shard
202 Object response = actorContext.executeShardOperation(shardName,
203 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
204 ActorContext.ASK_DURATION);
205 if (response.getClass()
206 .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
207 CreateTransactionReply reply =
208 CreateTransactionReply.fromSerializable(response);
210 String transactionPath = reply.getTransactionPath();
212 LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
214 ActorSelection transactionActor =
215 actorContext.actorSelection(transactionPath);
217 new TransactionContextImpl(shardName, transactionPath,
220 remoteTransactionPaths.put(shardName, transactionContext);
222 } catch(TimeoutException | PrimaryNotFoundException e){
223 LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
224 remoteTransactionPaths.put(shardName,
225 new NoOpTransactionContext(shardName));
229 private interface TransactionContext {
230 String getShardName();
232 String getResolvedCohortPath(String cohortPath);
234 public void closeTransaction();
236 public Object readyTransaction();
238 void deleteData(YangInstanceIdentifier path);
240 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
242 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
243 final YangInstanceIdentifier path);
245 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
249 private class TransactionContextImpl implements TransactionContext{
250 private final String shardName;
251 private final String actorPath;
252 private final ActorSelection actor;
255 private TransactionContextImpl(String shardName, String actorPath,
256 ActorSelection actor) {
257 this.shardName = shardName;
258 this.actorPath = actorPath;
262 @Override public String getShardName() {
266 private ActorSelection getActor() {
270 @Override public String getResolvedCohortPath(String cohortPath){
271 return actorContext.resolvePath(actorPath, cohortPath);
274 @Override public void closeTransaction() {
276 new CloseTransaction().toSerializable(), null);
279 @Override public Object readyTransaction() {
280 return actorContext.executeRemoteOperation(getActor(),
281 new ReadyTransaction().toSerializable(),
282 ActorContext.ASK_DURATION
287 @Override public void deleteData(YangInstanceIdentifier path) {
288 getActor().tell(new DeleteData(path).toSerializable(), null);
291 @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data){
292 getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
295 @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
296 final YangInstanceIdentifier path) {
298 Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
300 @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
301 Object response = actorContext
302 .executeRemoteOperation(getActor(), new ReadData(path).toSerializable(),
303 ActorContext.ASK_DURATION);
304 if(response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)){
305 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,path, response);
306 if(reply.getNormalizedNode() == null){
307 return Optional.absent();
309 return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
312 return Optional.absent();
316 return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
319 @Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
320 getActor().tell(new WriteData(path, data, schemaContext).toSerializable(), null);
325 private class NoOpTransactionContext implements TransactionContext {
328 LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
330 private final String shardName;
332 private ActorRef cohort;
334 public NoOpTransactionContext(String shardName){
335 this.shardName = shardName;
337 @Override public String getShardName() {
342 @Override public String getResolvedCohortPath(String cohortPath) {
343 return cohort.path().toString();
346 @Override public void closeTransaction() {
347 LOG.warn("txn {} closeTransaction called", identifier);
350 @Override public Object readyTransaction() {
351 LOG.warn("txn {} readyTransaction called", identifier);
352 cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
353 return new ReadyTransactionReply(cohort.path()).toSerializable();
356 @Override public void deleteData(YangInstanceIdentifier path) {
357 LOG.warn("txt {} deleteData called path = {}", identifier, path);
360 @Override public void mergeData(YangInstanceIdentifier path,
361 NormalizedNode<?, ?> data) {
362 LOG.warn("txn {} mergeData called path = {}", identifier, path);
366 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
367 YangInstanceIdentifier path) {
368 LOG.warn("txn {} readData called path = {}", identifier, path);
369 return Futures.immediateCheckedFuture(
370 Optional.<NormalizedNode<?, ?>>absent());
373 @Override public void writeData(YangInstanceIdentifier path,
374 NormalizedNode<?, ?> data) {
375 LOG.warn("txn {} writeData called path = {}", identifier, path);