Merge "Bug 1081 - Deprecate non-asynchronous Data Broker API"
authorDevin Avery <devin.avery@brocade.com>
Tue, 1 Jul 2014 15:21:14 +0000 (15:21 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 1 Jul 2014 15:21:14 +0000 (15:21 +0000)
38 files changed:
opendaylight/config/yang-jmx-generator-plugin/src/main/java/org/opendaylight/controller/config/yangjmxgenerator/plugin/gofactory/AbsModuleGeneratedObjectFactory.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/BindingTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/DataBroker.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/BindingAwareBroker.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/RpcConsumerRegistry.java
opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/sal/binding/api/RpcProviderRegistry.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedDataBroker.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractForwardedTransaction.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractReadWriteTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractWriteTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadTransactionImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadWriteTransactionImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataWriteTransactionImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingToNormalizedNodeCodec.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingTranslatedTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/ForwardedBackwardsCompatibleDataBroker.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/ForwardedBindingDataBroker.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChange.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangeListener.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RouteChangePublisher.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/routing/RoutedRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DoNothingActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java [new file with mode: 0644]

index 6eda364af0d05860196bca23b7433e47f961d1a6..33546b1e2ae6a4d6ede0dfaddfc307495f13a325 100644 (file)
@@ -286,7 +286,7 @@ public class AbsModuleGeneratedObjectFactory {
             format("private final %s oldModule;\n", abstractFQN.getTypeName())+
             format("private final %s oldInstance;\n", AutoCloseable.class.getCanonicalName())+
             format("private %s instance;\n", AutoCloseable.class.getCanonicalName())+
-            format("private final %s dependencyResolver;\n", DependencyResolver.class.getCanonicalName())+
+            format("protected final %s dependencyResolver;\n", DependencyResolver.class.getCanonicalName())+
             format("private final %s identifier;\n", ModuleIdentifier.class.getCanonicalName())+
             "@Override\n"+
             format("public %s getIdentifier() {\n", ModuleIdentifier.class.getCanonicalName())+
diff --git a/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/BindingTransactionChain.java b/opendaylight/md-sal/sal-binding-api/src/main/java/org/opendaylight/controller/md/sal/binding/api/BindingTransactionChain.java
new file mode 100644 (file)
index 0000000..eac65ad
--- /dev/null
@@ -0,0 +1,18 @@
+package org.opendaylight.controller.md.sal.binding.api;
+
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public interface BindingTransactionChain extends TransactionChain<InstanceIdentifier<?>, DataObject> {
+
+    @Override
+    ReadOnlyTransaction newReadOnlyTransaction();
+
+    @Override
+    ReadWriteTransaction newReadWriteTransaction();
+
+    @Override
+    WriteTransaction newWriteOnlyTransaction();
+
+}
index 0b3658a6a6b6e7b112fb89e0d9a0d1cc2ae5aa54..b60d8ff1be71ba7dc0e22a93f240445868241e10 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.md.sal.binding.api;
 
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainFactory;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
@@ -19,7 +20,7 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
  * <p>
  * For more information on usage, please see the documentation in {@link AsyncDataBroker}.
  */
-public interface DataBroker extends AsyncDataBroker<InstanceIdentifier<?>, DataObject, DataChangeListener>, BindingService {
+public interface DataBroker extends AsyncDataBroker<InstanceIdentifier<?>, DataObject, DataChangeListener>, BindingService, TransactionChainFactory<InstanceIdentifier<?>, DataObject> {
     @Override
     ReadOnlyTransaction newReadOnlyTransaction();
 
index 453ff449118ffab0898f429cb02c8ec0ca8ed5ff..a41186dae11eda694687e7698663dffbc3364c26 100644 (file)
@@ -156,11 +156,26 @@ public interface BindingAwareBroker {
         void unregisterFunctionality(ProviderFunctionality functionality);
     }
 
+    /**
+     * Represents an RPC implementation registration. Users should call the
+     * {@link ObjectRegistration#close close} method when the registration is no longer needed.
+     *
+     * @param <T> the implemented RPC service interface
+     */
     public interface RpcRegistration<T extends RpcService> extends ObjectRegistration<T> {
 
+        /**
+         * Returns the implemented RPC service interface.
+         */
         Class<T> getServiceType();
     }
 
+    /**
+     * Represents a routed RPC implementation registration. Users should call the
+     * {@link RoutedRegistration#close close} method when the registration is no longer needed.
+     *
+     * @param <T> the implemented RPC service interface
+     */
     public interface RoutedRpcRegistration<T extends RpcService> extends RpcRegistration<T>,
             RoutedRegistration<Class<? extends BaseIdentity>, InstanceIdentifier<?>, T> {
 
index 7da0a48517fddf715db5f55ced1d431680f0a755..615acd3195c8a99ed5b5f372f53dee46d5faac7b 100644 (file)
@@ -10,16 +10,60 @@ package org.opendaylight.controller.sal.binding.api;
 import org.opendaylight.yangtools.yang.binding.RpcService;
 
 /**
- * Base interface defining contract for retrieving MD-SAL
- * version of RpcServices
+ * Provides access to registered Remote Procedure Call (RPC) service implementations. The RPCs are
+ * defined in YANG models.
+ * <p>
+ * RPC implementations are registered using the {@link RpcProviderRegistry}.
  *
  */
 public interface RpcConsumerRegistry extends BindingAwareService {
     /**
-     * Returns a session specific instance (implementation) of requested
-     * YANG module implementation / service provided by consumer.
+     * Returns an implementation of a requested RPC service.
      *
-     * @return Session specific implementation of service
+     * <p>
+     * The returned instance is not an actual implementation of the RPC service
+     * interface, but a proxy implementation of the interface that forwards to
+     * an actual implementation, if any.
+     * <p>
+     *
+     * The following describes the behavior of the proxy when invoking RPC methods:
+     * <ul>
+     * <li>If an actual implementation is registered with the MD-SAL, all invocations are
+     * forwarded to the registered implementation.</li>
+     * <li>If no actual implementation is registered, all invocations will fail by
+     * throwing {@link IllegalStateException}.</li>
+     * <li>Prior to invoking the actual implementation, the method arguments are are validated.
+     * If any are invalid, an {@link IllegalArgumentException} is thrown.
+     * </ul>
+     *
+     * The returned proxy is automatically updated with the most recent
+     * registered implementation.
+     * <p>
+     * The generated RPC method APIs require implementors to return a {@link java.util.concurrent.Future Future}
+     * instance that wraps the {@link org.opendaylight.yangtools.yang.common.RpcResult RpcResult}. Since
+     * RPC methods may be implemented asynchronously, callers should avoid blocking on the
+     * {@link java.util.concurrent.Future Future} result. Instead, it is recommended to use
+     * {@link com.google.common.util.concurrent.JdkFutureAdapters#listenInPoolThread(java.util.concurrent.Future)}
+     * or {@link com.google.common.util.concurrent.JdkFutureAdapters#listenInPoolThread(java.util.concurrent.Future, java.util.concurrent.Executor)}
+     * to listen for Rpc Result. This will asynchronously listen for future result in executor and
+     * will not block current thread.
+     *
+     * <pre>
+     *   final Future<RpcResult<SomeRpcOutput>> future = someRpcService.someRpc( ... );
+     *   Futures.addCallback(JdkFutureAdapters.listenInThreadPool(future), new FutureCallback<RpcResult<SomeRpcOutput>>() {
+     *
+     *       public void onSuccess(RpcResult<SomeRpcOutput> result) {
+     *          // process result ...
+     *       }
+     *
+     *       public void onFailure(Throwable t) {
+     *          // RPC failed
+     *       }
+     *   );
+     * </pre>
+     * @param serviceInterface the interface of the RPC Service. Typically this is an interface generated
+     *                         from a YANG model.
+     * @return the proxy for the requested RPC service. This method never returns null.
      */
-    <T extends RpcService> T getRpcService(Class<T> module);
+    <T extends RpcService> T getRpcService(Class<T> serviceInterface);
 }
index cdf55844b3d94d680a7a21ad052deb9a7aa73ac4..22db985ba96b82cc4245a42f28ccd3e1f9959774 100644 (file)
@@ -15,39 +15,256 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.RpcService;
 
 /**
- * Interface defining provider's access to the Rpc Registry which could be used
- * to register their implementations of service to the MD-SAL.
+ * Provides a registry for Remote Procedure Call (RPC) service implementations. The RPCs are
+ * defined in YANG models.
+ * <p>
+ * There are 2 types of RPCs:
+ * <ul>
+ * <li>Global</li>
+ * <li>Routed</li>
+ * </ul>
  *
- * @author ttkacik
+ * <h2>Global RPC</h2>
+ * <p>
+ * An RPC is global if there is intended to be only 1 registered implementation. A global RPC is not
+ * explicitly declared as such, essentially any RPC that is not defined to be routed is considered global.
+ * <p>
+ * Global RPCs are registered using the
+ * {@link #addRpcImplementation(Class, RpcService)} method.
  *
+ * <h2>Routed RPC</h2>
+ * <p>
+ * MD-SAL supports routing of RPC between multiple implementations where the appropriate
+ * implementation is selected at run time based on the content of the RPC message as described in
+ * YANG model.
+ * <p>
+ * RPC routing is based on:
+ * <ul>
+ * <li><b>Route identifier</b> -
+ * An {@link org.opendaylight.yangtools.yang.binding.InstanceIdentifier InstanceIdentifier} value
+ * which is part of the RPC input. This value is used to select the correct
+ * implementation at run time.</li>
+ * <li><b>Context Type</b> - A YANG-defined construct which constrains the subset of
+ * valid route identifiers for a particular RPC.</li>
+ * </ul>
+ *
+ * <h3>Context type</h3>
+ * <p>
+ * A context type is modeled in YANG using a combination of a YANG <code>identity</code>
+ * and Opendaylight specific extensions from <code>yang-ext</code> module. These extensions are:
+ * <ul>
+ * <li><b>context-instance</b> - This is used in the data tree part of a YANG model to
+ * define a context type that associates nodes with a specified context <code>identity</code>.
+ * Instance identifiers that reference these nodes are valid route identifiers for RPCs that
+ * reference this context type.</li>
+ * <li><b>context-reference</b> - This is used in RPC input to mark a leaf of type
+ * <code>instance-identifier</code> as a reference to the particular context type defined by the
+ * specified context <code>identity</code>. The value of this
+ * leaf is used by the RPC broker at run time to route the RPC request to the correct implementation.
+ * Note that <code>context-reference</code> may only be used on leaf elements of type
+ * <code>instance-identifier</code> or a type derived from <code>instance-identifier</code>.</li>
+ * </ul>
+ *
+ *
+ * <h3>Routed RPC example</h3>
+ * <p>
+ * <h5>1. Defining a Context Type</h5>
+ * <p>
+ * The following snippet declares a simple YANG <code>identity</code> named <code>example-context</code>:
+ *
+ * <pre>
+ * module example {
+ *     ...
+ *     identity example-context {
+ *          description "Identity used to define an example-context type";
+ *     }
+ *     ...
+ * }
+ * </pre>
+ * <p>
+ * We then use the declared identity to define a context type by using it in combination
+ * with the <code>context-instance</code> YANG extension. We'll associate the context type
+ * with a list element in the data tree. This defines the set of nodes whose instance
+ * identifiers are valid for the <code>example-context</code> context type.
+ * <p>
+ * The following YANG snippet imports the <code>yang-ext</code> module and defines the list
+ * element named <code>item</code> inside a container named <code>foo</code>:
+ *
+ * <pre>
+ * module foo {
+ *     ...
+ *     import yang-ext {prefix ext;}
+ *     ...
+ *     container foo {
+ *          list item {
+ *              key "id";
+ *              leaf id {type string;}
+ *              ext:context-instance "example-context";
+ *          }
+ *     }
+ *     ...
+ * }
+ * </pre>
+ * <p>
+ * The statement <code>ext:context-instance "example-context";</code> inside the list element
+ * declares that any instance identifier referencing <code>item</code> in the data
+ * tree is valid for <code>example-context</code>. For example, the following instance
+ * identifier:
+ * <pre>
+ *     InstanceIdentifier.create(Foo.class).child(Item.class,new ItemKey("Foo"))
+ * </pre>
+ * is valid for <code>example-context</code>. However the following:
+ * <pre>
+ *     InstanceIdentifier.create(Example.class)
+ * </pre>
+ * is not valid.
+ * <p>
+ * So using an <code>identity</code> in combination with <code>context-instance</code> we
+ * have effectively defined a context type that can be referenced in a YANG RPC input.
+ *
+ * <h5>2. Defining an RPC to use the Context Type</h5>
+ * <p>
+ * To define an RPC to be routed based on the context type we need to add an input leaf element
+ * that references the context type which will hold an instance identifier value to be
+ * used to route the RPC.
+ * <p>
+ * The following snippet defines an RPC named <code>show-item</code> with 2 leaf elements
+ * as input: <code>item</code> of type <code>instance-identifier</code> and <code>description</code>:
+ *
+ * <pre>
+ * module foo {
+ *      ...
+ *      import yang-ext {prefix ext;}
+ *      ...
+ *      rpc show-item {
+ *          input {
+ *              leaf item {
+ *                  type instance-identifier;
+ *                  ext:context-reference example-context;
+ *              }
+ *              leaf description {
+ *                  type "string";
+ *              }
+ *          }
+ *      }
+ * }
+ * </pre>
+ * <p>
+ * We mark the <code>item</code> leaf with a <code>context-reference</code> statement that
+ * references the <code>example-context</code> context type. RPC calls will then be routed
+ * based on the instance identifier value contained in <code>item</code>. Only instance
+ * identifiers that point to a <code>foo/item</code> node are valid as input.
+ * <p>
+ * The generated RPC Service interface for the module is:
+ *
+ * <pre>
+ * interface FooService implements RpcService {
+ *      Future&lt;RpcResult&lt;Void&gt;&gt; showItem(ShowItemInput input);
+ * }
+ * </pre>
+ * <p>
+ * For constructing the RPC input, there are generated classes ShowItemInput and ShowItemInputBuilder.
+ *
+ * <h5>3. Registering a routed RPC implementation</h5>
+ * <p>
+ * To register a routed implementation for the <code>show-item</code> RPC, we must use the
+ * {@link #addRoutedRpcImplementation(Class, RpcService)} method. This
+ * will return a {@link RoutedRpcRegistration} instance which can then be used to register /
+ * unregister routed paths associated with the registered implementation.
+ * <p>
+ * The following snippet registers <code>myImpl</code> as the RPC implementation for an
+ * <code>item</code> with key <code>"foo"</code>:
+ * <pre>
+ * // Create the instance identifier path for item "foo"
+ * InstanceIdentifier path = InstanceIdentifier.create(Foo.class).child(Item.class, new ItemKey(&quot;foo&quot;));
+ *
+ * // Register myImpl as the implementation for the FooService RPC interface
+ * RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(FooService.class, myImpl);
+ *
+ * // Now register for the context type and specific path ID. The context type is specified by the
+ * // YANG-generated class for the example-context identity.
+ * reg.registerPath(ExampleContext.class, path);
+ * </pre>
+ * <p>
+ * It is also possible to register the same implementation for multiple paths:
+ *
+ * <pre>
+ * InstanceIdentifier one = InstanceIdentifier.create(Foo.class).child(Item.class, new ItemKey(&quot;One&quot;));
+ * InstanceIdentifier two = InstanceIdentifier.create(Foo.class).child(Item.class, new ItemKey(&quot;Two&quot;));
+ *
+ * RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(FooService.class, myImpl);
+ * reg.registerPath(ExampleContext.class, one);
+ * reg.registerPath(ExampleContext.class, two);
+ * </pre>
+ *
+ * <p>
+ * When another client invokes the <code>showItem(ShowItemInput)</code> method on the proxy instance
+ * retrieved via {@link RpcConsumerRegistry#getRpcService(Class)}, the proxy will inspect the
+ * arguments in ShowItemInput, extract the InstanceIdentifier value of the <code>item</code> leaf and select
+ * the implementation whose registered path matches the InstanceIdentifier value of the <code>item</code> leaf.
+ *
+ * <h2>Notes for RPC Implementations</h2>
+ *
+ * <h3>RpcResult</h3>
+ * <p>
+ * The generated interfaces require implementors to return
+ *  {@link java.util.concurrent.Future Future}&lt;{@link org.opendaylight.yangtools.yang.common.RpcResult RpcResult}&lt;{RpcName}Output&gt;&gt; instances.
+ *
+ * Implementations should do processing of RPC calls asynchronously and update the
+ * returned {@link java.util.concurrent.Future Future} instance when processing is complete.
+ * However using {@link com.google.common.util.concurrent.Futures#immediateFuture(Object) Futures.immediateFuture}
+ * is valid only if the result is immediately available and asynchronous processing is unnecessary and
+ * would only introduce additional complexity.
+ *
+ * <p>
+ * The {@link org.opendaylight.yangtools.yang.common.RpcResult RpcResult} is a generic
+ * wrapper for the RPC output payload, if any, and also allows for attaching error or
+ * warning information (possibly along with the payload) should the RPC processing partially
+ * or completely fail. This is intended to provide additional human readable information
+ * for users of the API and to transfer warning / error information across the system
+ * so it may be visible via other external APIs such as Restconf.
+ * <p>
+ * It is recommended to use the {@link org.opendaylight.yangtools.yang.common.RpcResult RpcResult}
+ * for conveying appropriate error information
+ * on failure rather than purposely throwing unchecked exceptions if at all possible.
+ * While unchecked exceptions will fail the returned {@link java.util.concurrent.Future Future},
+ * using the intended RpcResult to convey the error information is more user-friendly.
  */
 public interface RpcProviderRegistry extends //
         RpcConsumerRegistry, //
         RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
     /**
-     * Registers a global RpcService implementation.
+     * Registers a global implementation of the provided RPC service interface.
+     * All methods of the interface are required to be implemented.
+     *
+     * @param serviceInterface the YANG-generated interface of the RPC Service for which to register.
+     * @param implementation "the implementation of the RPC service interface.
+     * @return an RpcRegistration instance that should be used to unregister the RPC implementation
+     *         when no longer needed by calling {@link RpcRegistration#close()}.
      *
-     * @param type
-     * @param implementation
-     * @return
+     * @throws IllegalStateException
+     *             if the supplied RPC interface is a routed RPC type.
      */
-    <T extends RpcService> RpcRegistration<T> addRpcImplementation(Class<T> type, T implementation)
+    <T extends RpcService> RpcRegistration<T> addRpcImplementation(Class<T> serviceInterface, T implementation)
             throws IllegalStateException;
 
     /**
+     * Registers an implementation of the given routed RPC service interface.
+     * <p>
+     * See the {@link RpcProviderRegistry class} documentation for information and example on
+     * how to use routed RPCs.
      *
-     * Register a Routed RpcService where routing is determined on annotated
-     * (in YANG model) context-reference and value of annotated leaf.
-     *
-     * @param type
-     *            Type of RpcService, use generated interface class, not your
-     *            implementation class
-     * @param implementation
-     *            Implementation of RpcService
-     * @return Registration object for routed Rpc which could be used to unregister
+     * @param serviceInterface the YANG-generated interface of the RPC Service for which to register.
+     * @param implementation the implementation instance to register.
+     * @return a RoutedRpcRegistration instance which can be used to register paths for the RPC
+     *         implementation via invoking {@link RoutedRpcRegistration#registerPath(....).
+     *         {@link RoutedRpcRegistration#close()} should be called to unregister the implementation
+     *         and all previously registered paths when no longer needed.
      *
      * @throws IllegalStateException
+     *            if the supplied RPC interface is not a routed RPC type.
      */
-    <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type, T implementation)
+    <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> serviceInterface,
+                                                                               T implementation)
             throws IllegalStateException;
 }
index f24809de451dbb945f2eab0386bdedcb60f27f34..b109f89ff61adfb016b9eaa5e86818b0ec6b6965 100644 (file)
@@ -7,7 +7,9 @@
  */
 package org.opendaylight.controller.md.sal.binding.impl;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
 
 public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBroker>, DomForwardedBroker,
         SchemaContextListener, AutoCloseable {
@@ -97,8 +100,8 @@ public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBr
     protected Map<InstanceIdentifier<?>, DataObject> toBinding(
             final Map<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized) {
         Map<InstanceIdentifier<?>, DataObject> newMap = new HashMap<>();
-        for (Map.Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, ? extends NormalizedNode<?, ?>> entry : normalized
-                .entrySet()) {
+
+        for (Map.Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, ? extends NormalizedNode<?, ?>> entry : sortedEntries(normalized)) {
             try {
                 Optional<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> potential = getCodec().toBinding(
                         entry);
@@ -113,6 +116,21 @@ public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBr
         return newMap;
     }
 
+    private static <T> Iterable<Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier,T>> sortedEntries(final Map<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, T> map) {
+        ArrayList<Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, T>> entries = new ArrayList<>(map.entrySet());
+        Collections.sort(entries, new Comparator<Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, T>>() {
+
+            @Override
+            public int compare(final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, T> left,
+                    final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, T> right) {
+                int leftSize = Iterables.size(left.getKey().getPathArguments());
+                int rightSize = Iterables.size(right.getKey().getPathArguments());
+                return Integer.compare(leftSize, rightSize);
+            }
+        });
+        return entries;
+    }
+
     protected Set<InstanceIdentifier<?>> toBinding(
             final Set<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> normalized) {
         Set<InstanceIdentifier<?>> hashSet = new HashSet<>();
index 3fac0dc93adf55b47002abedeff09ad0646d5726..e5e1e300c17437e52962218f168da911b3f54526 100644 (file)
  */
 package org.opendaylight.controller.md.sal.binding.impl;
 
-import java.util.ArrayList;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
-import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.yangtools.concepts.Delegator;
+import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
-public class AbstractForwardedTransaction<T extends AsyncTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>>>
-        implements Delegator<T> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractForwardedTransaction.class);
+abstract class AbstractForwardedTransaction<T extends AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>>>
+        implements Delegator<T>, Identifiable<Object> {
+
     private final T delegate;
-    private final static CacheBuilder<Object, Object> CACHE_BUILDER = CacheBuilder.newBuilder()
-            .expireAfterWrite(10, TimeUnit.MILLISECONDS).maximumSize(100);
     private final BindingToNormalizedNodeCodec codec;
-    private final EnumMap<LogicalDatastoreType, Cache<InstanceIdentifier<?>, DataObject>> cacheMap;
 
-    protected AbstractForwardedTransaction(final T delegate, final BindingToNormalizedNodeCodec codec) {
-        super();
-        this.delegate = delegate;
-        this.codec = codec;
+    public AbstractForwardedTransaction(final T delegateTx, final BindingToNormalizedNodeCodec codec) {
+        this.delegate = Preconditions.checkNotNull(delegateTx, "Delegate must not be null");
+        this.codec = Preconditions.checkNotNull(codec, "Codec must not be null");
+    }
 
-        this.cacheMap = new EnumMap<>(LogicalDatastoreType.class);
-        cacheMap.put(LogicalDatastoreType.OPERATIONAL, CACHE_BUILDER.<InstanceIdentifier<?>, DataObject> build());
-        cacheMap.put(LogicalDatastoreType.CONFIGURATION, CACHE_BUILDER.<InstanceIdentifier<?>, DataObject> build());
 
+    @Override
+    public final  Object getIdentifier() {
+        return delegate.getIdentifier();
     }
 
     @Override
-    public T getDelegate() {
+    public final  T getDelegate() {
         return delegate;
     }
 
-    protected final BindingToNormalizedNodeCodec getCodec() {
-        return codec;
-    }
-
-    protected ListenableFuture<Optional<DataObject>> transformFuture(final LogicalDatastoreType store,
-            final InstanceIdentifier<?> path, final ListenableFuture<Optional<NormalizedNode<?, ?>>> future) {
-        return Futures.transform(future, new Function<Optional<NormalizedNode<?, ?>>, Optional<DataObject>>() {
-            @Nullable
-            @Override
-            public Optional<DataObject> apply(@Nullable final Optional<NormalizedNode<?, ?>> normalizedNode) {
-                if (normalizedNode.isPresent()) {
-                    final DataObject dataObject;
-                    try {
-                        dataObject = codec.toBinding(path, normalizedNode.get());
-                    } catch (DeserializationException e) {
-                        LOG.warn("Failed to create dataobject from node {}", normalizedNode.get(), e);
-                        throw new IllegalStateException("Failed to create dataobject", e);
-                    }
-
-                    if (dataObject != null) {
-                        updateCache(store, path, dataObject);
-                        return Optional.of(dataObject);
-                    }
-                }
-                return Optional.absent();
-            }
-        });
-    }
-
-    protected void doPut(final DOMDataWriteTransaction writeTransaction, final LogicalDatastoreType store,
-            final InstanceIdentifier<?> path, final DataObject data) {
-        invalidateCache(store, path);
-        final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = codec
-                .toNormalizedNode(path, data);
-        writeTransaction.put(store, normalized.getKey(), normalized.getValue());
-    }
-
-    protected void doPutWithEnsureParents(final DOMDataReadWriteTransaction writeTransaction,
-            final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
-        invalidateCache(store, path);
-        final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = codec
-                .toNormalizedNode(path, data);
-
-        final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath = normalized.getKey();
-        ensureParentsByMerge(writeTransaction, store, normalizedPath, path);
-        LOG.debug("Tx: {} : Putting data {}", getDelegate().getIdentifier(), normalizedPath);
-        writeTransaction.put(store, normalizedPath, normalized.getValue());
-    }
-
-    protected void doMergeWithEnsureParents(final DOMDataReadWriteTransaction writeTransaction,
-            final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
-        invalidateCache(store, path);
-        final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = codec
-                .toNormalizedNode(path, data);
-
-        final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath = normalized.getKey();
-        ensureParentsByMerge(writeTransaction, store, normalizedPath, path);
-        LOG.debug("Tx: {} : Merge data {}",getDelegate().getIdentifier(),normalizedPath);
-        writeTransaction.merge(store, normalizedPath, normalized.getValue());
+    @SuppressWarnings("unchecked")
+    protected final <S extends AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>>> S getDelegateChecked(final Class<S> txType) {
+        Preconditions.checkState(txType.isInstance(delegate));
+        return (S) delegate;
     }
 
-    private void ensureParentsByMerge(final DOMDataReadWriteTransaction writeTransaction,
-            final LogicalDatastoreType store,
-            final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath,
-            final InstanceIdentifier<?> path) {
-        List<PathArgument> currentArguments = new ArrayList<>();
-        DataNormalizationOperation<?> currentOp = codec.getDataNormalizer().getRootOperation();
-        Iterator<PathArgument> iterator = normalizedPath.getPath().iterator();
-        while (iterator.hasNext()) {
-            PathArgument currentArg = iterator.next();
-            try {
-                currentOp = currentOp.getChild(currentArg);
-            } catch (DataNormalizationException e) {
-                throw new IllegalArgumentException(String.format("Invalid child encountered in path %s", path), e);
-            }
-            currentArguments.add(currentArg);
-            org.opendaylight.yangtools.yang.data.api.InstanceIdentifier currentPath = new org.opendaylight.yangtools.yang.data.api.InstanceIdentifier(
-                    currentArguments);
-
-            final Optional<NormalizedNode<?, ?>> d;
-            try {
-                d = writeTransaction.read(store, currentPath).get();
-            } catch (InterruptedException | ExecutionException e) {
-                LOG.error("Failed to read pre-existing data from store {} path {}", store, currentPath, e);
-                throw new IllegalStateException("Failed to read pre-existing data", e);
-            }
-
-            if (!d.isPresent() && iterator.hasNext()) {
-                writeTransaction.merge(store, currentPath, currentOp.createDefault(currentArg));
-            }
-        }
-    }
-
-    protected void doMerge(final DOMDataWriteTransaction writeTransaction, final LogicalDatastoreType store,
-            final InstanceIdentifier<?> path, final DataObject data) {
-        invalidateCache(store, path);
-        final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = codec
-                .toNormalizedNode(path, data);
-        writeTransaction.merge(store, normalized.getKey(), normalized.getValue());
-    }
-
-    protected void doDelete(final DOMDataWriteTransaction writeTransaction, final LogicalDatastoreType store,
-            final InstanceIdentifier<?> path) {
-        invalidateCache(store, path);
-        final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized = codec.toNormalized(path);
-        writeTransaction.delete(store, normalized);
-    }
-
-    protected ListenableFuture<RpcResult<TransactionStatus>> doCommit(final DOMDataWriteTransaction writeTransaction) {
-        return writeTransaction.commit();
-    }
-
-    protected boolean doCancel(final DOMDataWriteTransaction writeTransaction) {
-        return writeTransaction.cancel();
-    }
-
-    protected ListenableFuture<Optional<DataObject>> doRead(final DOMDataReadTransaction readTransaction,
-            final LogicalDatastoreType store, final InstanceIdentifier<?> path) {
-        final DataObject dataObject = getFromCache(store, path);
-        if (dataObject == null) {
-            final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = readTransaction.read(store,
-                    codec.toNormalized(path));
-            return transformFuture(store, path, future);
-        } else {
-            return Futures.immediateFuture(Optional.of(dataObject));
-        }
-    }
-
-    private DataObject getFromCache(final LogicalDatastoreType store, final InstanceIdentifier<?> path) {
-        Cache<InstanceIdentifier<?>, DataObject> cache = cacheMap.get(store);
-        if (cache != null) {
-            return cache.getIfPresent(path);
-        }
-        return null;
-    }
-
-    private void updateCache(final LogicalDatastoreType store, final InstanceIdentifier<?> path,
-            final DataObject dataObject) {
-        // Check if cache exists. If not create one.
-        Cache<InstanceIdentifier<?>, DataObject> cache = cacheMap.get(store);
-        if (cache == null) {
-            cache = CacheBuilder.newBuilder().maximumSize(1000).expireAfterWrite(1, TimeUnit.MINUTES).build();
-
-        }
-
-        cache.put(path, dataObject);
+    protected final BindingToNormalizedNodeCodec getCodec() {
+        return codec;
     }
 
-    private void invalidateCache(final LogicalDatastoreType store, final InstanceIdentifier<?> path) {
-        // FIXME: Optimization: invalidate only parents and children of path
-        Cache<InstanceIdentifier<?>, DataObject> cache = cacheMap.get(store);
-        cache.invalidateAll();
-        LOG.trace("Cache invalidated");
+    protected final ListenableFuture<Optional<DataObject>> doRead(final DOMDataReadTransaction readTx,
+            final LogicalDatastoreType store, final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> path) {
+        return Futures.transform(readTx.read(store, codec.toNormalized(path)), codec.deserializeFunction(path));
     }
-
 }
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractReadWriteTransaction.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractReadWriteTransaction.java
new file mode 100644 (file)
index 0000000..3988bc6
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
+import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+public class AbstractReadWriteTransaction extends AbstractWriteTransaction<DOMDataReadWriteTransaction> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractReadWriteTransaction.class);
+
+    public AbstractReadWriteTransaction(final DOMDataReadWriteTransaction delegate, final BindingToNormalizedNodeCodec codec) {
+        super(delegate, codec);
+    }
+
+    protected final void doPutWithEnsureParents(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+        final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
+                .toNormalizedNode(path, data);
+
+        final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath = normalized.getKey();
+        ensureParentsByMerge(store, normalizedPath, path);
+        LOG.debug("Tx: {} : Putting data {}", getDelegate().getIdentifier(), normalizedPath);
+        doPut(store, path, data);
+    }
+
+    protected final void doMergeWithEnsureParents(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+        final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
+                .toNormalizedNode(path, data);
+
+        final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath = normalized.getKey();
+        ensureParentsByMerge(store, normalizedPath, path);
+        LOG.debug("Tx: {} : Merge data {}", getDelegate().getIdentifier(), normalizedPath);
+        doMerge(store, path, data);
+    }
+
+    private final void ensureParentsByMerge(final LogicalDatastoreType store,
+            final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath,
+            final InstanceIdentifier<?> path) {
+        List<PathArgument> currentArguments = new ArrayList<>();
+        DataNormalizationOperation<?> currentOp = getCodec().getDataNormalizer().getRootOperation();
+        Iterator<PathArgument> iterator = normalizedPath.getPathArguments().iterator();
+        while (iterator.hasNext()) {
+            PathArgument currentArg = iterator.next();
+            try {
+                currentOp = currentOp.getChild(currentArg);
+            } catch (DataNormalizationException e) {
+                throw new IllegalArgumentException(String.format("Invalid child encountered in path %s", path), e);
+            }
+            currentArguments.add(currentArg);
+            org.opendaylight.yangtools.yang.data.api.InstanceIdentifier currentPath = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.create(
+                    currentArguments);
+
+            final Optional<NormalizedNode<?, ?>> d;
+            try {
+                d = getDelegate().read(store, currentPath).get();
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.error("Failed to read pre-existing data from store {} path {}", store, currentPath, e);
+                throw new IllegalStateException("Failed to read pre-existing data", e);
+            }
+
+            if (!d.isPresent() && iterator.hasNext()) {
+                getDelegate().merge(store, currentPath, currentOp.createDefault(currentArg));
+            }
+        }
+    }
+
+
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractWriteTransaction.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/AbstractWriteTransaction.java
new file mode 100644 (file)
index 0000000..5ce6687
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import java.util.Map.Entry;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ *
+ * Abstract Base Transaction for transactions which are backed by
+ * {@link DOMDataWriteTransaction}
+ */
+public class AbstractWriteTransaction<T extends DOMDataWriteTransaction> extends
+        AbstractForwardedTransaction<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteTransaction.class);
+
+    protected AbstractWriteTransaction(final T delegate,
+            final BindingToNormalizedNodeCodec codec) {
+        super(delegate, codec);
+    }
+
+    protected final void doPut(final LogicalDatastoreType store,
+            final InstanceIdentifier<?> path, final DataObject data) {
+        final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
+                .toNormalizedNode(path, data);
+        getDelegate().put(store, normalized.getKey(), normalized.getValue());
+    }
+
+    protected final void doMerge(final LogicalDatastoreType store,
+            final InstanceIdentifier<?> path, final DataObject data) {
+        final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
+                .toNormalizedNode(path, data);
+        getDelegate().merge(store, normalized.getKey(), normalized.getValue());
+    }
+
+    protected final void doDelete(final LogicalDatastoreType store,
+            final InstanceIdentifier<?> path) {
+        final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized = getCodec().toNormalized(path);
+        getDelegate().delete(store, normalized);
+    }
+
+    protected final ListenableFuture<RpcResult<TransactionStatus>> doCommit() {
+        return getDelegate().commit();
+    }
+
+    protected final boolean doCancel() {
+        return getDelegate().cancel();
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadTransactionImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadTransactionImpl.java
new file mode 100644 (file)
index 0000000..e71404d
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+
+class BindingDataReadTransactionImpl extends AbstractForwardedTransaction<DOMDataReadOnlyTransaction> implements
+        ReadOnlyTransaction {
+
+    protected BindingDataReadTransactionImpl(final DOMDataReadOnlyTransaction delegate,
+            final BindingToNormalizedNodeCodec codec) {
+        super(delegate, codec);
+    }
+
+    @Override
+    public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
+            final InstanceIdentifier<?> path) {
+        return doRead(getDelegate(),store, path);
+    }
+
+    @Override
+    public void close() {
+        getDelegate().close();
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadWriteTransactionImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataReadWriteTransactionImpl.java
new file mode 100644 (file)
index 0000000..5a89cc7
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+
+class BindingDataReadWriteTransactionImpl extends
+        BindingDataWriteTransactionImpl<DOMDataReadWriteTransaction> implements ReadWriteTransaction {
+
+    protected BindingDataReadWriteTransactionImpl(final DOMDataReadWriteTransaction delegate,
+            final BindingToNormalizedNodeCodec codec) {
+        super(delegate, codec);
+    }
+
+    @Override
+    public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
+            final InstanceIdentifier<?> path) {
+        return doRead(getDelegate(), store, path);
+    }
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataWriteTransactionImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingDataWriteTransactionImpl.java
new file mode 100644 (file)
index 0000000..a62319b
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+class BindingDataWriteTransactionImpl<T extends DOMDataWriteTransaction> extends
+        AbstractWriteTransaction<T> implements WriteTransaction {
+
+    protected BindingDataWriteTransactionImpl(final T delegateTx, final BindingToNormalizedNodeCodec codec) {
+        super(delegateTx, codec);
+    }
+
+
+
+    @Override
+    public void put(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+        doPut(store, path, data);
+    }
+
+    @Override
+    public void merge(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+        doMerge(store, path, data);
+    }
+
+    @Override
+    public void delete(final LogicalDatastoreType store, final InstanceIdentifier<?> path) {
+        doDelete( store, path);
+    }
+
+    @Override
+    public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+        return doCommit();
+    }
+
+    @Override
+    public boolean cancel() {
+        return doCancel();
+    }
+}
\ No newline at end of file
index f1be5c6922ecda45cc75f343b3f6355e402e013d..003f57cd72f053557a79fa761896894836ced89e 100644 (file)
@@ -14,6 +14,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 
+import javax.annotation.Nullable;
+
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
 import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
@@ -42,7 +44,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -75,7 +79,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
 
     public Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
             final InstanceIdentifier<? extends DataObject> bindingPath, final DataObject bindingObject) {
-        return toNormalizedNode(toEntry(bindingPath, bindingObject));
+        return toNormalizedNode(toBindingEntry(bindingPath, bindingObject));
 
     }
 
@@ -87,17 +91,16 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
                 .toNormalized(legacyEntry);
         LOG.trace("Serialization of {}, Legacy Representation: {}, Normalized Representation: {}", binding,
                 legacyEntry, normalizedEntry);
-        if (Augmentation.class.isAssignableFrom(binding.getKey().getTargetType())) {
+        if (isAugmentation(binding.getKey().getTargetType())) {
 
             for (DataContainerChild<? extends PathArgument, ?> child : ((DataContainerNode<?>) normalizedEntry
                     .getValue()).getValue()) {
                 if (child instanceof AugmentationNode) {
                     ImmutableList<PathArgument> childArgs = ImmutableList.<PathArgument> builder()
-                            .addAll(normalizedEntry.getKey().getPath()).add(child.getIdentifier()).build();
-                    org.opendaylight.yangtools.yang.data.api.InstanceIdentifier childPath = new org.opendaylight.yangtools.yang.data.api.InstanceIdentifier(
+                            .addAll(normalizedEntry.getKey().getPathArguments()).add(child.getIdentifier()).build();
+                    org.opendaylight.yangtools.yang.data.api.InstanceIdentifier childPath = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.create(
                             childArgs);
-                    return new SimpleEntry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>>(
-                            childPath, child);
+                    return toDOMEntry(childPath, child);
                 }
             }
 
@@ -119,7 +122,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
             throws DeserializationException {
 
-        PathArgument lastArgument = Iterables.getLast(normalized.getPath());
+        PathArgument lastArgument = Iterables.getLast(normalized.getPathArguments());
         // Used instance-identifier codec do not support serialization of last
         // path
         // argument if it is AugmentationIdentifier (behaviour expected by old
@@ -143,7 +146,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
         }
 
         int normalizedCount = getAugmentationCount(normalized);
-        AugmentationIdentifier lastArgument = (AugmentationIdentifier) Iterables.getLast(normalized.getPath());
+        AugmentationIdentifier lastArgument = (AugmentationIdentifier) Iterables.getLast(normalized.getPathArguments());
 
         // Here we employ small trick - Binding-aware Codec injects an pointer
         // to augmentation class
@@ -152,7 +155,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
         LOG.trace("Looking for candidates to match {}", normalized);
         for (QName child : lastArgument.getPossibleChildNames()) {
             org.opendaylight.yangtools.yang.data.api.InstanceIdentifier childPath = new org.opendaylight.yangtools.yang.data.api.InstanceIdentifier(
-                    ImmutableList.<PathArgument> builder().addAll(normalized.getPath()).add(new NodeIdentifier(child))
+                    ImmutableList.<PathArgument> builder().addAll(normalized.getPathArguments()).add(new NodeIdentifier(child))
                             .build());
             try {
                 if (isNotRepresentable(childPath)) {
@@ -218,19 +221,26 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
             throws DataNormalizationException {
         DataNormalizationOperation<?> current = legacyToNormalized.getRootOperation();
-        for (PathArgument arg : normalized.getPath()) {
+        for (PathArgument arg : normalized.getPathArguments()) {
             current = current.getChild(arg);
         }
         return current;
     }
 
-    private static final Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject> toEntry(
+    private static final Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject> toBindingEntry(
             final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject> key,
             final DataObject value) {
         return new SimpleEntry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject>(
                 key, value);
     }
 
+    private static final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> toDOMEntry(
+            final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier key,
+            final NormalizedNode<?, ?> value) {
+        return new SimpleEntry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>>(
+                key, value);
+    }
+
     public DataObject toBinding(final InstanceIdentifier<?> path, final NormalizedNode<?, ?> normalizedNode)
             throws DeserializationException {
         CompositeNode legacy = null;
@@ -262,7 +272,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
             if (bindingData == null) {
                 LOG.warn("Failed to deserialize {} to Binding format. Binding path is: {}", normalized, bindingPath);
             }
-            return Optional.of(toEntry(bindingPath, bindingData));
+            return Optional.of(toBindingEntry(bindingPath, bindingData));
         } else {
             return Optional.absent();
         }
@@ -304,15 +314,15 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
             final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized) {
         int position = 0;
         int foundPosition = -1;
-        for (PathArgument arg : normalized.getPath()) {
+        for (PathArgument arg : normalized.getPathArguments()) {
             position++;
             if (arg instanceof AugmentationIdentifier) {
                 foundPosition = position;
             }
         }
         if (foundPosition > 0) {
-            return new org.opendaylight.yangtools.yang.data.api.InstanceIdentifier(normalized.getPath().subList(0,
-                    foundPosition));
+            Iterable<PathArgument> shortened = Iterables.limit(normalized.getPathArguments(), foundPosition);
+            return org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.create(shortened);
         }
         return null;
     }
@@ -404,7 +414,7 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
     }
 
     private boolean isAugmentationIdentifier(final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier processed) {
-        return Iterables.getLast(processed.getPath()) instanceof AugmentationIdentifier;
+        return Iterables.getLast(processed.getPathArguments()) instanceof AugmentationIdentifier;
     }
 
     private static int getAugmentationCount(final InstanceIdentifier<?> potential) {
@@ -420,11 +430,46 @@ public class BindingToNormalizedNodeCodec implements SchemaContextListener {
 
     private static int getAugmentationCount(final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier potential) {
         int count = 0;
-        for(PathArgument arg : potential.getPath()) {
+        for(PathArgument arg : potential.getPathArguments()) {
             if(arg instanceof AugmentationIdentifier) {
                 count++;
             }
         }
         return count;
     }
+
+    public Function<Optional<NormalizedNode<?, ?>>, Optional<DataObject>>  deserializeFunction(final InstanceIdentifier<?> path) {
+        return new DeserializeFunction(this, path);
+    }
+
+    private static class DeserializeFunction implements Function<Optional<NormalizedNode<?, ?>>, Optional<DataObject>> {
+
+        private final BindingToNormalizedNodeCodec codec;
+        private final InstanceIdentifier<?> path;
+
+        public DeserializeFunction(final BindingToNormalizedNodeCodec codec, final InstanceIdentifier<?> path) {
+            super();
+            this.codec = Preconditions.checkNotNull(codec, "Codec must not be null");
+            this.path = Preconditions.checkNotNull(path, "Path must not be null");
+        }
+
+        @Nullable
+        @Override
+        public Optional<DataObject> apply(@Nullable final Optional<NormalizedNode<?, ?>> normalizedNode) {
+            if (normalizedNode.isPresent()) {
+                final DataObject dataObject;
+                try {
+                    dataObject = codec.toBinding(path, normalizedNode.get());
+                } catch (DeserializationException e) {
+                    LOG.warn("Failed to create dataobject from node {}", normalizedNode.get(), e);
+                    throw new IllegalStateException("Failed to create dataobject", e);
+                }
+
+                if (dataObject != null) {
+                    return Optional.of(dataObject);
+                }
+            }
+            return Optional.absent();
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingTranslatedTransactionChain.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/md/sal/binding/impl/BindingTranslatedTransactionChain.java
new file mode 100644 (file)
index 0000000..2d8e51c
--- /dev/null
@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.yangtools.concepts.Delegator;
+
+import com.google.common.base.Preconditions;
+
+class BindingTranslatedTransactionChain implements BindingTransactionChain, Delegator<DOMTransactionChain> {
+
+    private final DOMTransactionChain delegate;
+
+    @GuardedBy("this")
+    private final Map<AsyncTransaction<?, ?>, AsyncTransaction<?, ?>> delegateTxToBindingTx = new WeakHashMap<>();
+    private final BindingToNormalizedNodeCodec codec;
+
+    public BindingTranslatedTransactionChain(final DOMDataBroker chainFactory,
+            final BindingToNormalizedNodeCodec codec, final TransactionChainListener listener) {
+        Preconditions.checkNotNull(chainFactory, "DOM Transaction chain factory must not be null");
+        this.delegate = chainFactory.createTransactionChain(new ListenerInvoker(listener));
+        this.codec = codec;
+    }
+
+    @Override
+    public DOMTransactionChain getDelegate() {
+        return delegate;
+    }
+
+    @Override
+    public ReadOnlyTransaction newReadOnlyTransaction() {
+        DOMDataReadOnlyTransaction delegateTx = delegate.newReadOnlyTransaction();
+        ReadOnlyTransaction bindingTx = new BindingDataReadTransactionImpl(delegateTx, codec);
+        putDelegateToBinding(delegateTx, bindingTx);
+        return bindingTx;
+    }
+
+    @Override
+    public ReadWriteTransaction newReadWriteTransaction() {
+        DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
+        ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec);
+        putDelegateToBinding(delegateTx, bindingTx);
+        return bindingTx;
+    }
+
+    @Override
+    public WriteTransaction newWriteOnlyTransaction() {
+        DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction();
+        WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<>(delegateTx, codec);
+        putDelegateToBinding(delegateTx, bindingTx);
+        return bindingTx;
+    }
+
+    @Override
+    public void close() {
+        delegate.close();
+    }
+
+    private synchronized void putDelegateToBinding(final AsyncTransaction<?, ?> domTx,
+            final AsyncTransaction<?, ?> bindingTx) {
+        final Object previous = delegateTxToBindingTx.put(domTx, bindingTx);
+        Preconditions.checkState(previous == null, "DOM Transaction %s has already associated binding transation %s",domTx,previous);
+    }
+
+    private synchronized AsyncTransaction<?, ?> getBindingTransaction(final AsyncTransaction<?, ?> transaction) {
+        return delegateTxToBindingTx.get(transaction);
+    }
+
+    private final class ListenerInvoker implements TransactionChainListener {
+
+        private final TransactionChainListener listener;
+
+        public ListenerInvoker(final TransactionChainListener listener) {
+            this.listener = Preconditions.checkNotNull(listener, "Listener must not be null.");
+        }
+
+        @Override
+        public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
+                final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+            Preconditions.checkState(delegate.equals(chain),
+                    "Illegal state - listener for %s was invoked for incorrect chain %s.", delegate, chain);
+            AsyncTransaction<?, ?> bindingTx = getBindingTransaction(transaction);
+            listener.onTransactionChainFailed(chain, bindingTx, cause);
+        }
+
+        @Override
+        public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+            Preconditions.checkState(delegate.equals(chain),
+                    "Illegal state - listener for %s was invoked for incorrect chain %s.", delegate, chain);
+            listener.onTransactionChainSuccessful(BindingTranslatedTransactionChain.this);
+        }
+    }
+
+}
index 49d04d04b3f7d58c1d1f1f5e250d918f91878f9e..1c6447a4e741615b714657c9356dc6b756fe44c0 100644 (file)
@@ -186,7 +186,7 @@ public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDat
     }
 
     private class ForwardedBackwardsCompatibleTransacion extends
-            AbstractForwardedTransaction<DOMDataReadWriteTransaction> implements DataModificationTransaction {
+            AbstractReadWriteTransaction implements DataModificationTransaction {
 
         private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
         private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
@@ -214,9 +214,9 @@ public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDat
         public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
             boolean previouslyRemoved = posponedRemovedOperational.remove(path);
             if(previouslyRemoved) {
-                doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
+                doPutWithEnsureParents(LogicalDatastoreType.OPERATIONAL, path, data);
             } else {
-                doMergeWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
+                doMergeWithEnsureParents(LogicalDatastoreType.OPERATIONAL, path, data);
             }
         }
 
@@ -232,9 +232,9 @@ public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDat
             }
             updated.put(path, data);
             if(previouslyRemoved) {
-                doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
+                doPutWithEnsureParents(LogicalDatastoreType.CONFIGURATION, path, data);
             } else {
-                doMergeWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
+                doMergeWithEnsureParents(LogicalDatastoreType.CONFIGURATION, path, data);
             }
         }
 
@@ -308,11 +308,6 @@ public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDat
             }
         }
 
-        @Override
-        public Object getIdentifier() {
-            return getDelegate().getIdentifier();
-        }
-
         private void changeStatus(final TransactionStatus status) {
             LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
             this.status = status;
@@ -330,11 +325,11 @@ public class ForwardedBackwardsCompatibleDataBroker extends AbstractForwardedDat
         public ListenableFuture<RpcResult<TransactionStatus>> commit() {
 
             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
-                doDelete(getDelegate(), LogicalDatastoreType.CONFIGURATION, path);
+                doDelete(LogicalDatastoreType.CONFIGURATION, path);
             }
 
             for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
-                doDelete(getDelegate(), LogicalDatastoreType.OPERATIONAL, path);
+                doDelete(LogicalDatastoreType.OPERATIONAL, path);
             }
 
             changeStatus(TransactionStatus.SUBMITED);
index 5b008ad4bc3c0fe87863bd5d968394bc0a38396d..6359b60684ef45e18d2185d1231b14b165497724 100644 (file)
@@ -8,27 +8,16 @@
 package org.opendaylight.controller.md.sal.binding.impl;
 
 
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
-
 /**
  * The DataBrokerImpl simply defers to the DOMDataBroker for all its operations.
  * All transactions and listener registrations are wrapped by the DataBrokerImpl
@@ -37,9 +26,7 @@ import com.google.common.util.concurrent.ListenableFuture;
  * Besides this the DataBrokerImpl and it's collaborators also cache data that
  * is already transformed from the binding independent to binding aware format
  *
- * TODO : All references in this class to CompositeNode should be switched to
- * NormalizedNode once the MappingService is updated
- *
+
  */
 public class ForwardedBindingDataBroker extends AbstractForwardedDataBroker implements DataBroker {
 
@@ -60,90 +47,11 @@ public class ForwardedBindingDataBroker extends AbstractForwardedDataBroker impl
 
     @Override
     public WriteTransaction newWriteOnlyTransaction() {
-        return new BindingDataWriteTransactionImpl<DOMDataWriteTransaction>(getDelegate().newWriteOnlyTransaction(),getCodec());
-    }
-
-    private abstract class AbstractBindingTransaction<T extends AsyncTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>>>
-            extends AbstractForwardedTransaction<T> implements AsyncTransaction<InstanceIdentifier<?>, DataObject> {
-
-        protected AbstractBindingTransaction(final T delegate, final BindingToNormalizedNodeCodec codec) {
-            super(delegate, codec);
-        }
-
-        @Override
-        public Object getIdentifier() {
-            return getDelegate().getIdentifier();
-        }
-
-    }
-
-
-    private class BindingDataReadTransactionImpl extends AbstractBindingTransaction<DOMDataReadOnlyTransaction> implements
-            ReadOnlyTransaction {
-
-        protected BindingDataReadTransactionImpl(final DOMDataReadOnlyTransaction delegate,
-                final BindingToNormalizedNodeCodec codec) {
-            super(delegate, codec);
-        }
-
-        @Override
-        public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
-                final InstanceIdentifier<?> path) {
-            return doRead(getDelegate(), store, path);
-        }
-
-        @Override
-        public void close() {
-            getDelegate().close();
-        }
+        return new BindingDataWriteTransactionImpl<>(getDelegate().newWriteOnlyTransaction(),getCodec());
     }
 
-    private class BindingDataWriteTransactionImpl<T extends DOMDataWriteTransaction> extends
-            AbstractBindingTransaction<T> implements WriteTransaction {
-
-        protected BindingDataWriteTransactionImpl(final T delegate, final BindingToNormalizedNodeCodec codec) {
-            super(delegate, codec);
-
-        }
-
-        @Override
-        public boolean cancel() {
-            return doCancel(getDelegate());
-        }
-
-        @Override
-        public void put(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
-            doPut(getDelegate(), store, path, data);
-        }
-
-        @Override
-        public void merge(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
-            doMerge(getDelegate(), store, path, data);
-        }
-
-        @Override
-        public void delete(final LogicalDatastoreType store, final InstanceIdentifier<?> path) {
-            doDelete(getDelegate(), store, path);
-        }
-
-        @Override
-        public ListenableFuture<RpcResult<TransactionStatus>> commit() {
-            return doCommit(getDelegate());
-        }
-    }
-
-    private class BindingDataReadWriteTransactionImpl extends
-            BindingDataWriteTransactionImpl<DOMDataReadWriteTransaction> implements ReadWriteTransaction {
-
-        protected BindingDataReadWriteTransactionImpl(final DOMDataReadWriteTransaction delegate,
-                final BindingToNormalizedNodeCodec codec) {
-            super(delegate, codec);
-        }
-
-        @Override
-        public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
-                final InstanceIdentifier<?> path) {
-            return doRead(getDelegate(), store, path);
-        }
+    @Override
+    public BindingTransactionChain createTransactionChain(final TransactionChainListener listener) {
+        return new BindingTranslatedTransactionChain(getDelegate(), getCodec(), listener);
     }
 }
index 5f84ec579d7dd25bdc9580f785aca6d300c35a0a..0c04b936b668847d6914cefe522190751f10f733 100644 (file)
@@ -9,9 +9,36 @@ package org.opendaylight.controller.md.sal.common.api.routing;
 
 import java.util.Map;
 import java.util.Set;
-
+/**
+ * Event representing change in RPC routing table.
+ *
+ *
+ * @param <C> Type, which is used to represent Routing context.
+ * @param <P> Type of data tree path, which is used to identify route.
+ */
 public interface RouteChange<C,P> {
 
+    /**
+     *
+     * Returns a map of removed routes in associated routing contexts.
+     * <p>
+     * This map represents routes, which were withdrawn from broker local
+     * routing table and broker may need to forward RPC to other broker
+     * in order to process RPC request.
+     *
+     * @return Map of contexts and removed routes
+     */
     Map<C,Set<P>> getRemovals();
+    /**
+    *
+    * Returns a map of announced routes in associated routing contexts.
+    *
+    * This map represents routes, which were announced by broker
+    * and are present in broker's local routing table. This routes
+    * are processed by implementations which are registered
+    * to originating broker.
+    *
+    * @return Map of contexts and announced routes
+    */
     Map<C,Set<P>> getAnnouncements();
 }
index 62206013f816fea5a85885e1f5696d6b93603a4f..b3b6fe6ee93ba44a30abaf32028044c7c89d006f 100644 (file)
@@ -8,8 +8,23 @@
 package org.opendaylight.controller.md.sal.common.api.routing;
 
 import java.util.EventListener;
-
+/**
+ *
+ * Listener which is interested in receiving RouteChangeEvents
+ * for its local broker.
+ * <p>
+ * Listener is registerd via {@link RouteChangePublisher#registerRouteChangeListener(RouteChangeListener)}
+ *
+ *
+ * @param <C> Type, which is used to represent Routing context.
+ * @param <P> Type of data tree path, which is used to identify route.
+ */
 public interface RouteChangeListener<C,P> extends EventListener {
 
+    /**
+     * Callback which is invoked if there is an rpc routing table change.
+     *
+     * @param change Event representing change in local RPC routing table.
+     */
     void onRouteChange(RouteChange<C, P> change);
 }
index 7bf61fab0b726cd9ebc4f3fdcac428c447f74b26..dc6b6dd3b7d89c3d23a3ed6bf7e2157bd49bbc4c 100644 (file)
@@ -9,6 +9,12 @@ package org.opendaylight.controller.md.sal.common.api.routing;
 
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 
+/**
+ * Publishes changes in local RPC routing table to registered listener.
+ *
+ * @param <C> Type, which is used to represent Routing context.
+ * @param <P> Type of data tree path, which is used to identify route.
+ */
 public interface RouteChangePublisher<C,P> {
 
     <L extends RouteChangeListener<C,P>> ListenerRegistration<L> registerRouteChangeListener(L listener);
index 6ce7b5a5c7f990360ff1d5cb3d01e63989fc42da..6fe8d6921730fc55e4fe3900ea66823ee2b644ed 100644 (file)
@@ -10,9 +10,31 @@ package org.opendaylight.controller.md.sal.common.api.routing;
 import org.opendaylight.yangtools.concepts.Path;
 import org.opendaylight.yangtools.concepts.Registration;
 
+/**
+ * Base interface for a routed RPC RPC implementation registration.
+ *
+ * @param <C> the context type used for routing
+ * @param <P> the path identifier type
+ * @param <S> the RPC implementation type
+ */
 public interface RoutedRegistration<C, P extends Path<P>, S> extends Registration<S> {
 
+    /**
+     * Registers the RPC implementation associated with this registration for the given path
+     * identifier and context.
+     *
+     * @param context the context used for routing RPCs to this implementation.
+     * @param path the path identifier for which to register.
+     */
     void registerPath(C context, P path);
+
+    /**
+     * Unregisters the RPC implementation associated with this registration for the given path
+     * identifier and context.
+     *
+     * @param context the context used for routing RPCs to this implementation.
+     * @param path the path identifier for which to unregister.
+     */
     void unregisterPath(C context, P path);
 
     @Override
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
new file mode 100644 (file)
index 0000000..ba09d04
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+
+public class DataChangeListener extends UntypedActor {
+    @Override public void onReceive(Object message) throws Exception {
+        throw new UnsupportedOperationException("onReceive");
+    }
+
+    public static Props props() {
+        return Props.create(new Creator<DataChangeListener>() {
+            @Override
+            public DataChangeListener create() throws Exception {
+                return new DataChangeListener();
+            }
+
+        });
+
+    }
+}
index c87f1abb21c74cf585d2fcfe6a4057b0c7143e99..f64c6f1a8669888726f30bfe4099aa628365ccbb 100644 (file)
@@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
@@ -20,39 +24,72 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *
  */
-public class DistributedDataStore implements DOMStore {
-  private final ActorRef shardManager;
-
-  public DistributedDataStore(ActorSystem actorSystem, String type) {
-    shardManager = actorSystem.actorOf(ShardManager.props(type));
-  }
-
-  @Override
-  public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) {
-    return new ListenerRegistrationProxy();
-  }
-
-  @Override
-  public DOMStoreTransactionChain createTransactionChain() {
-    return new TransactionChainProxy();
-  }
-
-  @Override
-  public DOMStoreReadTransaction newReadOnlyTransaction() {
-    return new TransactionProxy();
-  }
-
-  @Override
-  public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-    return new TransactionProxy();
-  }
-
-  @Override
-  public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-    return new TransactionProxy();
-  }
+public class DistributedDataStore implements DOMStore, SchemaContextListener {
+
+    private static final Logger
+        LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+
+
+    private final String type;
+    private final ActorContext actorContext;
+
+    public DistributedDataStore(ActorSystem actorSystem, String type) {
+        this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type))), type);
+    }
+
+    public DistributedDataStore(ActorContext actorContext, String type) {
+        this.type = type;
+        this.actorContext = actorContext;
+    }
+
+
+    @Override
+    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
+        InstanceIdentifier path, L listener,
+        AsyncDataBroker.DataChangeScope scope) {
+
+        ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props());
+
+        Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
+            new RegisterChangeListener(path, dataChangeListenerActor.path(),
+                AsyncDataBroker.DataChangeScope.BASE),
+            ActorContext.ASK_DURATION);
+
+        RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+        return new ListenerRegistrationProxy(reply.getListenerRegistrationPath());
+    }
+
+
+
+    @Override
+    public DOMStoreTransactionChain createTransactionChain() {
+        return new TransactionChainProxy(actorContext);
+    }
+
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction() {
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
+    }
+
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
+    }
+
+    @Override
+    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
+    }
+
+    @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
+        actorContext.getShardManager().tell(new UpdateSchemaContext(schemaContext), null);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerProxy.java
new file mode 100644 (file)
index 0000000..7c38ee5
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class ListenerProxy implements AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>{
+    private final ActorSelection listenerRegistrationActor;
+
+    public ListenerProxy(ActorSelection listenerRegistrationActor) {
+        this.listenerRegistrationActor = listenerRegistrationActor;
+    }
+
+    @Override public void onDataChanged(
+        AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+        throw new UnsupportedOperationException("onDataChanged");
+    }
+}
index c2fc8c0277472108abc4f72b5012445ef4937807..a548a885eb2c73600e013ccf3a0cd46fe34950d9 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorPath;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 
 /**
@@ -17,6 +18,13 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
  * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
  */
 public class ListenerRegistrationProxy implements ListenerRegistration {
+    private final ActorPath listenerRegistrationPath;
+
+    public ListenerRegistrationProxy(ActorPath listenerRegistrationPath) {
+
+        this.listenerRegistrationPath = listenerRegistrationPath;
+    }
+
     @Override
     public Object getInstance() {
         throw new UnsupportedOperationException("getInstance");
index d75edc7922f54ea46d72e4fd62fc4bc0e0aa3143..5b4f7ef8989711dffdf4cdd8fbe7d483165f23a7 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
@@ -46,89 +47,112 @@ import java.util.concurrent.Executors;
  */
 public class Shard extends UntypedProcessor {
 
-  public static final String DEFAULT_NAME = "default";
+    public static final String DEFAULT_NAME = "default";
 
-  private final ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+    private final ListeningExecutorService storeExecutor =
+        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
 
-  private final InMemoryDOMDataStore store;
+    private final InMemoryDOMDataStore store;
 
-  private final Map<Modification, DOMStoreThreePhaseCommitCohort> modificationToCohort = new HashMap<>();
+    private final Map<Modification, DOMStoreThreePhaseCommitCohort>
+        modificationToCohort = new HashMap<>();
 
-  private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+    private final LoggingAdapter log =
+        Logging.getLogger(getContext().system(), this);
 
-  private Shard(String name){
-    store = new InMemoryDOMDataStore(name, storeExecutor);
-  }
-
-  public static Props props(final String name) {
-    return Props.create(new Creator<Shard>() {
+    private Shard(String name) {
+        store = new InMemoryDOMDataStore(name, storeExecutor);
+    }
 
-      @Override
-      public Shard create() throws Exception {
-        return new Shard(name);
-      }
+    public static Props props(final String name) {
+        return Props.create(new Creator<Shard>() {
 
-    });
-  }
+            @Override
+            public Shard create() throws Exception {
+                return new Shard(name);
+            }
 
-  @Override
-  public void onReceive(Object message) throws Exception {
-    if (message instanceof CreateTransactionChain) {
-      createTransactionChain();
-    } else if (message instanceof RegisterChangeListener) {
-      registerChangeListener((RegisterChangeListener) message);
-    } else if (message instanceof UpdateSchemaContext) {
-      updateSchemaContext((UpdateSchemaContext) message);
-    } else if (message instanceof ForwardedCommitTransaction ) {
-      handleForwardedCommit((ForwardedCommitTransaction) message);
-    } else if (message instanceof Persistent){
-      commit((Persistent) message);
+        });
     }
-  }
-
-  private void commit(Persistent message) {
-    Modification modification = (Modification) message.payload();
-    DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(modification);
-    if(cohort == null){
-      log.error("Could not find cohort for modification : " + modification);
-      return;
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+        if (message instanceof CreateTransactionChain) {
+            createTransactionChain();
+        } else if (message instanceof RegisterChangeListener) {
+            registerChangeListener((RegisterChangeListener) message);
+        } else if (message instanceof UpdateSchemaContext) {
+            updateSchemaContext((UpdateSchemaContext) message);
+        } else if (message instanceof ForwardedCommitTransaction) {
+            handleForwardedCommit((ForwardedCommitTransaction) message);
+        } else if (message instanceof Persistent) {
+            commit((Persistent) message);
+        }
     }
-    final ListenableFuture<Void> future = cohort.commit();
-    final ActorRef sender = getSender();
-    final ActorRef self = getSelf();
-    future.addListener(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          future.get();
-          sender.tell(new CommitTransactionReply(), self);
-        } catch (InterruptedException | ExecutionException e) {
-          log.error(e, "An exception happened when committing");
+
+    private void commit(Persistent message) {
+        Modification modification = (Modification) message.payload();
+        DOMStoreThreePhaseCommitCohort cohort =
+            modificationToCohort.remove(modification);
+        if (cohort == null) {
+            log.error(
+                "Could not find cohort for modification : " + modification);
+            return;
         }
-      }
-    }, getContext().dispatcher());
-  }
-
-  private void handleForwardedCommit(ForwardedCommitTransaction message) {
-    log.info("received forwarded transaction");
-    modificationToCohort.put(message.getModification(), message.getCohort());
-    getSelf().forward(Persistent.create(message.getModification()), getContext());
-  }
-
-  private void updateSchemaContext(UpdateSchemaContext message) {
-    store.onGlobalContextUpdated(message.getSchemaContext());
-  }
-
-  private void registerChangeListener(RegisterChangeListener registerChangeListener) {
-    org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration =
-            store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope());
-    ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(registration));
-    getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
-  }
-
-  private void createTransactionChain() {
-    DOMStoreTransactionChain chain = store.createTransactionChain();
-    ActorRef transactionChain = getContext().actorOf(ShardTransactionChain.props(chain));
-    getSender().tell(new CreateTransactionChainReply(transactionChain.path()), getSelf());
-  }
+        final ListenableFuture<Void> future = cohort.commit();
+        final ActorRef sender = getSender();
+        final ActorRef self = getSelf();
+        future.addListener(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    future.get();
+                    sender.tell(new CommitTransactionReply(), self);
+                } catch (InterruptedException | ExecutionException e) {
+                    log.error(e, "An exception happened when committing");
+                }
+            }
+        }, getContext().dispatcher());
+    }
+
+    private void handleForwardedCommit(ForwardedCommitTransaction message) {
+        log.info("received forwarded transaction");
+        modificationToCohort
+            .put(message.getModification(), message.getCohort());
+        getSelf().forward(Persistent.create(message.getModification()),
+            getContext());
+    }
+
+    private void updateSchemaContext(UpdateSchemaContext message) {
+        store.onGlobalContextUpdated(message.getSchemaContext());
+    }
+
+    private void registerChangeListener(
+        RegisterChangeListener registerChangeListener) {
+
+        ActorSelection listenerRegistrationActor = getContext()
+            .system().actorSelection(registerChangeListener.getDataChangeListenerPath());
+
+        AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>
+            listener = new ListenerProxy(listenerRegistrationActor);
+
+        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+            registration =
+            store.registerChangeListener(registerChangeListener.getPath(),
+                listener, registerChangeListener.getScope());
+        ActorRef listenerRegistration =
+            getContext().actorOf(ListenerRegistration.props(registration));
+        getSender()
+            .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
+                getSelf());
+    }
+
+    private void createTransactionChain() {
+        DOMStoreTransactionChain chain = store.createTransactionChain();
+        ActorRef transactionChain =
+            getContext().actorOf(ShardTransactionChain.props(chain));
+        getSender()
+            .tell(new CreateTransactionChainReply(transactionChain.path()),
+                getSelf());
+    }
 }
