import akka.event.LoggingAdapter;
import akka.japi.Creator;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.CheckedFuture;
+import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.DeleteDataReply;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import java.util.concurrent.ExecutionException;
-
/**
* The ShardTransaction Actor represents a remote transaction
* <p>
protected ShardTransaction(DOMStoreTransactionChain transactionChain,
ActorRef shardActor, SchemaContext schemaContext) {
this.transactionChain = transactionChain;
- //this.transaction = transaction;
this.shardActor = shardActor;
this.schemaContext = schemaContext;
}
getSender().tell(new GetCompositeModificationReply(
new ImmutableCompositeModification(modification)), getSelf());
}else{
- throw new Exception ("ShardTransaction:handleRecieve received an unknown message"+message);
+ throw new UnknownMessageException(message);
}
}
final ActorRef sender = getSender();
final ActorRef self = getSelf();
final YangInstanceIdentifier path = message.getPath();
- final ListenableFuture<Optional<NormalizedNode<?, ?>>> future =
- transaction.read(path);
+ final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
+ transaction.read(path);
- future.addListener(new Runnable() {
+ future.addListener(new Runnable() {
@Override
public void run() {
try {
- Optional<NormalizedNode<?, ?>> optional = future.get();
+ Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
if (optional.isPresent()) {
sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
} else {
sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
}
- } catch (InterruptedException | ExecutionException e) {
- log.error(e,
- "An exception happened when reading data from path : "
- + path.toString());
+ } catch (Exception e) {
+ sender.tell(new akka.actor.Status.Failure(e),self);
}
}
}, getContext().dispatcher());
}
+ protected void dataExists(DOMStoreReadTransaction transaction, DataExists message) {
+ final YangInstanceIdentifier path = message.getPath();
+
+ try {
+ Boolean exists = transaction.exists(path).checkedGet();
+ getSender().tell(new DataExistsReply(exists).toSerializable(), getSelf());
+ } catch (ReadFailedException e) {
+ getSender().tell(new akka.actor.Status.Failure(e),getSelf());
+ }
+
+ }
protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
modification.addModification(
new WriteModification(message.getPath(), message.getData(),schemaContext));
LOG.debug("writeData at path : " + message.getPath().toString());
- transaction.write(message.getPath(), message.getData());
- getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+
+ try {
+ transaction.write(message.getPath(), message.getData());
+ getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+ }catch(Exception e){
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
modification.addModification(
new MergeModification(message.getPath(), message.getData(), schemaContext));
LOG.debug("mergeData at path : " + message.getPath().toString());
- transaction.merge(message.getPath(), message.getData());
- getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+ try {
+ transaction.merge(message.getPath(), message.getData());
+ getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+ }catch(Exception e){
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
+ LOG.debug("deleteData at path : " + message.getPath().toString());
modification.addModification(new DeleteModification(message.getPath()));
- transaction.delete(message.getPath());
- getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+ try {
+ transaction.delete(message.getPath());
+ getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+ }catch(Exception e){
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
}
protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {