2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.Props;
15 import akka.dispatch.OnComplete;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import com.google.common.util.concurrent.CheckedFuture;
20 import com.google.common.util.concurrent.Futures;
21 import com.google.common.util.concurrent.SettableFuture;
23 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
24 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
25 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
26 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
27 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
28 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
29 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
30 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
31 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
32 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
33 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
34 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
35 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
36 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
38 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
41 import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
48 import scala.concurrent.Future;
50 import java.util.ArrayList;
51 import java.util.HashMap;
52 import java.util.List;
54 import java.util.concurrent.atomic.AtomicLong;
/**
 * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard.
 * <p>
 * Creating a transaction on the consumer side will create one instance of a transaction proxy. If, during
 * the transaction, reads and writes are done on data that belongs to different shards, then a separate
 * transaction will be created on each of those shards by the TransactionProxy.
 * </p>
 * <p>
 * The TransactionProxy does not make any guarantees about atomicity or the order in which the transactions
 * on the various shards will be executed.
 * </p>
 */
68 public class TransactionProxy implements DOMStoreReadWriteTransaction {
// Kind of transaction this proxy represents. The constants are not visible in this view; READ_ONLY and
// WRITE_ONLY are referenced by the access checks below. createTransactionIfMissing sends ordinal() to
// the shard, so the declaration order of the constants must not change.
69 public enum TransactionType {
// Shared by every proxy instance; supplies the counter component of each transaction identifier.
75 private static final AtomicLong counter = new AtomicLong();
77 private static final Logger
78 LOG = LoggerFactory.getLogger(TransactionProxy.class);
// Decides which operations are permitted (see read/exists and checkModificationState).
81 private final TransactionType transactionType;
// Used for all messaging to shards and transaction actors.
82 private final ActorContext actorContext;
// Keyed by shard name: the context of the transaction created on that shard. A NoOpTransactionContext
// is stored instead when creating the remote transaction threw an exception.
83 private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
// Built in the constructor from the member name and the shared counter; used in log messages and
// passed to the shard when a transaction is created.
84 private final TransactionIdentifier identifier;
// Handed to the write, merge and read-reply messages for (de)serialisation.
85 private final SchemaContext schemaContext;
// Checked by checkModificationState() before every modification.
86 private boolean inReadyState;
88 public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
89 SchemaContext schemaContext) {
90 this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
91 this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
92 this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
94 String memberName = actorContext.getCurrentMemberName();
95 if(memberName == null){
96 memberName = "UNKNOWN-MEMBER";
99 this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
100 counter.getAndIncrement()).build();
102 LOG.debug("Created txn {}", identifier);
107 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
108 final YangInstanceIdentifier path) {
110 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
111 "Read operation on write-only transaction is not allowed");
113 LOG.debug("txn {} read {}", identifier, path);
115 createTransactionIfMissing(actorContext, path);
117 return transactionContext(path).readData(path);
121 public CheckedFuture<Boolean, ReadFailedException> exists(YangInstanceIdentifier path) {
123 Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY,
124 "Exists operation on write-only transaction is not allowed");
126 LOG.debug("txn {} exists {}", identifier, path);
128 createTransactionIfMissing(actorContext, path);
130 return transactionContext(path).dataExists(path);
133 private void checkModificationState() {
134 Preconditions.checkState(transactionType != TransactionType.READ_ONLY,
135 "Modification operation on read-only transaction is not allowed");
136 Preconditions.checkState(!inReadyState,
137 "Transaction is sealed - further modifications are allowed");
141 public void write(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
143 checkModificationState();
145 LOG.debug("txn {} write {}", identifier, path);
147 createTransactionIfMissing(actorContext, path);
149 transactionContext(path).writeData(path, data);
153 public void merge(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
155 checkModificationState();
157 LOG.debug("txn {} merge {}", identifier, path);
159 createTransactionIfMissing(actorContext, path);
161 transactionContext(path).mergeData(path, data);
165 public void delete(YangInstanceIdentifier path) {
167 checkModificationState();
169 LOG.debug("txn {} delete {}", identifier, path);
171 createTransactionIfMissing(actorContext, path);
173 transactionContext(path).deleteData(path);
177 public DOMStoreThreePhaseCommitCohort ready() {
179 checkModificationState();
183 List<ActorPath> cohortPaths = new ArrayList<>();
185 LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier,
186 remoteTransactionPaths.size());
188 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
190 LOG.debug("txn {} Readying transaction for shard {}", identifier,
191 transactionContext.getShardName());
193 Object result = transactionContext.readyTransaction();
195 if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){
196 ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
197 actorContext.getActorSystem(),result);
198 String resolvedCohortPath = transactionContext.getResolvedCohortPath(
199 reply.getCohortPath().toString());
200 cohortPaths.add(actorContext.actorFor(resolvedCohortPath));
202 LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS,
207 return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString());
211 public Object getIdentifier() {
212 return this.identifier;
216 public void close() {
217 for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
218 transactionContext.closeTransaction();
222 private TransactionContext transactionContext(YangInstanceIdentifier path){
223 String shardName = shardNameFromIdentifier(path);
224 return remoteTransactionPaths.get(shardName);
227 private String shardNameFromIdentifier(YangInstanceIdentifier path){
228 return ShardStrategyFactory.getStrategy(path).findShard(path);
231 private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) {
232 String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
234 TransactionContext transactionContext =
235 remoteTransactionPaths.get(shardName);
237 if(transactionContext != null){
238 // A transaction already exists with that shard
243 Object response = actorContext.executeShardOperation(shardName,
244 new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(),
245 ActorContext.ASK_DURATION);
246 if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
247 CreateTransactionReply reply =
248 CreateTransactionReply.fromSerializable(response);
250 String transactionPath = reply.getTransactionPath();
252 LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath);
254 ActorSelection transactionActor =
255 actorContext.actorSelection(transactionPath);
257 new TransactionContextImpl(shardName, transactionPath,
260 remoteTransactionPaths.put(shardName, transactionContext);
262 LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS,
263 response.getClass());
265 } catch(Exception e){
266 LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
267 remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
271 private interface TransactionContext {
272 String getShardName();
274 String getResolvedCohortPath(String cohortPath);
276 public void closeTransaction();
278 public Object readyTransaction();
280 void deleteData(YangInstanceIdentifier path);
282 void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
284 CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
285 final YangInstanceIdentifier path);
287 void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
289 CheckedFuture<Boolean, ReadFailedException> dataExists(YangInstanceIdentifier path);
293 private class TransactionContextImpl implements TransactionContext {
294 private final String shardName;
295 private final String actorPath;
296 private final ActorSelection actor;
299 private TransactionContextImpl(String shardName, String actorPath,
300 ActorSelection actor) {
301 this.shardName = shardName;
302 this.actorPath = actorPath;
307 public String getShardName() {
311 private ActorSelection getActor() {
316 public String getResolvedCohortPath(String cohortPath) {
317 return actorContext.resolvePath(actorPath, cohortPath);
321 public void closeTransaction() {
322 actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable());
326 public Object readyTransaction() {
327 return actorContext.executeRemoteOperation(getActor(),
328 new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION);
332 public void deleteData(YangInstanceIdentifier path) {
333 actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() );
337 public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
338 actorContext.sendRemoteOperationAsync(getActor(),
339 new MergeData(path, data, schemaContext).toSerializable());
343 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
344 final YangInstanceIdentifier path) {
346 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
348 OnComplete<Object> onComplete = new OnComplete<Object>() {
350 public void onComplete(Throwable failure, Object response) throws Throwable {
351 if(failure != null) {
352 returnFuture.setException(new ReadFailedException(
353 "Error reading data for path " + path, failure));
355 if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) {
356 ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext,
358 if (reply.getNormalizedNode() == null) {
359 returnFuture.set(Optional.<NormalizedNode<?, ?>>absent());
361 returnFuture.set(Optional.<NormalizedNode<?, ?>>of(
362 reply.getNormalizedNode()));
365 returnFuture.setException(new ReadFailedException(
366 "Invalid response reading data for path " + path));
372 Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
373 new ReadData(path).toSerializable(), ActorContext.ASK_DURATION);
374 future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
376 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
380 public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
381 actorContext.sendRemoteOperationAsync(getActor(),
382 new WriteData(path, data, schemaContext).toSerializable());
386 public CheckedFuture<Boolean, ReadFailedException> dataExists(
387 final YangInstanceIdentifier path) {
389 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
391 OnComplete<Object> onComplete = new OnComplete<Object>() {
393 public void onComplete(Throwable failure, Object response) throws Throwable {
394 if(failure != null) {
395 returnFuture.setException(new ReadFailedException(
396 "Error checking exists for path " + path, failure));
398 if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
399 returnFuture.set(Boolean.valueOf(DataExistsReply.
400 fromSerializable(response).exists()));
402 returnFuture.setException(new ReadFailedException(
403 "Invalid response checking exists for path " + path));
409 Future<Object> future = actorContext.executeRemoteOperationAsync(getActor(),
410 new DataExists(path).toSerializable(), ActorContext.ASK_DURATION);
411 future.onComplete(onComplete, actorContext.getActorSystem().dispatcher());
413 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
417 private class NoOpTransactionContext implements TransactionContext {
420 LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
422 private final String shardName;
423 private final Exception failure;
425 private ActorRef cohort;
427 public NoOpTransactionContext(String shardName, Exception failure){
428 this.shardName = shardName;
429 this.failure = failure;
433 public String getShardName() {
439 public String getResolvedCohortPath(String cohortPath) {
440 return cohort.path().toString();
444 public void closeTransaction() {
445 LOG.warn("txn {} closeTransaction called", identifier);
448 @Override public Object readyTransaction() {
449 LOG.warn("txn {} readyTransaction called", identifier);
450 cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class));
451 return new ReadyTransactionReply(cohort.path()).toSerializable();
455 public void deleteData(YangInstanceIdentifier path) {
456 LOG.warn("txt {} deleteData called path = {}", identifier, path);
460 public void mergeData(YangInstanceIdentifier path,
461 NormalizedNode<?, ?> data) {
462 LOG.warn("txn {} mergeData called path = {}", identifier, path);
466 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
467 YangInstanceIdentifier path) {
468 LOG.warn("txn {} readData called path = {}", identifier, path);
469 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
470 "Error reading data for path " + path, failure));
473 @Override public void writeData(YangInstanceIdentifier path,
474 NormalizedNode<?, ?> data) {
475 LOG.warn("txn {} writeData called path = {}", identifier, path);
478 @Override public CheckedFuture<Boolean, ReadFailedException> dataExists(
479 YangInstanceIdentifier path) {
480 LOG.warn("txn {} dataExists called path = {}", identifier, path);
481 return Futures.immediateFailedCheckedFuture(new ReadFailedException(
482 "Error checking exists for path " + path, failure));