index 8d8527a240fab4ebdeed00aae39882f108944b19..4e2369d3758596bd1217670f8f3ec5a2438db36d 100644 (file)
@@ -19,6 +19,7 @@ import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 
 import java.util.HashMap;
 import java.util.List;
@@ -92,6 +93,9 @@ public class ShardManager extends UntypedActor {
       } else {
         getSender().tell(new PrimaryNotFound(shardName), getSelf());
       }
+    } else if(message instanceof UpdateSchemaContext){
+        // FIXME : Notify all local shards of a context change
+        getContext().system().actorSelection(defaultShardPath).forward(message, getContext());
     }
   }
 
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
new file mode 100644 (file)
index 0000000..197b3b7
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorPath;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
+ */
+public class ThreePhaseCommitCohortProxy implements
+    DOMStoreThreePhaseCommitCohort{
+
+    private final List<ActorPath> cohortPaths;
+
+    public ThreePhaseCommitCohortProxy(List<ActorPath> cohortPaths) {
+
+        this.cohortPaths = cohortPaths;
+    }
+
+    @Override public ListenableFuture<Boolean> canCommit() {
+        throw new UnsupportedOperationException("canCommit");
+    }
+
+    @Override public ListenableFuture<Void> preCommit() {
+        throw new UnsupportedOperationException("preCommit");
+    }
+
+    @Override public ListenableFuture<Void> abort() {
+        throw new UnsupportedOperationException("abort");
+    }
+
+    @Override public ListenableFuture<Void> commit() {
+        throw new UnsupportedOperationException("commit");
+    }
+
+    public List<ActorPath> getCohortPaths() {
+        return Collections.unmodifiableList(this.cohortPaths);
+    }
+}
index 1ee0d89e6116837a1733848aae5f85e1cf02cd34..837ffc1b51dd8c5b2a80a3cf2f71a1076406648d 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
@@ -17,23 +18,32 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
  */
 public class TransactionChainProxy implements DOMStoreTransactionChain{
+    private final ActorContext actorContext;
+
+    public TransactionChainProxy(ActorContext actorContext) {
+        this.actorContext = actorContext;
+    }
+
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        throw new UnsupportedOperationException("newReadOnlyTransaction");
+        return new TransactionProxy(actorContext,
+            TransactionProxy.TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        throw new UnsupportedOperationException("newReadWriteTransaction");
+        return new TransactionProxy(actorContext,
+            TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        throw new UnsupportedOperationException("newWriteOnlyTransaction");
+        return new TransactionProxy(actorContext,
+            TransactionProxy.TransactionType.READ_WRITE);
     }
 
     @Override
     public void close() {
-        throw new UnsupportedOperationException("close");
+        throw new UnsupportedOperationException("close - not sure what to do here?");
     }
 }
index 609dea0b360819ebc576e636e2237b8cb9868172..32bb7d0951964975b850c8a1a685ce7d95c03f47 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
- *
+ * <p>
  * Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
  * the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
  * be created on each of those shards by the TransactionProxy
- *
+ *</p>
+ * <p>
  * The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
  * shards will be executed.
- *
+ * </p>
  */
 public class TransactionProxy implements DOMStoreReadWriteTransaction {
+
+    public enum TransactionType {
+        READ_ONLY,
+        WRITE_ONLY,
+        READ_WRITE
+    }
+
+    private static final AtomicLong counter = new AtomicLong();
+
+    private final TransactionType readOnly;
+    private final ActorContext actorContext;
+    private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
+    private final String identifier;
+
+    public TransactionProxy(
+        ActorContext actorContext,
+        TransactionType readOnly) {
+
+        this.identifier = "transaction-" + counter.getAndIncrement();
+        this.readOnly = readOnly;
+        this.actorContext = actorContext;
+
+        Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION);
+        if(response instanceof CreateTransactionReply){
+            CreateTransactionReply reply = (CreateTransactionReply) response;
+            remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionPath()));
+        }
+    }
+
     @Override
