/* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; import akka.dispatch.OnComplete; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.WriteData; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; /** * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard *

* Creating a transaction on the consumer side will create one instance of a transaction proxy. If during * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will * be created on each of those shards by the TransactionProxy *

*

* The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various * shards will be executed. *

*/ public class TransactionProxy implements DOMStoreReadWriteTransaction { public enum TransactionType { READ_ONLY, WRITE_ONLY, READ_WRITE } private static final AtomicLong counter = new AtomicLong(); private static final Logger LOG = LoggerFactory.getLogger(TransactionProxy.class); private final TransactionType transactionType; private final ActorContext actorContext; private final Map remoteTransactionPaths = new HashMap<>(); private final TransactionIdentifier identifier; private final SchemaContext schemaContext; private boolean inReadyState; public TransactionProxy(ActorContext actorContext, TransactionType transactionType, SchemaContext schemaContext) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null"); this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null"); String memberName = actorContext.getCurrentMemberName(); if(memberName == null){ memberName = "UNKNOWN-MEMBER"; } this.identifier = TransactionIdentifier.builder().memberName(memberName).counter( counter.getAndIncrement()).build(); LOG.debug("Created txn {}", identifier); } @Override public CheckedFuture>, ReadFailedException> read( final YangInstanceIdentifier path) { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Read operation on write-only transaction is not allowed"); LOG.debug("txn {} read {}", identifier, path); createTransactionIfMissing(actorContext, path); return transactionContext(path).readData(path); } @Override public CheckedFuture exists(YangInstanceIdentifier path) { Preconditions.checkState(transactionType != TransactionType.WRITE_ONLY, "Exists operation on write-only transaction is not allowed"); LOG.debug("txn {} exists {}", identifier, path); createTransactionIfMissing(actorContext, path); return transactionContext(path).dataExists(path); } private void checkModificationState() { Preconditions.checkState(transactionType != TransactionType.READ_ONLY, "Modification operation on read-only transaction is not allowed"); Preconditions.checkState(!inReadyState, "Transaction is sealed - further modifications are allowed"); } @Override public void write(YangInstanceIdentifier path, NormalizedNode data) { checkModificationState(); LOG.debug("txn {} write {}", identifier, path); createTransactionIfMissing(actorContext, path); transactionContext(path).writeData(path, data); } @Override public void merge(YangInstanceIdentifier path, NormalizedNode data) { checkModificationState(); LOG.debug("txn {} merge {}", identifier, path); createTransactionIfMissing(actorContext, path); transactionContext(path).mergeData(path, data); } @Override public void delete(YangInstanceIdentifier path) { checkModificationState(); LOG.debug("txn {} delete {}", identifier, path); createTransactionIfMissing(actorContext, path); transactionContext(path).deleteData(path); } @Override public DOMStoreThreePhaseCommitCohort ready() { checkModificationState(); inReadyState = true; List cohortPaths = new ArrayList<>(); LOG.debug("txn {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size()); for(TransactionContext transactionContext : remoteTransactionPaths.values()) { LOG.debug("txn {} Readying transaction for shard {}", identifier, transactionContext.getShardName()); Object result = transactionContext.readyTransaction(); if(result.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)){ ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable( actorContext.getActorSystem(),result); String resolvedCohortPath = transactionContext.getResolvedCohortPath( reply.getCohortPath().toString()); cohortPaths.add(actorContext.actorFor(resolvedCohortPath)); } else { LOG.error("Was expecting {} but got {}", ReadyTransactionReply.SERIALIZABLE_CLASS, result.getClass()); } } return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier.toString()); } @Override public Object getIdentifier() { return this.identifier; } @Override public void close() { for(TransactionContext transactionContext : remoteTransactionPaths.values()) { transactionContext.closeTransaction(); } } private TransactionContext transactionContext(YangInstanceIdentifier path){ String shardName = shardNameFromIdentifier(path); return remoteTransactionPaths.get(shardName); } private String shardNameFromIdentifier(YangInstanceIdentifier path){ return ShardStrategyFactory.getStrategy(path).findShard(path); } private void createTransactionIfMissing(ActorContext actorContext, YangInstanceIdentifier path) { String shardName = ShardStrategyFactory.getStrategy(path).findShard(path); TransactionContext transactionContext = remoteTransactionPaths.get(shardName); if(transactionContext != null){ // A transaction already exists with that shard return; } try { Object response = actorContext.executeShardOperation(shardName, new CreateTransaction(identifier.toString(),this.transactionType.ordinal() ).toSerializable(), ActorContext.ASK_DURATION); if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); String transactionPath = reply.getTransactionPath(); LOG.debug("txn {} Received transaction path = {}", identifier, transactionPath); ActorSelection transactionActor = actorContext.actorSelection(transactionPath); transactionContext = new TransactionContextImpl(shardName, transactionPath, transactionActor); remoteTransactionPaths.put(shardName, transactionContext); } else { LOG.error("Was expecting {} but got {}", CreateTransactionReply.SERIALIZABLE_CLASS, response.getClass()); } } catch(Exception e){ LOG.error("txn {} Creating NoOpTransaction because of : {}", identifier, e.getMessage()); remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e)); } } private interface TransactionContext { String getShardName(); String getResolvedCohortPath(String cohortPath); public void closeTransaction(); public Object readyTransaction(); void deleteData(YangInstanceIdentifier path); void mergeData(YangInstanceIdentifier path, NormalizedNode data); CheckedFuture>, ReadFailedException> readData( final YangInstanceIdentifier path); void writeData(YangInstanceIdentifier path, NormalizedNode data); CheckedFuture dataExists(YangInstanceIdentifier path); } private class TransactionContextImpl implements TransactionContext { private final String shardName; private final String actorPath; private final ActorSelection actor; private TransactionContextImpl(String shardName, String actorPath, ActorSelection actor) { this.shardName = shardName; this.actorPath = actorPath; this.actor = actor; } @Override public String getShardName() { return shardName; } private ActorSelection getActor() { return actor; } @Override public String getResolvedCohortPath(String cohortPath) { return actorContext.resolvePath(actorPath, cohortPath); } @Override public void closeTransaction() { actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable()); } @Override public Object readyTransaction() { return actorContext.executeRemoteOperation(getActor(), new ReadyTransaction().toSerializable(), ActorContext.ASK_DURATION); } @Override public void deleteData(YangInstanceIdentifier path) { actorContext.sendRemoteOperationAsync(getActor(), new DeleteData(path).toSerializable() ); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { actorContext.sendRemoteOperationAsync(getActor(), new MergeData(path, data, schemaContext).toSerializable()); } @Override public CheckedFuture>, ReadFailedException> readData( final YangInstanceIdentifier path) { final SettableFuture>> returnFuture = SettableFuture.create(); OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object response) throws Throwable { if(failure != null) { returnFuture.setException(new ReadFailedException( "Error reading data for path " + path, failure)); } else { if (response.getClass().equals(ReadDataReply.SERIALIZABLE_CLASS)) { ReadDataReply reply = ReadDataReply.fromSerializable(schemaContext, path, response); if (reply.getNormalizedNode() == null) { returnFuture.set(Optional.>absent()); } else { returnFuture.set(Optional.>of( reply.getNormalizedNode())); } } else { returnFuture.setException(new ReadFailedException( "Invalid response reading data for path " + path)); } } } }; Future future = actorContext.executeRemoteOperationAsync(getActor(), new ReadData(path).toSerializable(), ActorContext.ASK_DURATION); future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { actorContext.sendRemoteOperationAsync(getActor(), new WriteData(path, data, schemaContext).toSerializable()); } @Override public CheckedFuture dataExists( final YangInstanceIdentifier path) { final SettableFuture returnFuture = SettableFuture.create(); OnComplete onComplete = new OnComplete() { @Override public void onComplete(Throwable failure, Object response) throws Throwable { if(failure != null) { returnFuture.setException(new ReadFailedException( "Error checking exists for path " + path, failure)); } else { if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { returnFuture.set(Boolean.valueOf(DataExistsReply. fromSerializable(response).exists())); } else { returnFuture.setException(new ReadFailedException( "Invalid response checking exists for path " + path)); } } } }; Future future = actorContext.executeRemoteOperationAsync(getActor(), new DataExists(path).toSerializable(), ActorContext.ASK_DURATION); future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER); } } private class NoOpTransactionContext implements TransactionContext { private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class); private final String shardName; private final Exception failure; private ActorRef cohort; public NoOpTransactionContext(String shardName, Exception failure){ this.shardName = shardName; this.failure = failure; } @Override public String getShardName() { return shardName; } @Override public String getResolvedCohortPath(String cohortPath) { return cohort.path().toString(); } @Override public void closeTransaction() { LOG.warn("txn {} closeTransaction called", identifier); } @Override public Object readyTransaction() { LOG.warn("txn {} readyTransaction called", identifier); cohort = actorContext.getActorSystem().actorOf(Props.create(NoOpCohort.class)); return new ReadyTransactionReply(cohort.path()).toSerializable(); } @Override public void deleteData(YangInstanceIdentifier path) { LOG.warn("txt {} deleteData called path = {}", identifier, path); } @Override public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.warn("txn {} mergeData called path = {}", identifier, path); } @Override public CheckedFuture>, ReadFailedException> readData( YangInstanceIdentifier path) { LOG.warn("txn {} readData called path = {}", identifier, path); return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error reading data for path " + path, failure)); } @Override public void writeData(YangInstanceIdentifier path, NormalizedNode data) { LOG.warn("txn {} writeData called path = {}", identifier, path); } @Override public CheckedFuture dataExists( YangInstanceIdentifier path) { LOG.warn("txn {} dataExists called path = {}", identifier, path); return Futures.immediateFailedCheckedFuture(new ReadFailedException( "Error checking exists for path " + path, failure)); } } }