From 74926cb05f2e5e4937658ca61444f7d7c846eb00 Mon Sep 17 00:00:00 2001 From: Gary Wu Date: Wed, 16 Dec 2015 14:29:02 -0800 Subject: [PATCH] Refactor TransactonContext Rafactor TransactionContext to: Consolidate write(), merge(), and delete() into a single executeModification() method. Consolidate read() and dataExists() into a single executeRead() method. Change-Id: I559c974295e097ab53f08037329aa3252647331c Signed-off-by: Gary Wu --- .../datastore/LocalTransactionContext.java | 55 ++------- .../datastore/NoOpTransactionContext.java | 34 ++---- .../datastore/RemoteTransactionContext.java | 114 ++++-------------- .../cluster/datastore/TransactionContext.java | 15 +-- .../cluster/datastore/TransactionProxy.java | 71 +++++------ .../PreLithiumTransactionContextImpl.java | 36 ++++-- .../datastore/messages/AbstractRead.java | 38 ++++++ .../datastore/messages/DataExists.java | 34 ++++-- .../cluster/datastore/messages/ReadData.java | 37 ++++-- .../LocalTransactionContextTest.java | 28 +++-- 10 files changed, 210 insertions(+), 252 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractRead.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java index 75cdd1b597..a6e42c4c70 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java @@ -8,17 +8,16 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import scala.concurrent.Future; /** @@ -44,62 +43,22 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { protected abstract DOMStoreReadTransaction getReadDelegate(); @Override - public void writeData(YangInstanceIdentifier path, NormalizedNode data) { + public void executeModification(AbstractModification modification) { incrementModificationCount(); if(operationError == null) { try { - getWriteDelegate().write(path, data); + modification.apply(getWriteDelegate()); } catch (Exception e) { operationError = e; } } - - } - - @Override - public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - incrementModificationCount(); - if(operationError == null) { - try { - getWriteDelegate().merge(path, data); - } catch (Exception e) { - operationError = e; - } - } - } - - @Override - public void deleteData(YangInstanceIdentifier path) { - incrementModificationCount(); - if(operationError == null) { - try { - getWriteDelegate().delete(path); - } catch (Exception e) { - operationError = e; - } - } - } - - @Override - public void readData(YangInstanceIdentifier path, final SettableFuture>> proxyFuture) { - Futures.addCallback(getReadDelegate().read(path), new FutureCallback>>() { - @Override - public void onSuccess(final Optional> result) { - proxyFuture.set(result); - } - - @Override - public void onFailure(final Throwable t) { - proxyFuture.setException(t); - } - }); } @Override - public void dataExists(YangInstanceIdentifier path, final SettableFuture proxyFuture) { - Futures.addCallback(getReadDelegate().exists(path), new FutureCallback() { + public void executeRead(AbstractRead readCmd, final SettableFuture proxyFuture) { + Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback() { @Override - public void onSuccess(final Boolean result) { + public void onSuccess(final T result) { proxyFuture.set(result); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java index 2094cd2f77..fa9d97a1fd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java @@ -8,14 +8,13 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -53,23 +52,15 @@ final class NoOpTransactionContext extends AbstractTransactionContext { } @Override - public void deleteData(YangInstanceIdentifier path) { - LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path); + public void executeModification(AbstractModification modification) { + LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass().getSimpleName(), + modification.getPath()); } @Override - public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path); - } - - @Override - public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path); - } - - @Override - public void readData(final YangInstanceIdentifier path, SettableFuture>> proxyFuture) { - LOG.debug("Tx {} readData called path = {}", getIdentifier(), path); + public void executeRead(AbstractRead readCmd, SettableFuture proxyFuture) { + LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), + readCmd.getPath()); final Throwable t; if (failure instanceof NoShardLeaderException) { @@ -77,12 +68,7 @@ final class NoOpTransactionContext extends AbstractTransactionContext { } else { t = failure; } - proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, t)); - } - - @Override - public void dataExists(YangInstanceIdentifier path, SettableFuture proxyFuture) { - LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path); - proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure)); + proxyFuture.setException(new ReadFailedException("Error executeRead " + readCmd.getClass().getSimpleName() + + " for path " + readCmd.getPath(), t)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index 20074c1028..af0c871409 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -10,25 +10,17 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; -import org.opendaylight.controller.cluster.datastore.messages.DataExists; -import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply; -import org.opendaylight.controller.cluster.datastore.messages.ReadData; -import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply; import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage; -import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; -import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; @@ -175,77 +167,22 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public void deleteData(YangInstanceIdentifier path) { - LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path); - - acquireOperation(); - batchModification(new DeleteModification(path)); - } - - @Override - public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path); - - acquireOperation(); - batchModification(new MergeModification(path, data)); - } - - @Override - public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path); - - acquireOperation(); - batchModification(new WriteModification(path, data)); - } - - @Override - public void readData(final YangInstanceIdentifier path, - final SettableFuture>> returnFuture ) { - - LOG.debug("Tx {} readData called path = {}", getIdentifier(), path); - - // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the - // public API contract. + public void executeModification(AbstractModification modification) { + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass() + .getSimpleName(), modification.getPath()); + } acquireOperation(); - sendBatchedModifications(); - - OnComplete onComplete = new OnComplete() { - @Override - public void onComplete(Throwable failure, Object readResponse) throws Throwable { - if(failure != null) { - LOG.debug("Tx {} read operation failed: {}", getIdentifier(), failure); - returnFuture.setException(new ReadFailedException( - "Error reading data for path " + path, failure)); - - } else { - LOG.debug("Tx {} read operation succeeded", getIdentifier(), failure); - - if (readResponse instanceof ReadDataReply) { - ReadDataReply reply = (ReadDataReply) readResponse; - returnFuture.set(Optional.>fromNullable(reply.getNormalizedNode())); - - } else if (ReadDataReply.isSerializedType(readResponse)) { - ReadDataReply reply = ReadDataReply.fromSerializable(readResponse); - returnFuture.set(Optional.>fromNullable(reply.getNormalizedNode())); - - } else { - returnFuture.setException(new ReadFailedException( - "Invalid response reading data for path " + path)); - } - } - } - }; - - Future readFuture = executeOperationAsync(new ReadData(path)); - - readFuture.onComplete(onComplete, actorContext.getClientDispatcher()); + batchModification(modification); } @Override - public void dataExists(final YangInstanceIdentifier path, final SettableFuture returnFuture) { - - LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path); + public void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture) { + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), + readCmd.getPath()); + } // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the // public API contract. @@ -257,27 +194,22 @@ public class RemoteTransactionContext extends AbstractTransactionContext { @Override public void onComplete(Throwable failure, Object response) throws Throwable { if(failure != null) { - LOG.debug("Tx {} dataExists operation failed: {}", getIdentifier(), failure); - returnFuture.setException(new ReadFailedException( - "Error checking data exists for path " + path, failure)); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(), + failure); + } + returnFuture.setException(new ReadFailedException("Error checking " + readCmd.getClass().getSimpleName() + + " for path " + readCmd.getPath(), failure)); } else { - LOG.debug("Tx {} dataExists operation succeeded", getIdentifier(), failure); - - if (response instanceof DataExistsReply) { - returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists())); - - } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { - returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists())); - - } else { - returnFuture.setException(new ReadFailedException( - "Invalid response checking exists for path " + path)); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName()); } + readCmd.processResponse(response, returnFuture); } } }; - Future future = executeOperationAsync(new DataExists(path)); + Future future = executeOperationAsync(readCmd); future.onComplete(onComplete, actorContext.getClientDispatcher()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index 6a542002d0..ab636ff493 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -8,10 +8,9 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSelection; -import com.google.common.base.Optional; import com.google.common.util.concurrent.SettableFuture; -import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; import scala.concurrent.Future; /* @@ -23,15 +22,9 @@ interface TransactionContext { Future readyTransaction(); - void writeData(YangInstanceIdentifier path, NormalizedNode data); + void executeModification(AbstractModification modification); - void deleteData(YangInstanceIdentifier path); - - void mergeData(YangInstanceIdentifier path, NormalizedNode data); - - void readData(final YangInstanceIdentifier path, SettableFuture>> proxyFuture); - - void dataExists(YangInstanceIdentifier path, SettableFuture proxyFuture); + void executeRead(AbstractRead readCmd, SettableFuture promise); boolean supportsDirectCommit(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index cdc2ec2a0a..d97c858672 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -25,6 +25,13 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; @@ -67,16 +74,22 @@ public class TransactionProxy extends AbstractDOMStoreTransaction exists(final YangInstanceIdentifier path) { + return executeRead(shardNameFromIdentifier(path), new DataExists(path)); + } + + private CheckedFuture executeRead(String shardName, final AbstractRead readCmd) { Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed"); - LOG.debug("Tx {} exists {}", getIdentifier(), path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); + } - final SettableFuture proxyFuture = SettableFuture.create(); - TransactionContextWrapper contextWrapper = getContextWrapper(path); + final SettableFuture proxyFuture = SettableFuture.create(); + TransactionContextWrapper contextWrapper = getContextWrapper(shardName); contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { @Override public void invoke(TransactionContext transactionContext) { - transactionContext.dataExists(path, proxyFuture); + transactionContext.executeRead(readCmd, proxyFuture); } }); @@ -98,16 +111,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction>, ReadFailedException> singleShardRead( final String shardName, final YangInstanceIdentifier path) { - final SettableFuture>> proxyFuture = SettableFuture.create(); - TransactionContextWrapper contextWrapper = getContextWrapper(shardName); - contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.readData(path, proxyFuture); - } - }); - - return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER); + return executeRead(shardName, new ReadData(path)); } private CheckedFuture>, ReadFailedException> readAllData() { @@ -137,45 +141,32 @@ public class TransactionProxy extends AbstractDOMStoreTransaction data) { - checkModificationState(); - - LOG.debug("Tx {} merge {}", getIdentifier(), path); - - TransactionContextWrapper contextWrapper = getContextWrapper(path); - contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { - @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.mergeData(path, data); - } - }); + executeModification(new MergeModification(path, data)); } @Override public void write(final YangInstanceIdentifier path, final NormalizedNode data) { + executeModification(new WriteModification(path, data)); + } + + private void executeModification(final AbstractModification modification) { checkModificationState(); - LOG.debug("Tx {} write {}", getIdentifier(), path); + if(LOG.isDebugEnabled()) { + LOG.debug("Tx {} executeModification {} {}", getIdentifier(), modification.getClass().getSimpleName(), + modification.getPath()); + } - TransactionContextWrapper contextWrapper = getContextWrapper(path); + TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath()); contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() { @Override - public void invoke(TransactionContext transactionContext) { - transactionContext.writeData(path, data); + protected void invoke(TransactionContext transactionContext) { + transactionContext.executeModification(modification); } }); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java index c44166396f..2634adaf4c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java @@ -15,7 +15,12 @@ import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIden import org.opendaylight.controller.cluster.datastore.messages.DeleteData; import org.opendaylight.controller.cluster.datastore.messages.MergeData; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction; +import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage; import org.opendaylight.controller.cluster.datastore.messages.WriteData; +import org.opendaylight.controller.cluster.datastore.modification.AbstractModification; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -43,18 +48,29 @@ public class PreLithiumTransactionContextImpl extends RemoteTransactionContext { } @Override - public void deleteData(YangInstanceIdentifier path) { - executeOperationAsync(new DeleteData(path, getRemoteTransactionVersion())); - } + public void executeModification(AbstractModification modification) { + final short remoteTransactionVersion = getRemoteTransactionVersion(); + final YangInstanceIdentifier path = modification.getPath(); + VersionedExternalizableMessage msg = null; - @Override - public void mergeData(YangInstanceIdentifier path, NormalizedNode data) { - executeOperationAsync(new MergeData(path, data, getRemoteTransactionVersion())); - } + if(modification instanceof DeleteModification) { + msg = new DeleteData(path, remoteTransactionVersion); + } else if(modification instanceof WriteModification) { + final NormalizedNode data = ((WriteModification) modification).getData(); - @Override - public void writeData(YangInstanceIdentifier path, NormalizedNode data) { - executeOperationAsync(new WriteData(path, data, getRemoteTransactionVersion())); + // be sure to check for Merge before Write, since Merge is a subclass of Write + if(modification instanceof MergeModification) { + msg = new MergeData(path, data, remoteTransactionVersion); + } else { + msg = new WriteData(path, data, remoteTransactionVersion); + } + } else { + LOG.error("Invalid modification type " + modification.getClass().getName()); + } + + if(msg != null) { + executeOperationAsync(msg); + } } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractRead.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractRead.java new file mode 100644 index 0000000000..c1d83e9891 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractRead.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2015 Huawei, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.messages; + +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * Abstract base class for ReadData and DataExists messages. + * + * @author gwu + * + */ +public abstract class AbstractRead implements SerializableMessage { + private final YangInstanceIdentifier path; + + public AbstractRead(final YangInstanceIdentifier path) { + this.path = path; + } + + public YangInstanceIdentifier getPath() { + return path; + } + + public abstract CheckedFuture apply(DOMStoreReadTransaction readDelegate); + + public abstract void processResponse(Object reponse, SettableFuture promise); + +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataExists.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataExists.java index 84b8df1676..2541a04d5f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataExists.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataExists.java @@ -8,29 +8,27 @@ package org.opendaylight.controller.cluster.datastore.messages; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -public class DataExists implements SerializableMessage{ +public class DataExists extends AbstractRead { public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.DataExists.class; - private final YangInstanceIdentifier path; - public DataExists(final YangInstanceIdentifier path) { - this.path = path; - } - - public YangInstanceIdentifier getPath() { - return path; + super(path); } @Override public Object toSerializable() { return ShardTransactionMessages.DataExists.newBuilder() .setInstanceIdentifierPathArguments( - InstanceIdentifierUtils.toSerializable(path)).build(); + InstanceIdentifierUtils.toSerializable(getPath())).build(); } public static DataExists fromSerializable(final Object serializable){ @@ -38,4 +36,22 @@ public class DataExists implements SerializableMessage{ return new DataExists(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments())); } + @Override + public CheckedFuture apply(DOMStoreReadTransaction readDelegate) { + return readDelegate.exists(getPath()); + } + + @Override + public void processResponse(Object response, SettableFuture returnFuture) { + if(response instanceof DataExistsReply) { + returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists())); + + } else if(response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) { + returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists())); + + } else { + returnFuture.setException(new ReadFailedException("Invalid response checking exists for path " + getPath())); + } + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadData.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadData.java index bbbdbdf8fe..33f2f0001c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadData.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadData.java @@ -8,26 +8,27 @@ package org.opendaylight.controller.cluster.datastore.messages; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.SettableFuture; import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -public class ReadData implements SerializableMessage { +public class ReadData extends AbstractRead>> { public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadData.class; - private final YangInstanceIdentifier path; public ReadData(final YangInstanceIdentifier path) { - this.path = path; - } - - public YangInstanceIdentifier getPath() { - return path; + super(path); } public Object toSerializable(){ return ShardTransactionMessages.ReadData.newBuilder() - .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path)) + .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(getPath())) .build(); } @@ -35,4 +36,24 @@ public class ReadData implements SerializableMessage { ShardTransactionMessages.ReadData o = (ShardTransactionMessages.ReadData) serializable; return new ReadData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments())); } + + @Override + public CheckedFuture>, ReadFailedException> apply(DOMStoreReadTransaction readDelegate) { + return readDelegate.read(getPath()); + } + + @Override + public void processResponse(Object readResponse, SettableFuture>> returnFuture) { + if(readResponse instanceof ReadDataReply) { + ReadDataReply reply = (ReadDataReply) readResponse; + returnFuture.set(Optional.> fromNullable(reply.getNormalizedNode())); + + } else if(ReadDataReply.isSerializedType(readResponse)) { + ReadDataReply reply = ReadDataReply.fromSerializable(readResponse); + returnFuture.set(Optional.> fromNullable(reply.getNormalizedNode())); + + } else { + returnFuture.setException(new ReadFailedException("Invalid response reading data for path " + getPath())); + } + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java index 838a169dbe..6a8ab620c5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java @@ -13,6 +13,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; + import akka.actor.ActorSelection; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; @@ -22,6 +23,11 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.DataExists; +import org.opendaylight.controller.cluster.datastore.messages.ReadData; +import org.opendaylight.controller.cluster.datastore.modification.DeleteModification; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; @@ -65,7 +71,7 @@ public class LocalTransactionContextTest { public void testWrite() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); NormalizedNode normalizedNode = mock(NormalizedNode.class); - localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode); + localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode)); verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); } @@ -73,14 +79,14 @@ public class LocalTransactionContextTest { public void testMerge() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); NormalizedNode normalizedNode = mock(NormalizedNode.class); - localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode); + localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode)); verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); } @Test public void testDelete() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); - localTransactionContext.deleteData(yangInstanceIdentifier); + localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier)); verify(readWriteTransaction).delete(yangInstanceIdentifier); } @@ -90,7 +96,7 @@ public class LocalTransactionContextTest { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); NormalizedNode normalizedNode = mock(NormalizedNode.class); doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier); - localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.>>create()); + localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier), SettableFuture.>>create()); verify(readWriteTransaction).read(yangInstanceIdentifier); } @@ -98,7 +104,7 @@ public class LocalTransactionContextTest { public void testExists() { YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build(); doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier); - localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture. create()); + localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier), SettableFuture.create()); verify(readWriteTransaction).exists(yangInstanceIdentifier); } @@ -121,8 +127,8 @@ public class LocalTransactionContextTest { RuntimeException error = new RuntimeException("mock"); doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); - localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode); - localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode); + localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode)); + localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode)); verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode); @@ -136,8 +142,8 @@ public class LocalTransactionContextTest { RuntimeException error = new RuntimeException("mock"); doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); - localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode); - localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode); + localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode)); + localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode)); verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode); @@ -150,8 +156,8 @@ public class LocalTransactionContextTest { RuntimeException error = new RuntimeException("mock"); doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier); - localTransactionContext.deleteData(yangInstanceIdentifier); - localTransactionContext.deleteData(yangInstanceIdentifier); + localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier)); + localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier)); verify(readWriteTransaction).delete(yangInstanceIdentifier); -- 2.36.6