-    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(InstanceIdentifier path) {
-        throw new UnsupportedOperationException("read");
+    public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
+        final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+
+        Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
+
+            @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
+                Object response = actorContext
+                    .executeRemoteOperation(remoteTransaction, new ReadData(path),
+                        ActorContext.ASK_DURATION);
+                if(response instanceof ReadDataReply){
+                    ReadDataReply reply = (ReadDataReply) response;
+                    //FIXME : A cast should not be required here ???
+                    return (Optional<NormalizedNode<?, ?>>) Optional.of(reply.getNormalizedNode());
+                }
+
+                return Optional.absent();
+            }
+        };
+
+        ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
+            future = ListenableFutureTask.create(call);
+
+        //FIXME : Use a thread pool here
+        Executors.newSingleThreadExecutor().submit(future);
+
+        return future;
     }
 
     @Override
     public void write(InstanceIdentifier path, NormalizedNode<?, ?> data) {
-        throw new UnsupportedOperationException("write");
+        final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+        remoteTransaction.tell(new WriteData(path, data), null);
     }
 
     @Override
     public void merge(InstanceIdentifier path, NormalizedNode<?, ?> data) {
-        throw new UnsupportedOperationException("merge");
+        final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+        remoteTransaction.tell(new MergeData(path, data), null);
     }
 
     @Override
     public void delete(InstanceIdentifier path) {
-        throw new UnsupportedOperationException("delete");
+        final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+        remoteTransaction.tell(new DeleteData(path), null);
     }
 
     @Override
     public DOMStoreThreePhaseCommitCohort ready() {
-        throw new UnsupportedOperationException("ready");
+        List<ActorPath> cohortPaths = new ArrayList<>();
+
+        for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) {
+            Object result = actorContext.executeRemoteOperation(remoteTransaction,
+                new ReadyTransaction(),
+                ActorContext.ASK_DURATION
+            );
+
+            if(result instanceof ReadyTransactionReply){
+                ReadyTransactionReply reply = (ReadyTransactionReply) result;
+                cohortPaths.add(reply.getCohortPath());
+            }
+        }
+
+        return new ThreePhaseCommitCohortProxy(cohortPaths);
     }
 
     @Override
     public Object getIdentifier() {
-        throw new UnsupportedOperationException("getIdentifier");
+        return this.identifier;
     }
 
     @Override
     public void close() {
-        throw new UnsupportedOperationException("close");
+        for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) {
+            remoteTransaction.tell(new CloseTransaction(), null);
+        }
+    }
+
+    private ActorSelection remoteTransactionFromIdentifier(InstanceIdentifier path){
+        String shardName = shardNameFromIdentifier(path);
+        return remoteTransactionPaths.get(shardName);
+    }
+
+    private String shardNameFromIdentifier(InstanceIdentifier path){
+        return Shard.DEFAULT_NAME;
     }
 }
