package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import scala.concurrent.Future;
/**
protected abstract DOMStoreReadTransaction getReadDelegate();
@Override
- public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+ public void executeModification(AbstractModification modification) {
incrementModificationCount();
if(operationError == null) {
try {
- getWriteDelegate().write(path, data);
+ modification.apply(getWriteDelegate());
} catch (Exception e) {
operationError = e;
}
}
-
- }
-
- @Override
- public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- incrementModificationCount();
- if(operationError == null) {
- try {
- getWriteDelegate().merge(path, data);
- } catch (Exception e) {
- operationError = e;
- }
- }
- }
-
- @Override
- public void deleteData(YangInstanceIdentifier path) {
- incrementModificationCount();
- if(operationError == null) {
- try {
- getWriteDelegate().delete(path);
- } catch (Exception e) {
- operationError = e;
- }
- }
- }
-
- @Override
- public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
- Futures.addCallback(getReadDelegate().read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
- @Override
- public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
- proxyFuture.set(result);
- }
-
- @Override
- public void onFailure(final Throwable t) {
- proxyFuture.setException(t);
- }
- });
}
@Override
- public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
- Futures.addCallback(getReadDelegate().exists(path), new FutureCallback<Boolean>() {
+ public <T> void executeRead(AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture) {
+ Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
@Override
- public void onSuccess(final Boolean result) {
+ public void onSuccess(final T result) {
proxyFuture.set(result);
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
}
@Override
- public void deleteData(YangInstanceIdentifier path) {
- LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
+ public void executeModification(AbstractModification modification) {
+ LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass().getSimpleName(),
+ modification.getPath());
}
@Override
- public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
- }
-
- @Override
- public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
- }
-
- @Override
- public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
- LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
+ public <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> proxyFuture) {
+ LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+ readCmd.getPath());
final Throwable t;
if (failure instanceof NoShardLeaderException) {
} else {
t = failure;
}
- proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, t));
- }
-
- @Override
- public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
- LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
- proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
+ proxyFuture.setException(new ReadFailedException("Error executeRead " + readCmd.getClass().getSimpleName()
+ + " for path " + readCmd.getPath(), t));
}
}
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
}
@Override
- public void deleteData(YangInstanceIdentifier path) {
- LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
-
- acquireOperation();
- batchModification(new DeleteModification(path));
- }
-
- @Override
- public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
-
- acquireOperation();
- batchModification(new MergeModification(path, data));
- }
-
- @Override
- public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
-
- acquireOperation();
- batchModification(new WriteModification(path, data));
- }
-
- @Override
- public void readData(final YangInstanceIdentifier path,
- final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
-
- LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
-
- // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
- // public API contract.
+ public void executeModification(AbstractModification modification) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass()
+ .getSimpleName(), modification.getPath());
+ }
acquireOperation();
- sendBatchedModifications();
-
- OnComplete<Object> onComplete = new OnComplete<Object>() {
- @Override
- public void onComplete(Throwable failure, Object readResponse) throws Throwable {
- if(failure != null) {
- LOG.debug("Tx {} read operation failed: {}", getIdentifier(), failure);
- returnFuture.setException(new ReadFailedException(
- "Error reading data for path " + path, failure));
-
- } else {
- LOG.debug("Tx {} read operation succeeded", getIdentifier(), failure);
-
- if (readResponse instanceof ReadDataReply) {
- ReadDataReply reply = (ReadDataReply) readResponse;
- returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
-
- } else if (ReadDataReply.isSerializedType(readResponse)) {
- ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
- returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
-
- } else {
- returnFuture.setException(new ReadFailedException(
- "Invalid response reading data for path " + path));
- }
- }
- }
- };
-
- Future<Object> readFuture = executeOperationAsync(new ReadData(path));
-
- readFuture.onComplete(onComplete, actorContext.getClientDispatcher());
+ batchModification(modification);
}
@Override
- public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> returnFuture) {
-
- LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
+ public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+ readCmd.getPath());
+ }
// Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
// public API contract.
@Override
public void onComplete(Throwable failure, Object response) throws Throwable {
if(failure != null) {
- LOG.debug("Tx {} dataExists operation failed: {}", getIdentifier(), failure);
- returnFuture.setException(new ReadFailedException(
- "Error checking data exists for path " + path, failure));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+ failure);
+ }
+ returnFuture.setException(new ReadFailedException("Error checking " + readCmd.getClass().getSimpleName()
+ + " for path " + readCmd.getPath(), failure));
} else {
- LOG.debug("Tx {} dataExists operation succeeded", getIdentifier(), failure);
-
- if (response instanceof DataExistsReply) {
- returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
-
- } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
- returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
-
- } else {
- returnFuture.setException(new ReadFailedException(
- "Invalid response checking exists for path " + path));
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName());
}
+ readCmd.processResponse(response, returnFuture);
}
}
};
- Future<Object> future = executeOperationAsync(new DataExists(path));
+ Future<Object> future = executeOperationAsync(readCmd);
future.onComplete(onComplete, actorContext.getClientDispatcher());
}
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
import scala.concurrent.Future;
/*
Future<ActorSelection> readyTransaction();
- void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
+ void executeModification(AbstractModification modification);
- void deleteData(YangInstanceIdentifier path);
-
- void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
-
- void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture);
-
- void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
+ <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise);
boolean supportsDirectCommit();
import java.util.Map.Entry;
import java.util.Set;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@Override
public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+ return executeRead(shardNameFromIdentifier(path), new DataExists(path));
+ }
+
+ private <T> CheckedFuture<T, ReadFailedException> executeRead(String shardName, final AbstractRead<T> readCmd) {
Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
- LOG.debug("Tx {} exists {}", getIdentifier(), path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
+ }
- final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
- TransactionContextWrapper contextWrapper = getContextWrapper(path);
+ final SettableFuture<T> proxyFuture = SettableFuture.create();
+ TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
public void invoke(TransactionContext transactionContext) {
- transactionContext.dataExists(path, proxyFuture);
+ transactionContext.executeRead(readCmd, proxyFuture);
}
});
private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> singleShardRead(
final String shardName, final YangInstanceIdentifier path) {
- final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
- TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
- contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- transactionContext.readData(path, proxyFuture);
- }
- });
-
- return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+ return executeRead(shardName, new ReadData(path));
}
private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readAllData() {
@Override
public void delete(final YangInstanceIdentifier path) {
- checkModificationState();
-
- LOG.debug("Tx {} delete {}", getIdentifier(), path);
-
- TransactionContextWrapper contextWrapper = getContextWrapper(path);
- contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- transactionContext.deleteData(path);
- }
- });
+ executeModification(new DeleteModification(path));
}
@Override
public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
- checkModificationState();
-
- LOG.debug("Tx {} merge {}", getIdentifier(), path);
-
- TransactionContextWrapper contextWrapper = getContextWrapper(path);
- contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
- @Override
- public void invoke(TransactionContext transactionContext) {
- transactionContext.mergeData(path, data);
- }
- });
+ executeModification(new MergeModification(path, data));
}
@Override
public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ executeModification(new WriteModification(path, data));
+ }
+
+ private void executeModification(final AbstractModification modification) {
checkModificationState();
- LOG.debug("Tx {} write {}", getIdentifier(), path);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Tx {} executeModification {} {}", getIdentifier(), modification.getClass().getSimpleName(),
+ modification.getPath());
+ }
- TransactionContextWrapper contextWrapper = getContextWrapper(path);
+ TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
@Override
- public void invoke(TransactionContext transactionContext) {
- transactionContext.writeData(path, data);
+ protected void invoke(TransactionContext transactionContext) {
+ transactionContext.executeModification(modification);
}
});
}
import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
import org.opendaylight.controller.cluster.datastore.messages.MergeData;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
}
@Override
- public void deleteData(YangInstanceIdentifier path) {
- executeOperationAsync(new DeleteData(path, getRemoteTransactionVersion()));
- }
+ public void executeModification(AbstractModification modification) {
+ final short remoteTransactionVersion = getRemoteTransactionVersion();
+ final YangInstanceIdentifier path = modification.getPath();
+ VersionedExternalizableMessage msg = null;
- @Override
- public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- executeOperationAsync(new MergeData(path, data, getRemoteTransactionVersion()));
- }
+ if(modification instanceof DeleteModification) {
+ msg = new DeleteData(path, remoteTransactionVersion);
+ } else if(modification instanceof WriteModification) {
+ final NormalizedNode<?, ?> data = ((WriteModification) modification).getData();
- @Override
- public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
- executeOperationAsync(new WriteData(path, data, getRemoteTransactionVersion()));
+ // be sure to check for Merge before Write, since Merge is a subclass of Write
+ if(modification instanceof MergeModification) {
+ msg = new MergeData(path, data, remoteTransactionVersion);
+ } else {
+ msg = new WriteData(path, data, remoteTransactionVersion);
+ }
+ } else {
+ LOG.error("Invalid modification type " + modification.getClass().getName());
+ }
+
+ if(msg != null) {
+ executeOperationAsync(msg);
+ }
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2015 Huawei, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Abstract base class for ReadData and DataExists messages.
+ *
+ * @author gwu
+ *
+ */
+public abstract class AbstractRead<T> implements SerializableMessage {
+ private final YangInstanceIdentifier path;
+
+ public AbstractRead(final YangInstanceIdentifier path) {
+ this.path = path;
+ }
+
+ public YangInstanceIdentifier getPath() {
+ return path;
+ }
+
+ public abstract CheckedFuture<T, ReadFailedException> apply(DOMStoreReadTransaction readDelegate);
+
+ public abstract void processResponse(Object reponse, SettableFuture<T> promise);
+
+}
package org.opendaylight.controller.cluster.datastore.messages;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class DataExists implements SerializableMessage{
+public class DataExists extends AbstractRead<Boolean> {
public static final Class<ShardTransactionMessages.DataExists> SERIALIZABLE_CLASS =
ShardTransactionMessages.DataExists.class;
- private final YangInstanceIdentifier path;
-
public DataExists(final YangInstanceIdentifier path) {
- this.path = path;
- }
-
- public YangInstanceIdentifier getPath() {
- return path;
+ super(path);
}
@Override public Object toSerializable() {
return ShardTransactionMessages.DataExists.newBuilder()
.setInstanceIdentifierPathArguments(
- InstanceIdentifierUtils.toSerializable(path)).build();
+ InstanceIdentifierUtils.toSerializable(getPath())).build();
}
public static DataExists fromSerializable(final Object serializable){
return new DataExists(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
}
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> apply(DOMStoreReadTransaction readDelegate) {
+ return readDelegate.exists(getPath());
+ }
+
+ @Override
+ public void processResponse(Object response, SettableFuture<Boolean> returnFuture) {
+ if(response instanceof DataExistsReply) {
+ returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
+
+ } else if(response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+ returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
+
+ } else {
+ returnFuture.setException(new ReadFailedException("Invalid response checking exists for path " + getPath()));
+ }
+ }
+
}
package org.opendaylight.controller.cluster.datastore.messages;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-public class ReadData implements SerializableMessage {
+public class ReadData extends AbstractRead<Optional<NormalizedNode<?, ?>>> {
public static final Class<ShardTransactionMessages.ReadData> SERIALIZABLE_CLASS =
ShardTransactionMessages.ReadData.class;
- private final YangInstanceIdentifier path;
public ReadData(final YangInstanceIdentifier path) {
- this.path = path;
- }
-
- public YangInstanceIdentifier getPath() {
- return path;
+ super(path);
}
public Object toSerializable(){
return ShardTransactionMessages.ReadData.newBuilder()
- .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path))
+ .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(getPath()))
.build();
}
ShardTransactionMessages.ReadData o = (ShardTransactionMessages.ReadData) serializable;
return new ReadData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
}
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> apply(DOMStoreReadTransaction readDelegate) {
+ return readDelegate.read(getPath());
+ }
+
+ @Override
+ public void processResponse(Object readResponse, SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
+ if(readResponse instanceof ReadDataReply) {
+ ReadDataReply reply = (ReadDataReply) readResponse;
+ returnFuture.set(Optional.<NormalizedNode<?, ?>> fromNullable(reply.getNormalizedNode()));
+
+ } else if(ReadDataReply.isSerializedType(readResponse)) {
+ ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
+ returnFuture.set(Optional.<NormalizedNode<?, ?>> fromNullable(reply.getNormalizedNode()));
+
+ } else {
+ returnFuture.setException(new ReadFailedException("Invalid response reading data for path " + getPath()));
+ }
+ }
}
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
public void testWrite() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
- localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
+ localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
}
public void testMerge() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
- localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
+ localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
}
@Test
public void testDelete() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
- localTransactionContext.deleteData(yangInstanceIdentifier);
+ localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
verify(readWriteTransaction).delete(yangInstanceIdentifier);
}
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
- localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+ localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier), SettableFuture.<Optional<NormalizedNode<?,?>>>create());
verify(readWriteTransaction).read(yangInstanceIdentifier);
}
public void testExists() {
YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
- localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
+ localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier), SettableFuture.<Boolean>create());
verify(readWriteTransaction).exists(yangInstanceIdentifier);
}
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
- localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
- localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
+ localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
+ localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
- localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
- localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
+ localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
+ localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
RuntimeException error = new RuntimeException("mock");
doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
- localTransactionContext.deleteData(yangInstanceIdentifier);
- localTransactionContext.deleteData(yangInstanceIdentifier);
+ localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
+ localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
verify(readWriteTransaction).delete(yangInstanceIdentifier);