Refactor TransactonContext 85/31485/6
authorGary Wu <gary.wu1@huawei.com>
Wed, 16 Dec 2015 22:29:02 +0000 (14:29 -0800)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 6 Jan 2016 20:28:25 +0000 (20:28 +0000)
Rafactor TransactionContext to:

Consolidate write(), merge(), and delete() into a
single executeModification() method.

Consolidate read() and dataExists() into a
single executeRead() method.

Change-Id: I559c974295e097ab53f08037329aa3252647331c
Signed-off-by: Gary Wu <gary.wu1@huawei.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumTransactionContextImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractRead.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/DataExists.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadData.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java

index 75cdd1b..a6e42c4 100644 (file)
@@ -8,17 +8,16 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import scala.concurrent.Future;
 
 /**
@@ -44,62 +43,22 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     protected abstract DOMStoreReadTransaction getReadDelegate();
 
     @Override
-    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
+    public void executeModification(AbstractModification modification) {
         incrementModificationCount();
         if(operationError == null) {
             try {
-                getWriteDelegate().write(path, data);
+                modification.apply(getWriteDelegate());
             } catch (Exception e) {
                 operationError = e;
             }
         }
-
-    }
-
-    @Override
-    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        incrementModificationCount();
-        if(operationError == null) {
-            try {
-                getWriteDelegate().merge(path, data);
-            } catch (Exception e) {
-                operationError = e;
-            }
-        }
-    }
-
-    @Override
-    public void deleteData(YangInstanceIdentifier path) {
-        incrementModificationCount();
-        if(operationError == null) {
-            try {
-                getWriteDelegate().delete(path);
-            } catch (Exception e) {
-                operationError = e;
-            }
-        }
-    }
-
-    @Override
-    public void readData(YangInstanceIdentifier path, final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
-        Futures.addCallback(getReadDelegate().read(path), new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
-            @Override
-            public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
-                proxyFuture.set(result);
-            }
-
-            @Override
-            public void onFailure(final Throwable t) {
-                proxyFuture.setException(t);
-            }
-        });
     }
 
     @Override
-    public void dataExists(YangInstanceIdentifier path, final SettableFuture<Boolean> proxyFuture) {
-        Futures.addCallback(getReadDelegate().exists(path), new FutureCallback<Boolean>() {
+    public <T> void executeRead(AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture) {
+        Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
             @Override
-            public void onSuccess(final Boolean result) {
+            public void onSuccess(final T result) {
                 proxyFuture.set(result);
             }
 
index 2094cd2..fa9d97a 100644 (file)
@@ -8,14 +8,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.md.sal.common.api.data.DataStoreUnavailableException;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -53,23 +52,15 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public void deleteData(YangInstanceIdentifier path) {
-        LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
+    public void executeModification(AbstractModification modification) {
+        LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass().getSimpleName(),
+                modification.getPath());
     }
 
     @Override
-    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
-    }
-
-    @Override
-    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
-    }
-
-    @Override
-    public void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture) {
-        LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
+    public <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> proxyFuture) {
+        LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+                readCmd.getPath());
 
         final Throwable t;
         if (failure instanceof NoShardLeaderException) {
@@ -77,12 +68,7 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
         } else {
             t = failure;
         }
-        proxyFuture.setException(new ReadFailedException("Error reading data for path " + path, t));
-    }
-
-    @Override
-    public void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture) {
-        LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
-        proxyFuture.setException(new ReadFailedException("Error checking exists for path " + path, failure));
+        proxyFuture.setException(new ReadFailedException("Error executeRead " + readCmd.getClass().getSimpleName()
+                + " for path " + readCmd.getPath(), t));
     }
 }
index 20074c1..af0c871 100644 (file)
@@ -10,25 +10,17 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
@@ -175,77 +167,22 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
-    public void deleteData(YangInstanceIdentifier path) {
-        LOG.debug("Tx {} deleteData called path = {}", getIdentifier(), path);
-
-        acquireOperation();
-        batchModification(new DeleteModification(path));
-    }
-
-    @Override
-    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} mergeData called path = {}", getIdentifier(), path);
-
-        acquireOperation();
-        batchModification(new MergeModification(path, data));
-    }
-
-    @Override
-    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        LOG.debug("Tx {} writeData called path = {}", getIdentifier(), path);
-
-        acquireOperation();
-        batchModification(new WriteModification(path, data));
-    }
-
-    @Override
-    public void readData(final YangInstanceIdentifier path,
-            final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture ) {
-
-        LOG.debug("Tx {} readData called path = {}", getIdentifier(), path);
-
-        // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
-        // public API contract.
+    public void executeModification(AbstractModification modification) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} executeModification {} called path = {}", getIdentifier(), modification.getClass()
+                    .getSimpleName(), modification.getPath());
+        }
 
         acquireOperation();
-        sendBatchedModifications();
-
-        OnComplete<Object> onComplete = new OnComplete<Object>() {
-            @Override
-            public void onComplete(Throwable failure, Object readResponse) throws Throwable {
-                if(failure != null) {
-                    LOG.debug("Tx {} read operation failed: {}", getIdentifier(), failure);
-                    returnFuture.setException(new ReadFailedException(
-                            "Error reading data for path " + path, failure));
-
-                } else {
-                    LOG.debug("Tx {} read operation succeeded", getIdentifier(), failure);
-
-                    if (readResponse instanceof ReadDataReply) {
-                        ReadDataReply reply = (ReadDataReply) readResponse;
-                        returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
-
-                    } else if (ReadDataReply.isSerializedType(readResponse)) {
-                        ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
-                        returnFuture.set(Optional.<NormalizedNode<?, ?>>fromNullable(reply.getNormalizedNode()));
-
-                    } else {
-                        returnFuture.setException(new ReadFailedException(
-                            "Invalid response reading data for path " + path));
-                    }
-                }
-            }
-        };
-
-        Future<Object> readFuture = executeOperationAsync(new ReadData(path));
-
-        readFuture.onComplete(onComplete, actorContext.getClientDispatcher());
+        batchModification(modification);
     }
 
     @Override
-    public void dataExists(final YangInstanceIdentifier path, final SettableFuture<Boolean> returnFuture) {
-
-        LOG.debug("Tx {} dataExists called path = {}", getIdentifier(), path);
+    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture) {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+                    readCmd.getPath());
+        }
 
         // Send any batched modifications. This is necessary to honor the read uncommitted semantics of the
         // public API contract.
@@ -257,27 +194,22 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
             @Override
             public void onComplete(Throwable failure, Object response) throws Throwable {
                 if(failure != null) {
-                    LOG.debug("Tx {} dataExists operation failed: {}", getIdentifier(), failure);
-                    returnFuture.setException(new ReadFailedException(
-                            "Error checking data exists for path " + path, failure));
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {} {} operation failed: {}", getIdentifier(), readCmd.getClass().getSimpleName(),
+                                failure);
+                    }
+                    returnFuture.setException(new ReadFailedException("Error checking " + readCmd.getClass().getSimpleName()
+                            + " for path " + readCmd.getPath(), failure));
                 } else {
-                    LOG.debug("Tx {} dataExists operation succeeded", getIdentifier(), failure);
-
-                    if (response instanceof DataExistsReply) {
-                        returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
-
-                    } else if (response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
-                        returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
-
-                    } else {
-                        returnFuture.setException(new ReadFailedException(
-                                "Invalid response checking exists for path " + path));
+                    if(LOG.isDebugEnabled()) {
+                        LOG.debug("Tx {} {} operation succeeded", getIdentifier(), readCmd.getClass().getSimpleName());
                     }
+                    readCmd.processResponse(response, returnFuture);
                 }
             }
         };
 
-        Future<Object> future = executeOperationAsync(new DataExists(path));
+        Future<Object> future = executeOperationAsync(readCmd);
 
         future.onComplete(onComplete, actorContext.getClientDispatcher());
     }
index 6a54200..ab636ff 100644 (file)
@@ -8,10 +8,9 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
-import com.google.common.base.Optional;
 import com.google.common.util.concurrent.SettableFuture;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
 import scala.concurrent.Future;
 
 /*
@@ -23,15 +22,9 @@ interface TransactionContext {
 
     Future<ActorSelection> readyTransaction();
 
-    void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
+    void executeModification(AbstractModification modification);
 
-    void deleteData(YangInstanceIdentifier path);
-
-    void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
-
-    void readData(final YangInstanceIdentifier path, SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture);
-
-    void dataExists(YangInstanceIdentifier path, SettableFuture<Boolean> proxyFuture);
+    <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise);
 
     boolean supportsDirectCommit();
 
index cdc2ec2..d97c858 100644 (file)
@@ -25,6 +25,13 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -67,16 +74,22 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     @Override
     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+        return executeRead(shardNameFromIdentifier(path), new DataExists(path));
+    }
+
+    private <T> CheckedFuture<T, ReadFailedException> executeRead(String shardName, final AbstractRead<T> readCmd) {
         Preconditions.checkState(type != TransactionType.WRITE_ONLY, "Reads from write-only transactions are not allowed");
 
-        LOG.debug("Tx {} exists {}", getIdentifier(), path);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} {} {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath());
+        }
 
-        final SettableFuture<Boolean> proxyFuture = SettableFuture.create();
-        TransactionContextWrapper contextWrapper = getContextWrapper(path);
+        final SettableFuture<T> proxyFuture = SettableFuture.create();
+        TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
             public void invoke(TransactionContext transactionContext) {
-                transactionContext.dataExists(path, proxyFuture);
+                transactionContext.executeRead(readCmd, proxyFuture);
             }
         });
 
@@ -98,16 +111,7 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> singleShardRead(
             final String shardName, final YangInstanceIdentifier path) {
-        final SettableFuture<Optional<NormalizedNode<?, ?>>> proxyFuture = SettableFuture.create();
-        TransactionContextWrapper contextWrapper = getContextWrapper(shardName);
-        contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(TransactionContext transactionContext) {
-                transactionContext.readData(path, proxyFuture);
-            }
-        });
-
-        return MappingCheckedFuture.create(proxyFuture, ReadFailedException.MAPPER);
+        return executeRead(shardName, new ReadData(path));
     }
 
     private CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readAllData() {
@@ -137,45 +141,32 @@ public class TransactionProxy extends AbstractDOMStoreTransaction<TransactionIde
 
     @Override
     public void delete(final YangInstanceIdentifier path) {
-        checkModificationState();
-
-        LOG.debug("Tx {} delete {}", getIdentifier(), path);
-
-        TransactionContextWrapper contextWrapper = getContextWrapper(path);
-        contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(TransactionContext transactionContext) {
-                transactionContext.deleteData(path);
-            }
-        });
+        executeModification(new DeleteModification(path));
     }
 
     @Override
     public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        checkModificationState();
-
-        LOG.debug("Tx {} merge {}", getIdentifier(), path);
-
-        TransactionContextWrapper contextWrapper = getContextWrapper(path);
-        contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
-            @Override
-            public void invoke(TransactionContext transactionContext) {
-                transactionContext.mergeData(path, data);
-            }
-        });
+        executeModification(new MergeModification(path, data));
     }
 
     @Override
     public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        executeModification(new WriteModification(path, data));
+    }
+
+    private void executeModification(final AbstractModification modification) {
         checkModificationState();
 
-        LOG.debug("Tx {} write {}", getIdentifier(), path);
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Tx {} executeModification {} {}", getIdentifier(), modification.getClass().getSimpleName(),
+                    modification.getPath());
+        }
 
-        TransactionContextWrapper contextWrapper = getContextWrapper(path);
+        TransactionContextWrapper contextWrapper = getContextWrapper(modification.getPath());
         contextWrapper.maybeExecuteTransactionOperation(new TransactionOperation() {
             @Override
-            public void invoke(TransactionContext transactionContext) {
-                transactionContext.writeData(path, data);
+            protected void invoke(TransactionContext transactionContext) {
+                transactionContext.executeModification(modification);
             }
         });
     }
index c441663..2634ada 100644 (file)
@@ -15,7 +15,12 @@ import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIden
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.modification.AbstractModification;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -43,18 +48,29 @@ public class PreLithiumTransactionContextImpl extends RemoteTransactionContext {
     }
 
     @Override
-    public void deleteData(YangInstanceIdentifier path) {
-        executeOperationAsync(new DeleteData(path, getRemoteTransactionVersion()));
-    }
+    public void executeModification(AbstractModification modification) {
+        final short remoteTransactionVersion = getRemoteTransactionVersion();
+        final YangInstanceIdentifier path = modification.getPath();
+        VersionedExternalizableMessage msg = null;
 
-    @Override
-    public void mergeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        executeOperationAsync(new MergeData(path, data, getRemoteTransactionVersion()));
-    }
+        if(modification instanceof DeleteModification) {
+            msg = new DeleteData(path, remoteTransactionVersion);
+        } else if(modification instanceof WriteModification) {
+            final NormalizedNode<?, ?> data = ((WriteModification) modification).getData();
 
-    @Override
-    public void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
-        executeOperationAsync(new WriteData(path, data, getRemoteTransactionVersion()));
+            // be sure to check for Merge before Write, since Merge is a subclass of Write
+            if(modification instanceof MergeModification) {
+                msg = new MergeData(path, data, remoteTransactionVersion);
+            } else {
+                msg = new WriteData(path, data, remoteTransactionVersion);
+            }
+        } else {
+            LOG.error("Invalid modification type " + modification.getClass().getName());
+        }
+
+        if(msg != null) {
+            executeOperationAsync(msg);
+        }
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractRead.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractRead.java
new file mode 100644 (file)
index 0000000..c1d83e9
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2015 Huawei, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Abstract base class for ReadData and DataExists messages.
+ *
+ * @author gwu
+ *
+ */
+public abstract class AbstractRead<T> implements SerializableMessage {
+    private final YangInstanceIdentifier path;
+
+    public AbstractRead(final YangInstanceIdentifier path) {
+        this.path = path;
+    }
+
+    public YangInstanceIdentifier getPath() {
+        return path;
+    }
+
+    public abstract CheckedFuture<T, ReadFailedException> apply(DOMStoreReadTransaction readDelegate);
+
+    public abstract void processResponse(Object reponse, SettableFuture<T> promise);
+
+}
index 84b8df1..2541a04 100644 (file)
@@ -8,29 +8,27 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
-public class DataExists implements SerializableMessage{
+public class DataExists extends AbstractRead<Boolean> {
 
     public static final Class<ShardTransactionMessages.DataExists> SERIALIZABLE_CLASS =
             ShardTransactionMessages.DataExists.class;
 
-    private final YangInstanceIdentifier path;
-
     public DataExists(final YangInstanceIdentifier path) {
-        this.path = path;
-    }
-
-    public YangInstanceIdentifier getPath() {
-        return path;
+        super(path);
     }
 
     @Override public Object toSerializable() {
         return ShardTransactionMessages.DataExists.newBuilder()
             .setInstanceIdentifierPathArguments(
-                InstanceIdentifierUtils.toSerializable(path)).build();
+                InstanceIdentifierUtils.toSerializable(getPath())).build();
     }
 
     public static DataExists fromSerializable(final Object serializable){
@@ -38,4 +36,22 @@ public class DataExists implements SerializableMessage{
         return new DataExists(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
     }
 
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> apply(DOMStoreReadTransaction readDelegate) {
+        return readDelegate.exists(getPath());
+    }
+
+    @Override
+    public void processResponse(Object response, SettableFuture<Boolean> returnFuture) {
+        if(response instanceof DataExistsReply) {
+            returnFuture.set(Boolean.valueOf(((DataExistsReply) response).exists()));
+
+        } else if(response.getClass().equals(DataExistsReply.SERIALIZABLE_CLASS)) {
+            returnFuture.set(Boolean.valueOf(DataExistsReply.fromSerializable(response).exists()));
+
+        } else {
+            returnFuture.setException(new ReadFailedException("Invalid response checking exists for path " + getPath()));
+        }
+    }
+
 }
index bbbdbdf..33f2f00 100644 (file)
@@ -8,26 +8,27 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import org.opendaylight.controller.cluster.datastore.util.InstanceIdentifierUtils;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class ReadData implements SerializableMessage {
+public class ReadData extends AbstractRead<Optional<NormalizedNode<?, ?>>> {
   public static final Class<ShardTransactionMessages.ReadData> SERIALIZABLE_CLASS =
           ShardTransactionMessages.ReadData.class;
-  private final YangInstanceIdentifier path;
 
   public ReadData(final YangInstanceIdentifier path) {
-    this.path = path;
-  }
-
-  public YangInstanceIdentifier getPath() {
-    return path;
+    super(path);
   }
 
   public Object toSerializable(){
     return ShardTransactionMessages.ReadData.newBuilder()
-        .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(path))
+        .setInstanceIdentifierPathArguments(InstanceIdentifierUtils.toSerializable(getPath()))
         .build();
   }
 
@@ -35,4 +36,24 @@ public class ReadData implements SerializableMessage {
     ShardTransactionMessages.ReadData o = (ShardTransactionMessages.ReadData) serializable;
     return new ReadData(InstanceIdentifierUtils.fromSerializable(o.getInstanceIdentifierPathArguments()));
   }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> apply(DOMStoreReadTransaction readDelegate) {
+        return readDelegate.read(getPath());
+    }
+
+    @Override
+    public void processResponse(Object readResponse, SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture) {
+        if(readResponse instanceof ReadDataReply) {
+            ReadDataReply reply = (ReadDataReply) readResponse;
+            returnFuture.set(Optional.<NormalizedNode<?, ?>> fromNullable(reply.getNormalizedNode()));
+
+        } else if(ReadDataReply.isSerializedType(readResponse)) {
+            ReadDataReply reply = ReadDataReply.fromSerializable(readResponse);
+            returnFuture.set(Optional.<NormalizedNode<?, ?>> fromNullable(reply.getNormalizedNode()));
+
+        } else {
+            returnFuture.setException(new ReadFailedException("Invalid response reading data for path " + getPath()));
+        }
+    }
 }
index 838a169..6a8ab62 100644 (file)
@@ -13,6 +13,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+
 import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
@@ -22,6 +23,11 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.DataExists;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
@@ -65,7 +71,7 @@ public class LocalTransactionContextTest {
     public void testWrite() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
-        localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
+        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
         verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
     }
 
@@ -73,14 +79,14 @@ public class LocalTransactionContextTest {
     public void testMerge() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
-        localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
+        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
         verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
     }
 
     @Test
     public void testDelete() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
-        localTransactionContext.deleteData(yangInstanceIdentifier);
+        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
         verify(readWriteTransaction).delete(yangInstanceIdentifier);
     }
 
@@ -90,7 +96,7 @@ public class LocalTransactionContextTest {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         NormalizedNode<?, ?> normalizedNode = mock(NormalizedNode.class);
         doReturn(Futures.immediateCheckedFuture(Optional.of(normalizedNode))).when(readWriteTransaction).read(yangInstanceIdentifier);
-        localTransactionContext.readData(yangInstanceIdentifier, SettableFuture.<Optional<NormalizedNode<?,?>>>create());
+        localTransactionContext.executeRead(new ReadData(yangInstanceIdentifier), SettableFuture.<Optional<NormalizedNode<?,?>>>create());
         verify(readWriteTransaction).read(yangInstanceIdentifier);
     }
 
@@ -98,7 +104,7 @@ public class LocalTransactionContextTest {
     public void testExists() {
         YangInstanceIdentifier yangInstanceIdentifier = YangInstanceIdentifier.builder().build();
         doReturn(Futures.immediateCheckedFuture(true)).when(readWriteTransaction).exists(yangInstanceIdentifier);
-        localTransactionContext.dataExists(yangInstanceIdentifier, SettableFuture.<Boolean> create());
+        localTransactionContext.executeRead(new DataExists(yangInstanceIdentifier), SettableFuture.<Boolean>create());
         verify(readWriteTransaction).exists(yangInstanceIdentifier);
     }
 
@@ -121,8 +127,8 @@ public class LocalTransactionContextTest {
         RuntimeException error = new RuntimeException("mock");
         doThrow(error).when(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
 
-        localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
-        localTransactionContext.writeData(yangInstanceIdentifier, normalizedNode);
+        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
+        localTransactionContext.executeModification(new WriteModification(yangInstanceIdentifier, normalizedNode));
 
         verify(readWriteTransaction).write(yangInstanceIdentifier, normalizedNode);
 
@@ -136,8 +142,8 @@ public class LocalTransactionContextTest {
         RuntimeException error = new RuntimeException("mock");
         doThrow(error).when(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
 
-        localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
-        localTransactionContext.mergeData(yangInstanceIdentifier, normalizedNode);
+        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
+        localTransactionContext.executeModification(new MergeModification(yangInstanceIdentifier, normalizedNode));
 
         verify(readWriteTransaction).merge(yangInstanceIdentifier, normalizedNode);
 
@@ -150,8 +156,8 @@ public class LocalTransactionContextTest {
         RuntimeException error = new RuntimeException("mock");
         doThrow(error).when(readWriteTransaction).delete(yangInstanceIdentifier);
 
-        localTransactionContext.deleteData(yangInstanceIdentifier);
-        localTransactionContext.deleteData(yangInstanceIdentifier);
+        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
+        localTransactionContext.executeModification(new DeleteModification(yangInstanceIdentifier));
 
         verify(readWriteTransaction).delete(yangInstanceIdentifier);