index 0123a701471e29ef5a81ad41dc3fb2304e4cbf62..7c9e4f0665a2710e2ed4b28f4792e6a043a48800 100644 (file)
@@ -8,32 +8,34 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import akka.actor.ActorPath;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class RegisterChangeListener {
-  private final InstanceIdentifier path;
-  private final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener;
-  private final AsyncDataBroker.DataChangeScope scope;
+    private final InstanceIdentifier path;
+    private final ActorPath dataChangeListenerPath;
+    private final AsyncDataBroker.DataChangeScope scope;
 
 
-  public RegisterChangeListener(InstanceIdentifier path, AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener, AsyncDataBroker.DataChangeScope scope) {
-    this.path = path;
-    this.listener = listener;
-    this.scope = scope;
-  }
+    public RegisterChangeListener(InstanceIdentifier path,
+        ActorPath dataChangeListenerPath,
+        AsyncDataBroker.DataChangeScope scope) {
+        this.path = path;
+        this.dataChangeListenerPath = dataChangeListenerPath;
+        this.scope = scope;
+    }
 
-  public InstanceIdentifier getPath() {
-    return path;
-  }
+    public InstanceIdentifier getPath() {
+        return path;
+    }
 
-  public AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> getListener() {
-    return listener;
-  }
 
-  public AsyncDataBroker.DataChangeScope getScope() {
-    return scope;
-  }
+    public AsyncDataBroker.DataChangeScope getScope() {
+        return scope;
+    }
+
+    public ActorPath getDataChangeListenerPath() {
+        return dataChangeListenerPath;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
new file mode 100644 (file)
index 0000000..ba4d4de
--- /dev/null
@@ -0,0 +1,144 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static akka.pattern.Patterns.ask;
+
+/**
+ * The ActorContext class contains utility methods which could be used by
+ * non-actors (like DistributedDataStore) to work with actors a little more
+ * easily. An ActorContext can be freely passed around to local object instances
+ * but should not be passed to actors especially remote actors
+ */
+public class ActorContext {
+    private static final Logger
+        LOG = LoggerFactory.getLogger(ActorContext.class);
+
+    public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
+    public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
+
+    private final ActorSystem actorSystem;
+    private final ActorRef shardManager;
+
+    public ActorContext(ActorSystem actorSystem, ActorRef shardManager){
+        this.actorSystem = actorSystem;
+        this.shardManager = shardManager;
+    }
+
+    public ActorSystem getActorSystem() {
+        return actorSystem;
+    }
+
+    public ActorRef getShardManager() {
+        return shardManager;
+    }
+
+    public ActorSelection actorSelection(String actorPath){
+        return actorSystem.actorSelection(actorPath);
+    }
+
+    public ActorSelection actorSelection(ActorPath actorPath){
+        return actorSystem.actorSelection(actorPath);
+    }
+
+
+    /**
+     * Finds the primary for a given shard
+     *
+     * @param shardName
+     * @return
+     */
+    public ActorSelection findPrimary(String shardName) {
+        Object result = executeLocalOperation(shardManager,
+            new FindPrimary(shardName), ASK_DURATION);
+
+        if(result instanceof PrimaryFound){
+            PrimaryFound found = (PrimaryFound) result;
+
+            LOG.error("Primary found {}", found.getPrimaryPath());
+
+            return actorSystem.actorSelection(found.getPrimaryPath());
+        }
+        throw new RuntimeException("primary was not found");
+    }
+
+    /**
+     * Executes an operation on a local actor and wait for it's response
+     * @param actor
+     * @param message
+     * @param duration
+     * @return The response of the operation
+     */
+    public Object executeLocalOperation(ActorRef actor, Object message,
+        FiniteDuration duration){
+        Future<Object> future =
+            ask(actor, message, new Timeout(duration));
+
+        try {
+            return Await.result(future, AWAIT_DURATION);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Execute an operation on a remote actor and wait for it's response
+     * @param actor
+     * @param message
+     * @param duration
+     * @return
+     */
+    public Object executeRemoteOperation(ActorSelection actor, Object message,
+        FiniteDuration duration){
+        Future<Object> future =
+            ask(actor, message, new Timeout(duration));
+
+        try {
+            return Await.result(future, AWAIT_DURATION);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Execute an operation on the primary for a given shard
+     * <p>
+     *     This method first finds the primary for a given shard ,then sends
+     *     the message to the remote shard and waits for a response
+     * </p>
+     * @param shardName
+     * @param message
+     * @param duration
+     * @throws java.lang.RuntimeException when a primary is not found or if the message to the remote shard fails or times out
+     *
+     * @return
+     */
+    public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
+        ActorSelection primary = findPrimary(shardName);
+
+        return executeRemoteOperation(primary, message, duration);
+    }
+
+}
index 45492fd71495cbe235b852a055aca0fe5818cc99..3a74a4ca7656dc01ae650d011595335474297de1 100644 (file)
@@ -1,6 +1,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
+import akka.actor.Props;
 import junit.framework.Assert;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -15,10 +22,24 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 public class DistributedDataStoreTest extends AbstractActorTest{
 
     private DistributedDataStore distributedDataStore;
+    private MockActorContext mockActorContext;
+    private ActorRef doNothingActorRef;
 
     @org.junit.Before
     public void setUp() throws Exception {
-        distributedDataStore = new DistributedDataStore(getSystem(), "config");
+        final Props props = Props.create(DoNothingActor.class);
+
+        doNothingActorRef = getSystem().actorOf(props);
+
+        mockActorContext = new MockActorContext(getSystem(), doNothingActorRef);
+        distributedDataStore = new DistributedDataStore(mockActorContext, "config");
+        distributedDataStore.onGlobalContextUpdated(
+            TestModel.createTestContext());
+
+        // Make CreateTransactionReply as the default response. Will need to be
+        // tuned if a specific test requires some other response
+        mockActorContext.setExecuteShardOperationResponse(
+            new CreateTransactionReply(doNothingActorRef.path()));
     }
 
     @org.junit.After
@@ -28,8 +49,9 @@ public class DistributedDataStoreTest extends AbstractActorTest{
 
     @org.junit.Test
     public void testRegisterChangeListener() throws Exception {
+        mockActorContext.setExecuteShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()));
         ListenerRegistration registration =
-                distributedDataStore.registerChangeListener(InstanceIdentifier.builder().build(), new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+                distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
             @Override
             public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
                 throw new UnsupportedOperationException("onDataChanged");
index a9d8042ce238ffdc3be211a7625f50be957dc9f1..48365fa1a06a90c87131ab6bda0e71db78595ee6 100644 (file)
@@ -63,7 +63,7 @@ public class ShardTest extends AbstractActorTest{
 
           subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-          subject.tell(new RegisterChangeListener(InstanceIdentifier.builder().build(), noOpDataChangeListener() , AsyncDataBroker.DataChangeScope.BASE), getRef());
+          subject.tell(new RegisterChangeListener(TestModel.TEST_PATH, getRef().path() , AsyncDataBroker.DataChangeScope.BASE), getRef());
 
           final String out = new ExpectMsg<String>("match hint") {
             // do not put code outside this method, will run afterwards
@@ -97,4 +97,4 @@ public class ShardTest extends AbstractActorTest{
       }
     };
   }
-}
\ No newline at end of file
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java
new file mode 100644 (file)
index 0000000..6d057a4
--- /dev/null
@@ -0,0 +1,225 @@
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import java.util.List;
+
+public class TransactionProxyTest extends AbstractActorTest {
+
+    @Test
+    public void testRead() throws Exception {
+        final Props props = Props.create(DoNothingActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteRemoteOperationResponse("message");
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY);
+
+
+        ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
+            transactionProxy.read(TestModel.TEST_PATH);
+
+        Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
+
+        Assert.assertFalse(normalizedNodeOptional.isPresent());
+
+        actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
+            ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
+
+        read = transactionProxy.read(TestModel.TEST_PATH);
+
+        normalizedNodeOptional = read.get();
+
+        Assert.assertTrue(normalizedNodeOptional.isPresent());
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        final Props props = Props.create(MessageCollectorActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteRemoteOperationResponse("message");
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY);
+
+        transactionProxy.write(TestModel.TEST_PATH,
+            ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+
+        ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+        Object messages = testContext
+            .executeLocalOperation(actorRef, "messages",
+                ActorContext.ASK_DURATION);
+
+        Assert.assertNotNull(messages);
+
+        Assert.assertTrue(messages instanceof List);
+
+        List<Object> listMessages = (List<Object>) messages;
+
+        Assert.assertEquals(1, listMessages.size());
+
+        Assert.assertTrue(listMessages.get(0) instanceof WriteData);
+    }
+
+    @Test
+    public void testMerge() throws Exception {
+        final Props props = Props.create(MessageCollectorActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteRemoteOperationResponse("message");
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY);
+
+        transactionProxy.merge(TestModel.TEST_PATH,
+            ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+
+        ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+        Object messages = testContext
+            .executeLocalOperation(actorRef, "messages",
+                ActorContext.ASK_DURATION);
+
+        Assert.assertNotNull(messages);
+
+        Assert.assertTrue(messages instanceof List);
+
+        List<Object> listMessages = (List<Object>) messages;
+
+        Assert.assertEquals(1, listMessages.size());
+
+        Assert.assertTrue(listMessages.get(0) instanceof MergeData);
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        final Props props = Props.create(MessageCollectorActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteRemoteOperationResponse("message");
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY);
+
+        transactionProxy.delete(TestModel.TEST_PATH);
+
+        ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+        Object messages = testContext
+            .executeLocalOperation(actorRef, "messages",
+                ActorContext.ASK_DURATION);
+
+        Assert.assertNotNull(messages);
+
+        Assert.assertTrue(messages instanceof List);
+
+        List<Object> listMessages = (List<Object>) messages;
+
+        Assert.assertEquals(1, listMessages.size());
+
+        Assert.assertTrue(listMessages.get(0) instanceof DeleteData);
+    }
+
+    @Test
+    public void testReady() throws Exception {
+        final Props props = Props.create(DoNothingActor.class);
+        final ActorRef doNothingActorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(doNothingActorRef.path()));
+        actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY);
+
+
+        DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+        Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+        ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+        Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
+
+    }
+
+    @Test
+    public void testGetIdentifier(){
+        final Props props = Props.create(DoNothingActor.class);
+        final ActorRef doNothingActorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(
+            new CreateTransactionReply(doNothingActorRef.path()));
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY);
+
+        Assert.assertNotNull(transactionProxy.getIdentifier());
+    }
+
+    @Test
+    public void testClose(){
+        final Props props = Props.create(MessageCollectorActor.class);
+        final ActorRef actorRef = getSystem().actorOf(props);
+
+        final MockActorContext actorContext = new MockActorContext(this.getSystem());
+        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteRemoteOperationResponse("message");
+
+        TransactionProxy transactionProxy =
+            new TransactionProxy(actorContext,
+                TransactionProxy.TransactionType.READ_ONLY);
+
+        transactionProxy.close();
+
+        ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+        Object messages = testContext
+            .executeLocalOperation(actorRef, "messages",
+                ActorContext.ASK_DURATION);
+
+        Assert.assertNotNull(messages);
+
+        Assert.assertTrue(messages instanceof List);
+
+        List<Object> listMessages = (List<Object>) messages;
+
+        Assert.assertEquals(1, listMessages.size());
+
+        Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DoNothingActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/DoNothingActor.java
new file mode 100644 (file)
index 0000000..819cfd0
--- /dev/null
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.UntypedActor;
+
+public class DoNothingActor extends UntypedActor {
+
+    @Override public void onReceive(Object message) throws Exception {
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MessageCollectorActor.java
new file mode 100644 (file)
index 0000000..f75aa54
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.UntypedActor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * MessageCollectorActor collects messages as it receives them. It can send
+ * those collected messages to any sender which sends it the "messages" message
+ * <p>
+ *     This class would be useful as a mock to test whether messages were sent
+ *     to a remote actor or not.
+ * </p>
+ */
+public class MessageCollectorActor extends UntypedActor {
+    private List<Object> messages = new ArrayList<>();
+
+    @Override public void onReceive(Object message) throws Exception {
+        if(message instanceof String){
+            if("messages".equals(message)){
+                getSender().tell(messages, getSelf());
+            }
+        } else {
+            messages.add(message);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockActorContext.java
new file mode 100644 (file)
index 0000000..fe62516
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import scala.concurrent.duration.FiniteDuration;
+
+public class MockActorContext extends ActorContext {
+
+    private Object executeShardOperationResponse;
+    private Object executeRemoteOperationResponse;
+    private Object executeLocalOperationResponse;
+
+    public MockActorContext(ActorSystem actorSystem) {
+        super(actorSystem, null);
+    }
+
+    public MockActorContext(ActorSystem actorSystem, ActorRef shardManager) {
+        super(actorSystem, shardManager);
+    }
+
+
+    @Override public Object executeShardOperation(String shardName,
+        Object message, FiniteDuration duration) {
+        return executeShardOperationResponse;
+    }
+
+    @Override public Object executeRemoteOperation(ActorSelection actor,
+        Object message, FiniteDuration duration) {
+        return executeRemoteOperationResponse;
+    }
+
+    @Override public ActorSelection findPrimary(String shardName) {
+        return null;
+    }
+
+    public void setExecuteShardOperationResponse(Object response){
+        executeShardOperationResponse = response;
+    }
+
+    public void setExecuteRemoteOperationResponse(Object response){
+        executeRemoteOperationResponse = response;
+    }
+
+    public void setExecuteLocalOperationResponse(
+        Object executeLocalOperationResponse) {
+        this.executeLocalOperationResponse = executeLocalOperationResponse;
+    }
+
+
+}