import akka.actor.ActorSelection;
import akka.actor.Props;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.concurrent.MappingCheckedFuture;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
/**
private final ActorContext actorContext;
private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
private final String identifier;
- private final ExecutorService executor;
+ private final ListeningExecutorService executor;
private final SchemaContext schemaContext;
public TransactionProxy(
ActorContext actorContext,
TransactionType transactionType,
- ExecutorService executor,
+ ListeningExecutorService executor,
SchemaContext schemaContext
) {
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final YangInstanceIdentifier path) {
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
+ final YangInstanceIdentifier path) {
createTransactionIfMissing(actorContext, path);
try {
Object response = actorContext.executeShardOperation(shardName,
- new CreateTransaction(identifier).toSerializable(),
+ new CreateTransaction(identifier,this.transactionType.ordinal() ).toSerializable(),
ActorContext.ASK_DURATION);
if (response.getClass()
.equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
remoteTransactionPaths.put(shardName, transactionContext);
}
- } catch(TimeoutException e){
- remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName));
+ } catch(TimeoutException | PrimaryNotFoundException e){
+ LOG.error("Creating NoOpTransaction because of : {}", e.getMessage());
+ remoteTransactionPaths.put(shardName,
+ new NoOpTransactionContext(shardName));
}
}
void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
- ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(final YangInstanceIdentifier path);
+ CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path);
void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
}
getActor().tell(new MergeData(path, data, schemaContext).toSerializable(), null);
}
- @Override public ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(final YangInstanceIdentifier path) {
+ @Override public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
+ final YangInstanceIdentifier path) {
- Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
+ Callable<Optional<NormalizedNode<?,?>>> call = new Callable<Optional<NormalizedNode<?,?>>>() {
@Override public Optional<NormalizedNode<?,?>> call() throws Exception {
Object response = actorContext
if(reply.getNormalizedNode() == null){
return Optional.absent();
}
- //FIXME : A cast should not be required here ???
- return (Optional<NormalizedNode<?, ?>>) Optional.of(reply.getNormalizedNode());
+ return Optional.<NormalizedNode<?,?>>of(reply.getNormalizedNode());
}
return Optional.absent();
}
};
- ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
- future = ListenableFutureTask.create(call);
-
- executor.submit(future);
-
- return future;
+ return MappingCheckedFuture.create(executor.submit(call), ReadFailedException.MAPPER);
}
@Override public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
}
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> readData(
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readData(
YangInstanceIdentifier path) {
LOG.error("readData called path = {}", path);
- return Futures.immediateFuture(
+ return Futures.immediateCheckedFuture(
Optional.<NormalizedNode<?, ?>>absent());
}