2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import com.google.common.base.Optional;
16 import com.google.common.base.Preconditions;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.Futures;
19 import com.google.common.util.concurrent.ListeningExecutorService;
20 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
21 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
22 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
23 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
24 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
26 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
27 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
28 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
29 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
30 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
34 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
35 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
36 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
37 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
40 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
41 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
42 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
47 import java.util.ArrayList;
48 import java.util.HashMap;
49 import java.util.List;
51 import java.util.concurrent.Callable;
52 import java.util.concurrent.atomic.AtomicLong;
55 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
57 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
58 * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
59 * be created on each of those shards by the TransactionProxy
62 * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
63 * shards will be executed.
66 public class TransactionProxy implements DOMStoreReadWriteTransaction {
67 public enum TransactionType {
73 private static final AtomicLong counter = new AtomicLong();
75 private static final Logger
76 LOG = LoggerFactory.getLogger(TransactionProxy.class);
79 private final TransactionType transactionType;
80 private final ActorContext actorContext;
81 private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
82 private final TransactionIdentifier identifier;
83 private final ListeningExecutorService executor;
84 private final SchemaContext schemaContext;
86 public TransactionProxy(
87 ActorContext actorContext,
88 TransactionType transactionType,
89 ListeningExecutorService executor,
90 SchemaContext schemaContext
92 this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
93 this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
94 this.executor = Preconditions.checkNotNull(executor, "executor should not be null");
95 this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
97 String memberName = actorContext.getCurrentMemberName();
98 if(memberName == null){
99 memberName = "UNKNOWN-MEMBER";
101 this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(counter.getAndIncrement()).build();
103 LOG.debug("Created txn {}", identifier);
108 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
109 final YangInstanceIdentifier path) {
111 LOG.debug("txn {} read {}", identifier, path);
113 createTransactionIfMissing(actorContext, path);
115 return transactionContext(path).readData(path);
118 @Override public CheckedFuture<Boolean, ReadFailedException> exists(
119 YangInstanceIdentifier path) {
120 LOG.debug("txn {} exists {}", identifier, path);
122 createTransactionIfMissing(actorContext, path);
124 return transactionContext(path).dataExists(path);
128 public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
130 LOG.debug("txn {} write {}", identifier, path);
132 createTransactionIfMissing(actorContext, path);
134 transactionContext(path).writeData(path, data);
138 public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
140 LOG.debug("txn {} merge {}", identifier, path);
142 createTransactionIfMissing(actorContext, path);
144 transactionContext(path).mergeData(path, data);
148 public void delete(YangInstanceIdentifier path) {
150 LOG.debug("txn {} delete {}", identifier, path);
152 createTransactionIfMissing(actorContext, path);
154 transactionContext(path).deleteData(path);
158 public DOMStoreThreePhaseCommitCohort ready() {
159 List<ActorPath> cohortPaths = new ArrayList<>();
161 LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size());
163 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
165 LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName());
167 Object result = transactionContext.readyTransaction();
169 if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
170 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(actorContext.getActorSystem(),result);
171 String resolvedCohortPath = transactionContext
172 .getResolvedCohortPath(reply.getCohortPath().toString());
173 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
177 return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString(), executor);
181 public Object getIdentifier() {
182 return this.identifier;
186 public void close() {
187 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
188 transactionContext.closeTransaction();
192 private TransactionContext transactionContext(YangInstanceIdentifier path){
193 String shardName = shardNameFromIdentifier(path);
194 return remoteTransactionPaths.get(shardName);
197 private String shardNameFromIdentifier(YangInstanceIdentifier path){
198 return ShardStrategyFactory.getStrategy(path).findShard(path);
201 private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
202 String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
204 TransactionContext transactionContext =
205 remoteTransactionPaths.get(shardName);
207 if(transactionContext != null){
208 // A transaction already exists with that shard
213 Object response = actorContext.executeShardOperation(shardName,
214 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
215 ActorContext.ASK_DURATION);
216 if (response.getClass()
217 .equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
218 CreateTransactionReply reply =
219 CreateTransactionReply.fromSerializable(response);
221 String transactionPath = reply.getTransactionPath();
223 LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
225 ActorSelection transactionActor =
226 actorContext.actorSelection(transactionPath);
228 new TransactionContextImpl(shardName, transactionPath,
231 remoteTransactionPaths.put(shardName, transactionContext);
233 } catch(TimeoutException | PrimaryNotFoundException e){
234 LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
235 remoteTransactionPaths.put(shardName,
236 new NoOpTransactionContext(shardName));
240 private interface TransactionContext {
241 String getShardName();
243 String getResolvedCohortPath(String cohortPath);
245 public void closeTransaction();
247 public Object readyTransaction();
249 void deleteData(YangInstanceIdentifier path);
251 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
253 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
254 final YangInstanceIdentifier path);
256 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
258 CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
262 private class TransactionContextImpl implements TransactionContext {
263 private final String shardName;
264 private final String actorPath;
265 private final ActorSelection actor;
268 private TransactionContextImpl(String shardName, String actorPath,
269 ActorSelection actor) {
270 this.shardName = shardName;
271 this.actorPath = actorPath;
275 @Override public String getShardName() {
279 private ActorSelection getActor() {
283 @Override public String getResolvedCohortPath(String cohortPath) {
284 return actorContext.resolvePath(actorPath, cohortPath);
287 @Override public void closeTransaction() {
289 new CloseTransaction().toSerializable(), null);
292 @Override public Object readyTransaction() {
293 return actorContext.executeRemoteOperation(getActor(),
294 new ReadyTransaction().toSerializable(),
295 ActorContext.ASK_DURATION
300 @Override public void deleteData(YangInstanceIdentifier path) {
301 getActor().tell(new DeleteData(path).toSerializable(), null);
304 @Override public void mergeData(YangInstanceIdentifier path,
305 NormalizedNode<?, ?> data) {
307 .tell(new MergeData(path, data, schemaContext).toSerializable(),
312 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
313 final YangInstanceIdentifier path) {
315 Callable<Optional<NormalizedNode<?, ?>>> call =
316 new Callable<Optional<NormalizedNode<?, ?>>>() {
318 @Override public Optional<NormalizedNode<?, ?>> call()
320 Object response = actorContext
321 .executeRemoteOperation(getActor(),
322 new ReadData(path).toSerializable(),
323 ActorContext.ASK_DURATION);
324 if (response.getClass()
325 .equals(ReadDataReply.SERIALIZABLE_CLASS)) {
326 ReadDataReply reply = ReadDataReply
327 .fromSerializable(schemaContext, path,
329 if (reply.getNormalizedNode() == null) {
330 return Optional.absent();
332 return Optional.<NormalizedNode<?, ?>>of(
333 reply.getNormalizedNode());
336 throw new ReadFailedException("Read Failed " + path);
340 return MappingCheckedFuture
341 .create(executor.submit(call), ReadFailedException.MAPPER);
344 @Override public void writeData(YangInstanceIdentifier path,
345 NormalizedNode<?, ?> data) {
347 .tell(new WriteData(path, data, schemaContext).toSerializable(),
351 @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
352 final YangInstanceIdentifier path) {
354 Callable<Boolean> call = new Callable<Boolean>() {
356 @Override public Boolean call() throws Exception {
357 Object o = actorContext.executeRemoteOperation(getActor(),
358 new DataExists(path).toSerializable(),
359 ActorContext.ASK_DURATION
363 if (DataExistsReply.SERIALIZABLE_CLASS
364 .equals(o.getClass())) {
365 return DataExistsReply.fromSerializable(o).exists();
368 throw new ReadFailedException("Exists Failed " + path);
371 return MappingCheckedFuture
372 .create(executor.submit(call), ReadFailedException.MAPPER);
376 private class NoOpTransactionContext implements TransactionContext {
379 LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
381 private final String shardName;
383 private ActorRef cohort;
385 public NoOpTransactionContext(String shardName){
386 this.shardName = shardName;
388 @Override public String getShardName() {
393 @Override public String getResolvedCohortPath(String cohortPath) {
394 return cohort.path().toString();
397 @Override public void closeTransaction() {
398 LOG.warn("txn {} closeTransaction called", identifier);
401 @Override public Object readyTransaction() {
402 LOG.warn("txn {} readyTransaction called", identifier);
403 cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
404 return new ReadyTransactionReply(cohort.path()).toSerializable();
407 @Override public void deleteData(YangInstanceIdentifier path) {
408 LOG.warn("txt {} deleteData called path = {}", identifier, path);
411 @Override public void mergeData(YangInstanceIdentifier path,
412 NormalizedNode<?, ?> data) {
413 LOG.warn("txn {} mergeData called path = {}", identifier, path);
417 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
418 YangInstanceIdentifier path) {
419 LOG.warn("txn {} readData called path = {}", identifier, path);
420 return Futures.immediateCheckedFuture(
421 Optional.<NormalizedNode<?, ?>>absent());
424 @Override public void writeData(YangInstanceIdentifier path,
425 NormalizedNode<?, ?> data) {
426 LOG.warn("txn {} writeData called path = {}", identifier, path);
429 @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
430 YangInstanceIdentifier path) {
431 LOG.warn("txn {} dataExists called path = {}", identifier, path);
433 // Returning false instead of an exception to keep this aligned with
435 return Futures.immediateCheckedFuture(false);