format("private final %s oldModule;\n", abstractFQN.getTypeName())+
format("private final %s oldInstance;\n", AutoCloseable.class.getCanonicalName())+
format("private %s instance;\n", AutoCloseable.class.getCanonicalName())+
- format("private final %s dependencyResolver;\n", DependencyResolver.class.getCanonicalName())+
+ format("protected final %s dependencyResolver;\n", DependencyResolver.class.getCanonicalName())+
format("private final %s identifier;\n", ModuleIdentifier.class.getCanonicalName())+
"@Override\n"+
format("public %s getIdentifier() {\n", ModuleIdentifier.class.getCanonicalName())+
--- /dev/null
+package org.opendaylight.controller.md.sal.binding.api;
+
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+public interface BindingTransactionChain extends TransactionChain<InstanceIdentifier<?>, DataObject> {
+
+ @Override
+ ReadOnlyTransaction newReadOnlyTransaction();
+
+ @Override
+ ReadWriteTransaction newReadWriteTransaction();
+
+ @Override
+ WriteTransaction newWriteOnlyTransaction();
+
+}
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainFactory;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
* <p>
* For more information on usage, please see the documentation in {@link AsyncDataBroker}.
*/
-public interface DataBroker extends AsyncDataBroker<InstanceIdentifier<?>, DataObject, DataChangeListener>, BindingService {
+public interface DataBroker extends AsyncDataBroker<InstanceIdentifier<?>, DataObject, DataChangeListener>, BindingService, TransactionChainFactory<InstanceIdentifier<?>, DataObject> {
@Override
ReadOnlyTransaction newReadOnlyTransaction();
void unregisterFunctionality(ProviderFunctionality functionality);
}
+ /**
+ * Represents an RPC implementation registration. Users should call the
+ * {@link ObjectRegistration#close close} method when the registration is no longer needed.
+ *
+ * @param <T> the implemented RPC service interface
+ */
public interface RpcRegistration<T extends RpcService> extends ObjectRegistration<T> {
+ /**
+ * Returns the implemented RPC service interface.
+ */
Class<T> getServiceType();
}
+ /**
+ * Represents a routed RPC implementation registration. Users should call the
+ * {@link RoutedRegistration#close close} method when the registration is no longer needed.
+ *
+ * @param <T> the implemented RPC service interface
+ */
public interface RoutedRpcRegistration<T extends RpcService> extends RpcRegistration<T>,
RoutedRegistration<Class<? extends BaseIdentity>, InstanceIdentifier<?>, T> {
import org.opendaylight.yangtools.yang.binding.RpcService;
/**
- * Base interface defining contract for retrieving MD-SAL
- * version of RpcServices
+ * Provides access to registered Remote Procedure Call (RPC) service implementations. The RPCs are
+ * defined in YANG models.
+ * <p>
+ * RPC implementations are registered using the {@link RpcProviderRegistry}.
*
*/
public interface RpcConsumerRegistry extends BindingAwareService {
/**
- * Returns a session specific instance (implementation) of requested
- * YANG module implementation / service provided by consumer.
+ * Returns an implementation of a requested RPC service.
*
- * @return Session specific implementation of service
+ * <p>
+ * The returned instance is not an actual implementation of the RPC service
+ * interface, but a proxy implementation of the interface that forwards to
+ * an actual implementation, if any.
+ * <p>
+ *
+ * The following describes the behavior of the proxy when invoking RPC methods:
+ * <ul>
+ * <li>If an actual implementation is registered with the MD-SAL, all invocations are
+ * forwarded to the registered implementation.</li>
+ * <li>If no actual implementation is registered, all invocations will fail by
+ * throwing {@link IllegalStateException}.</li>
+ * <li>Prior to invoking the actual implementation, the method arguments are are validated.
+ * If any are invalid, an {@link IllegalArgumentException} is thrown.
+ * </ul>
+ *
+ * The returned proxy is automatically updated with the most recent
+ * registered implementation.
+ * <p>
+ * The generated RPC method APIs require implementors to return a {@link java.util.concurrent.Future Future}
+ * instance that wraps the {@link org.opendaylight.yangtools.yang.common.RpcResult RpcResult}. Since
+ * RPC methods may be implemented asynchronously, callers should avoid blocking on the
+ * {@link java.util.concurrent.Future Future} result. Instead, it is recommended to use
+ * {@link com.google.common.util.concurrent.JdkFutureAdapters#listenInPoolThread(java.util.concurrent.Future)}
+ * or {@link com.google.common.util.concurrent.JdkFutureAdapters#listenInPoolThread(java.util.concurrent.Future, java.util.concurrent.Executor)}
+ * to listen for Rpc Result. This will asynchronously listen for future result in executor and
+ * will not block current thread.
+ *
+ * <pre>
+ * final Future<RpcResult<SomeRpcOutput>> future = someRpcService.someRpc( ... );
+ * Futures.addCallback(JdkFutureAdapters.listenInThreadPool(future), new FutureCallback<RpcResult<SomeRpcOutput>>() {
+ *
+ * public void onSuccess(RpcResult<SomeRpcOutput> result) {
+ * // process result ...
+ * }
+ *
+ * public void onFailure(Throwable t) {
+ * // RPC failed
+ * }
+ * );
+ * </pre>
+ * @param serviceInterface the interface of the RPC Service. Typically this is an interface generated
+ * from a YANG model.
+ * @return the proxy for the requested RPC service. This method never returns null.
*/
- <T extends RpcService> T getRpcService(Class<T> module);
+ <T extends RpcService> T getRpcService(Class<T> serviceInterface);
}
import org.opendaylight.yangtools.yang.binding.RpcService;
/**
- * Interface defining provider's access to the Rpc Registry which could be used
- * to register their implementations of service to the MD-SAL.
+ * Provides a registry for Remote Procedure Call (RPC) service implementations. The RPCs are
+ * defined in YANG models.
+ * <p>
+ * There are 2 types of RPCs:
+ * <ul>
+ * <li>Global</li>
+ * <li>Routed</li>
+ * </ul>
*
- * @author ttkacik
+ * <h2>Global RPC</h2>
+ * <p>
+ * An RPC is global if there is intended to be only 1 registered implementation. A global RPC is not
+ * explicitly declared as such, essentially any RPC that is not defined to be routed is considered global.
+ * <p>
+ * Global RPCs are registered using the
+ * {@link #addRpcImplementation(Class, RpcService)} method.
*
+ * <h2>Routed RPC</h2>
+ * <p>
+ * MD-SAL supports routing of RPC between multiple implementations where the appropriate
+ * implementation is selected at run time based on the content of the RPC message as described in
+ * YANG model.
+ * <p>
+ * RPC routing is based on:
+ * <ul>
+ * <li><b>Route identifier</b> -
+ * An {@link org.opendaylight.yangtools.yang.binding.InstanceIdentifier InstanceIdentifier} value
+ * which is part of the RPC input. This value is used to select the correct
+ * implementation at run time.</li>
+ * <li><b>Context Type</b> - A YANG-defined construct which constrains the subset of
+ * valid route identifiers for a particular RPC.</li>
+ * </ul>
+ *
+ * <h3>Context type</h3>
+ * <p>
+ * A context type is modeled in YANG using a combination of a YANG <code>identity</code>
+ * and Opendaylight specific extensions from <code>yang-ext</code> module. These extensions are:
+ * <ul>
+ * <li><b>context-instance</b> - This is used in the data tree part of a YANG model to
+ * define a context type that associates nodes with a specified context <code>identity</code>.
+ * Instance identifiers that reference these nodes are valid route identifiers for RPCs that
+ * reference this context type.</li>
+ * <li><b>context-reference</b> - This is used in RPC input to mark a leaf of type
+ * <code>instance-identifier</code> as a reference to the particular context type defined by the
+ * specified context <code>identity</code>. The value of this
+ * leaf is used by the RPC broker at run time to route the RPC request to the correct implementation.
+ * Note that <code>context-reference</code> may only be used on leaf elements of type
+ * <code>instance-identifier</code> or a type derived from <code>instance-identifier</code>.</li>
+ * </ul>
+ *
+ *
+ * <h3>Routed RPC example</h3>
+ * <p>
+ * <h5>1. Defining a Context Type</h5>
+ * <p>
+ * The following snippet declares a simple YANG <code>identity</code> named <code>example-context</code>:
+ *
+ * <pre>
+ * module example {
+ * ...
+ * identity example-context {
+ * description "Identity used to define an example-context type";
+ * }
+ * ...
+ * }
+ * </pre>
+ * <p>
+ * We then use the declared identity to define a context type by using it in combination
+ * with the <code>context-instance</code> YANG extension. We'll associate the context type
+ * with a list element in the data tree. This defines the set of nodes whose instance
+ * identifiers are valid for the <code>example-context</code> context type.
+ * <p>
+ * The following YANG snippet imports the <code>yang-ext</code> module and defines the list
+ * element named <code>item</code> inside a container named <code>foo</code>:
+ *
+ * <pre>
+ * module foo {
+ * ...
+ * import yang-ext {prefix ext;}
+ * ...
+ * container foo {
+ * list item {
+ * key "id";
+ * leaf id {type string;}
+ * ext:context-instance "example-context";
+ * }
+ * }
+ * ...
+ * }
+ * </pre>
+ * <p>
+ * The statement <code>ext:context-instance "example-context";</code> inside the list element
+ * declares that any instance identifier referencing <code>item</code> in the data
+ * tree is valid for <code>example-context</code>. For example, the following instance
+ * identifier:
+ * <pre>
+ * InstanceIdentifier.create(Foo.class).child(Item.class,new ItemKey("Foo"))
+ * </pre>
+ * is valid for <code>example-context</code>. However the following:
+ * <pre>
+ * InstanceIdentifier.create(Example.class)
+ * </pre>
+ * is not valid.
+ * <p>
+ * So using an <code>identity</code> in combination with <code>context-instance</code> we
+ * have effectively defined a context type that can be referenced in a YANG RPC input.
+ *
+ * <h5>2. Defining an RPC to use the Context Type</h5>
+ * <p>
+ * To define an RPC to be routed based on the context type we need to add an input leaf element
+ * that references the context type which will hold an instance identifier value to be
+ * used to route the RPC.
+ * <p>
+ * The following snippet defines an RPC named <code>show-item</code> with 2 leaf elements
+ * as input: <code>item</code> of type <code>instance-identifier</code> and <code>description</code>:
+ *
+ * <pre>
+ * module foo {
+ * ...
+ * import yang-ext {prefix ext;}
+ * ...
+ * rpc show-item {
+ * input {
+ * leaf item {
+ * type instance-identifier;
+ * ext:context-reference example-context;
+ * }
+ * leaf description {
+ * type "string";
+ * }
+ * }
+ * }
+ * }
+ * </pre>
+ * <p>
+ * We mark the <code>item</code> leaf with a <code>context-reference</code> statement that
+ * references the <code>example-context</code> context type. RPC calls will then be routed
+ * based on the instance identifier value contained in <code>item</code>. Only instance
+ * identifiers that point to a <code>foo/item</code> node are valid as input.
+ * <p>
+ * The generated RPC Service interface for the module is:
+ *
+ * <pre>
+ * interface FooService implements RpcService {
+ * Future<RpcResult<Void>> showItem(ShowItemInput input);
+ * }
+ * </pre>
+ * <p>
+ * For constructing the RPC input, there are generated classes ShowItemInput and ShowItemInputBuilder.
+ *
+ * <h5>3. Registering a routed RPC implementation</h5>
+ * <p>
+ * To register a routed implementation for the <code>show-item</code> RPC, we must use the
+ * {@link #addRoutedRpcImplementation(Class, RpcService)} method. This
+ * will return a {@link RoutedRpcRegistration} instance which can then be used to register /
+ * unregister routed paths associated with the registered implementation.
+ * <p>
+ * The following snippet registers <code>myImpl</code> as the RPC implementation for an
+ * <code>item</code> with key <code>"foo"</code>:
+ * <pre>
+ * // Create the instance identifier path for item "foo"
+ * InstanceIdentifier path = InstanceIdentifier.create(Foo.class).child(Item.class, new ItemKey("foo"));
+ *
+ * // Register myImpl as the implementation for the FooService RPC interface
+ * RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(FooService.class, myImpl);
+ *
+ * // Now register for the context type and specific path ID. The context type is specified by the
+ * // YANG-generated class for the example-context identity.
+ * reg.registerPath(ExampleContext.class, path);
+ * </pre>
+ * <p>
+ * It is also possible to register the same implementation for multiple paths:
+ *
+ * <pre>
+ * InstanceIdentifier one = InstanceIdentifier.create(Foo.class).child(Item.class, new ItemKey("One"));
+ * InstanceIdentifier two = InstanceIdentifier.create(Foo.class).child(Item.class, new ItemKey("Two"));
+ *
+ * RoutedRpcRegistration reg = rpcRegistry.addRoutedRpcImplementation(FooService.class, myImpl);
+ * reg.registerPath(ExampleContext.class, one);
+ * reg.registerPath(ExampleContext.class, two);
+ * </pre>
+ *
+ * <p>
+ * When another client invokes the <code>showItem(ShowItemInput)</code> method on the proxy instance
+ * retrieved via {@link RpcConsumerRegistry#getRpcService(Class)}, the proxy will inspect the
+ * arguments in ShowItemInput, extract the InstanceIdentifier value of the <code>item</code> leaf and select
+ * the implementation whose registered path matches the InstanceIdentifier value of the <code>item</code> leaf.
+ *
+ * <h2>Notes for RPC Implementations</h2>
+ *
+ * <h3>RpcResult</h3>
+ * <p>
+ * The generated interfaces require implementors to return
+ * {@link java.util.concurrent.Future Future}<{@link org.opendaylight.yangtools.yang.common.RpcResult RpcResult}<{RpcName}Output>> instances.
+ *
+ * Implementations should do processing of RPC calls asynchronously and update the
+ * returned {@link java.util.concurrent.Future Future} instance when processing is complete.
+ * However using {@link com.google.common.util.concurrent.Futures#immediateFuture(Object) Futures.immediateFuture}
+ * is valid only if the result is immediately available and asynchronous processing is unnecessary and
+ * would only introduce additional complexity.
+ *
+ * <p>
+ * The {@link org.opendaylight.yangtools.yang.common.RpcResult RpcResult} is a generic
+ * wrapper for the RPC output payload, if any, and also allows for attaching error or
+ * warning information (possibly along with the payload) should the RPC processing partially
+ * or completely fail. This is intended to provide additional human readable information
+ * for users of the API and to transfer warning / error information across the system
+ * so it may be visible via other external APIs such as Restconf.
+ * <p>
+ * It is recommended to use the {@link org.opendaylight.yangtools.yang.common.RpcResult RpcResult}
+ * for conveying appropriate error information
+ * on failure rather than purposely throwing unchecked exceptions if at all possible.
+ * While unchecked exceptions will fail the returned {@link java.util.concurrent.Future Future},
+ * using the intended RpcResult to convey the error information is more user-friendly.
*/
public interface RpcProviderRegistry extends //
RpcConsumerRegistry, //
RouteChangePublisher<RpcContextIdentifier, InstanceIdentifier<?>> {
/**
- * Registers a global RpcService implementation.
+ * Registers a global implementation of the provided RPC service interface.
+ * All methods of the interface are required to be implemented.
+ *
+ * @param serviceInterface the YANG-generated interface of the RPC Service for which to register.
+ * @param implementation "the implementation of the RPC service interface.
+ * @return an RpcRegistration instance that should be used to unregister the RPC implementation
+ * when no longer needed by calling {@link RpcRegistration#close()}.
*
- * @param type
- * @param implementation
- * @return
+ * @throws IllegalStateException
+ * if the supplied RPC interface is a routed RPC type.
*/
- <T extends RpcService> RpcRegistration<T> addRpcImplementation(Class<T> type, T implementation)
+ <T extends RpcService> RpcRegistration<T> addRpcImplementation(Class<T> serviceInterface, T implementation)
throws IllegalStateException;
/**
+ * Registers an implementation of the given routed RPC service interface.
+ * <p>
+ * See the {@link RpcProviderRegistry class} documentation for information and example on
+ * how to use routed RPCs.
*
- * Register a Routed RpcService where routing is determined on annotated
- * (in YANG model) context-reference and value of annotated leaf.
- *
- * @param type
- * Type of RpcService, use generated interface class, not your
- * implementation class
- * @param implementation
- * Implementation of RpcService
- * @return Registration object for routed Rpc which could be used to unregister
+ * @param serviceInterface the YANG-generated interface of the RPC Service for which to register.
+ * @param implementation the implementation instance to register.
+ * @return a RoutedRpcRegistration instance which can be used to register paths for the RPC
+ * implementation via invoking {@link RoutedRpcRegistration#registerPath(....).
+ * {@link RoutedRpcRegistration#close()} should be called to unregister the implementation
+ * and all previously registered paths when no longer needed.
*
* @throws IllegalStateException
+ * if the supplied RPC interface is not a routed RPC type.
*/
- <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> type, T implementation)
+ <T extends RpcService> RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> serviceInterface,
+ T implementation)
throws IllegalStateException;
}
*/
package org.opendaylight.controller.md.sal.binding.impl;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
public abstract class AbstractForwardedDataBroker implements Delegator<DOMDataBroker>, DomForwardedBroker,
SchemaContextListener, AutoCloseable {
protected Map<InstanceIdentifier<?>, DataObject> toBinding(
final Map<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, ? extends NormalizedNode<?, ?>> normalized) {
Map<InstanceIdentifier<?>, DataObject> newMap = new HashMap<>();
- for (Map.Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, ? extends NormalizedNode<?, ?>> entry : normalized
- .entrySet()) {
+
+ for (Map.Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, ? extends NormalizedNode<?, ?>> entry : sortedEntries(normalized)) {
try {
Optional<Entry<InstanceIdentifier<? extends DataObject>, DataObject>> potential = getCodec().toBinding(
entry);
return newMap;
}
+ private static <T> Iterable<Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier,T>> sortedEntries(final Map<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, T> map) {
+ ArrayList<Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, T>> entries = new ArrayList<>(map.entrySet());
+ Collections.sort(entries, new Comparator<Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, T>>() {
+
+ @Override
+ public int compare(final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, T> left,
+ final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, T> right) {
+ int leftSize = Iterables.size(left.getKey().getPathArguments());
+ int rightSize = Iterables.size(right.getKey().getPathArguments());
+ return Integer.compare(leftSize, rightSize);
+ }
+ });
+ return entries;
+ }
+
protected Set<InstanceIdentifier<?>> toBinding(
final Set<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> normalized) {
Set<InstanceIdentifier<?>> hashSet = new HashSet<>();
*/
package org.opendaylight.controller.md.sal.binding.impl;
-import java.util.ArrayList;
-import java.util.EnumMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
-import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
-import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.yangtools.concepts.Delegator;
+import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-public class AbstractForwardedTransaction<T extends AsyncTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>>>
- implements Delegator<T> {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractForwardedTransaction.class);
+abstract class AbstractForwardedTransaction<T extends AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>>>
+ implements Delegator<T>, Identifiable<Object> {
+
private final T delegate;
- private final static CacheBuilder<Object, Object> CACHE_BUILDER = CacheBuilder.newBuilder()
- .expireAfterWrite(10, TimeUnit.MILLISECONDS).maximumSize(100);
private final BindingToNormalizedNodeCodec codec;
- private final EnumMap<LogicalDatastoreType, Cache<InstanceIdentifier<?>, DataObject>> cacheMap;
- protected AbstractForwardedTransaction(final T delegate, final BindingToNormalizedNodeCodec codec) {
- super();
- this.delegate = delegate;
- this.codec = codec;
+ public AbstractForwardedTransaction(final T delegateTx, final BindingToNormalizedNodeCodec codec) {
+ this.delegate = Preconditions.checkNotNull(delegateTx, "Delegate must not be null");
+ this.codec = Preconditions.checkNotNull(codec, "Codec must not be null");
+ }
- this.cacheMap = new EnumMap<>(LogicalDatastoreType.class);
- cacheMap.put(LogicalDatastoreType.OPERATIONAL, CACHE_BUILDER.<InstanceIdentifier<?>, DataObject> build());
- cacheMap.put(LogicalDatastoreType.CONFIGURATION, CACHE_BUILDER.<InstanceIdentifier<?>, DataObject> build());
+ @Override
+ public final Object getIdentifier() {
+ return delegate.getIdentifier();
}
@Override
- public T getDelegate() {
+ public final T getDelegate() {
return delegate;
}
- protected final BindingToNormalizedNodeCodec getCodec() {
- return codec;
- }
-
- protected ListenableFuture<Optional<DataObject>> transformFuture(final LogicalDatastoreType store,
- final InstanceIdentifier<?> path, final ListenableFuture<Optional<NormalizedNode<?, ?>>> future) {
- return Futures.transform(future, new Function<Optional<NormalizedNode<?, ?>>, Optional<DataObject>>() {
- @Nullable
- @Override
- public Optional<DataObject> apply(@Nullable final Optional<NormalizedNode<?, ?>> normalizedNode) {
- if (normalizedNode.isPresent()) {
- final DataObject dataObject;
- try {
- dataObject = codec.toBinding(path, normalizedNode.get());
- } catch (DeserializationException e) {
- LOG.warn("Failed to create dataobject from node {}", normalizedNode.get(), e);
- throw new IllegalStateException("Failed to create dataobject", e);
- }
-
- if (dataObject != null) {
- updateCache(store, path, dataObject);
- return Optional.of(dataObject);
- }
- }
- return Optional.absent();
- }
- });
- }
-
- protected void doPut(final DOMDataWriteTransaction writeTransaction, final LogicalDatastoreType store,
- final InstanceIdentifier<?> path, final DataObject data) {
- invalidateCache(store, path);
- final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = codec
- .toNormalizedNode(path, data);
- writeTransaction.put(store, normalized.getKey(), normalized.getValue());
- }
-
- protected void doPutWithEnsureParents(final DOMDataReadWriteTransaction writeTransaction,
- final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
- invalidateCache(store, path);
- final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = codec
- .toNormalizedNode(path, data);
-
- final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath = normalized.getKey();
- ensureParentsByMerge(writeTransaction, store, normalizedPath, path);
- LOG.debug("Tx: {} : Putting data {}", getDelegate().getIdentifier(), normalizedPath);
- writeTransaction.put(store, normalizedPath, normalized.getValue());
- }
-
- protected void doMergeWithEnsureParents(final DOMDataReadWriteTransaction writeTransaction,
- final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
- invalidateCache(store, path);
- final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = codec
- .toNormalizedNode(path, data);
-
- final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath = normalized.getKey();
- ensureParentsByMerge(writeTransaction, store, normalizedPath, path);
- LOG.debug("Tx: {} : Merge data {}",getDelegate().getIdentifier(),normalizedPath);
- writeTransaction.merge(store, normalizedPath, normalized.getValue());
+ @SuppressWarnings("unchecked")
+ protected final <S extends AsyncTransaction<InstanceIdentifier, NormalizedNode<?, ?>>> S getDelegateChecked(final Class<S> txType) {
+ Preconditions.checkState(txType.isInstance(delegate));
+ return (S) delegate;
}
- private void ensureParentsByMerge(final DOMDataReadWriteTransaction writeTransaction,
- final LogicalDatastoreType store,
- final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath,
- final InstanceIdentifier<?> path) {
- List<PathArgument> currentArguments = new ArrayList<>();
- DataNormalizationOperation<?> currentOp = codec.getDataNormalizer().getRootOperation();
- Iterator<PathArgument> iterator = normalizedPath.getPath().iterator();
- while (iterator.hasNext()) {
- PathArgument currentArg = iterator.next();
- try {
- currentOp = currentOp.getChild(currentArg);
- } catch (DataNormalizationException e) {
- throw new IllegalArgumentException(String.format("Invalid child encountered in path %s", path), e);
- }
- currentArguments.add(currentArg);
- org.opendaylight.yangtools.yang.data.api.InstanceIdentifier currentPath = new org.opendaylight.yangtools.yang.data.api.InstanceIdentifier(
- currentArguments);
-
- final Optional<NormalizedNode<?, ?>> d;
- try {
- d = writeTransaction.read(store, currentPath).get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to read pre-existing data from store {} path {}", store, currentPath, e);
- throw new IllegalStateException("Failed to read pre-existing data", e);
- }
-
- if (!d.isPresent() && iterator.hasNext()) {
- writeTransaction.merge(store, currentPath, currentOp.createDefault(currentArg));
- }
- }
- }
-
- protected void doMerge(final DOMDataWriteTransaction writeTransaction, final LogicalDatastoreType store,
- final InstanceIdentifier<?> path, final DataObject data) {
- invalidateCache(store, path);
- final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = codec
- .toNormalizedNode(path, data);
- writeTransaction.merge(store, normalized.getKey(), normalized.getValue());
- }
-
- protected void doDelete(final DOMDataWriteTransaction writeTransaction, final LogicalDatastoreType store,
- final InstanceIdentifier<?> path) {
- invalidateCache(store, path);
- final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized = codec.toNormalized(path);
- writeTransaction.delete(store, normalized);
- }
-
- protected ListenableFuture<RpcResult<TransactionStatus>> doCommit(final DOMDataWriteTransaction writeTransaction) {
- return writeTransaction.commit();
- }
-
- protected boolean doCancel(final DOMDataWriteTransaction writeTransaction) {
- return writeTransaction.cancel();
- }
-
- protected ListenableFuture<Optional<DataObject>> doRead(final DOMDataReadTransaction readTransaction,
- final LogicalDatastoreType store, final InstanceIdentifier<?> path) {
- final DataObject dataObject = getFromCache(store, path);
- if (dataObject == null) {
- final ListenableFuture<Optional<NormalizedNode<?, ?>>> future = readTransaction.read(store,
- codec.toNormalized(path));
- return transformFuture(store, path, future);
- } else {
- return Futures.immediateFuture(Optional.of(dataObject));
- }
- }
-
- private DataObject getFromCache(final LogicalDatastoreType store, final InstanceIdentifier<?> path) {
- Cache<InstanceIdentifier<?>, DataObject> cache = cacheMap.get(store);
- if (cache != null) {
- return cache.getIfPresent(path);
- }
- return null;
- }
-
- private void updateCache(final LogicalDatastoreType store, final InstanceIdentifier<?> path,
- final DataObject dataObject) {
- // Check if cache exists. If not create one.
- Cache<InstanceIdentifier<?>, DataObject> cache = cacheMap.get(store);
- if (cache == null) {
- cache = CacheBuilder.newBuilder().maximumSize(1000).expireAfterWrite(1, TimeUnit.MINUTES).build();
-
- }
-
- cache.put(path, dataObject);
+ protected final BindingToNormalizedNodeCodec getCodec() {
+ return codec;
}
- private void invalidateCache(final LogicalDatastoreType store, final InstanceIdentifier<?> path) {
- // FIXME: Optimization: invalidate only parents and children of path
- Cache<InstanceIdentifier<?>, DataObject> cache = cacheMap.get(store);
- cache.invalidateAll();
- LOG.trace("Cache invalidated");
+ protected final ListenableFuture<Optional<DataObject>> doRead(final DOMDataReadTransaction readTx,
+ final LogicalDatastoreType store, final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<?> path) {
+ return Futures.transform(readTx.read(store, codec.toNormalized(path)), codec.deserializeFunction(path));
}
-
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
+import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+public class AbstractReadWriteTransaction extends AbstractWriteTransaction<DOMDataReadWriteTransaction> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractReadWriteTransaction.class);
+
+ public AbstractReadWriteTransaction(final DOMDataReadWriteTransaction delegate, final BindingToNormalizedNodeCodec codec) {
+ super(delegate, codec);
+ }
+
+ protected final void doPutWithEnsureParents(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+ final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
+ .toNormalizedNode(path, data);
+
+ final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath = normalized.getKey();
+ ensureParentsByMerge(store, normalizedPath, path);
+ LOG.debug("Tx: {} : Putting data {}", getDelegate().getIdentifier(), normalizedPath);
+ doPut(store, path, data);
+ }
+
+ protected final void doMergeWithEnsureParents(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+ final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
+ .toNormalizedNode(path, data);
+
+ final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath = normalized.getKey();
+ ensureParentsByMerge(store, normalizedPath, path);
+ LOG.debug("Tx: {} : Merge data {}", getDelegate().getIdentifier(), normalizedPath);
+ doMerge(store, path, data);
+ }
+
+ private final void ensureParentsByMerge(final LogicalDatastoreType store,
+ final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalizedPath,
+ final InstanceIdentifier<?> path) {
+ List<PathArgument> currentArguments = new ArrayList<>();
+ DataNormalizationOperation<?> currentOp = getCodec().getDataNormalizer().getRootOperation();
+ Iterator<PathArgument> iterator = normalizedPath.getPathArguments().iterator();
+ while (iterator.hasNext()) {
+ PathArgument currentArg = iterator.next();
+ try {
+ currentOp = currentOp.getChild(currentArg);
+ } catch (DataNormalizationException e) {
+ throw new IllegalArgumentException(String.format("Invalid child encountered in path %s", path), e);
+ }
+ currentArguments.add(currentArg);
+ org.opendaylight.yangtools.yang.data.api.InstanceIdentifier currentPath = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.create(
+ currentArguments);
+
+ final Optional<NormalizedNode<?, ?>> d;
+ try {
+ d = getDelegate().read(store, currentPath).get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Failed to read pre-existing data from store {} path {}", store, currentPath, e);
+ throw new IllegalStateException("Failed to read pre-existing data", e);
+ }
+
+ if (!d.isPresent() && iterator.hasNext()) {
+ getDelegate().merge(store, currentPath, currentOp.createDefault(currentArg));
+ }
+ }
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import java.util.Map.Entry;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ *
+ * Abstract Base Transaction for transactions which are backed by
+ * {@link DOMDataWriteTransaction}
+ */
+public class AbstractWriteTransaction<T extends DOMDataWriteTransaction> extends
+ AbstractForwardedTransaction<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteTransaction.class);
+
+ protected AbstractWriteTransaction(final T delegate,
+ final BindingToNormalizedNodeCodec codec) {
+ super(delegate, codec);
+ }
+
+ protected final void doPut(final LogicalDatastoreType store,
+ final InstanceIdentifier<?> path, final DataObject data) {
+ final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
+ .toNormalizedNode(path, data);
+ getDelegate().put(store, normalized.getKey(), normalized.getValue());
+ }
+
+ protected final void doMerge(final LogicalDatastoreType store,
+ final InstanceIdentifier<?> path, final DataObject data) {
+ final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> normalized = getCodec()
+ .toNormalizedNode(path, data);
+ getDelegate().merge(store, normalized.getKey(), normalized.getValue());
+ }
+
+ protected final void doDelete(final LogicalDatastoreType store,
+ final InstanceIdentifier<?> path) {
+ final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized = getCodec().toNormalized(path);
+ getDelegate().delete(store, normalized);
+ }
+
+ protected final ListenableFuture<RpcResult<TransactionStatus>> doCommit() {
+ return getDelegate().commit();
+ }
+
+ protected final boolean doCancel() {
+ return getDelegate().cancel();
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+
+class BindingDataReadTransactionImpl extends AbstractForwardedTransaction<DOMDataReadOnlyTransaction> implements
+ ReadOnlyTransaction {
+
+ protected BindingDataReadTransactionImpl(final DOMDataReadOnlyTransaction delegate,
+ final BindingToNormalizedNodeCodec codec) {
+ super(delegate, codec);
+ }
+
+ @Override
+ public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
+ final InstanceIdentifier<?> path) {
+ return doRead(getDelegate(),store, path);
+ }
+
+ @Override
+ public void close() {
+ getDelegate().close();
+ }
+
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+
+class BindingDataReadWriteTransactionImpl extends
+ BindingDataWriteTransactionImpl<DOMDataReadWriteTransaction> implements ReadWriteTransaction {
+
+ protected BindingDataReadWriteTransactionImpl(final DOMDataReadWriteTransaction delegate,
+ final BindingToNormalizedNodeCodec codec) {
+ super(delegate, codec);
+ }
+
+ @Override
+ public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
+ final InstanceIdentifier<?> path) {
+ return doRead(getDelegate(), store, path);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+class BindingDataWriteTransactionImpl<T extends DOMDataWriteTransaction> extends
+ AbstractWriteTransaction<T> implements WriteTransaction {
+
+ protected BindingDataWriteTransactionImpl(final T delegateTx, final BindingToNormalizedNodeCodec codec) {
+ super(delegateTx, codec);
+ }
+
+
+
+ @Override
+ public void put(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+ doPut(store, path, data);
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
+ doMerge(store, path, data);
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final InstanceIdentifier<?> path) {
+ doDelete( store, path);
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ return doCommit();
+ }
+
+ @Override
+ public boolean cancel() {
+ return doCancel();
+ }
+}
\ No newline at end of file
import java.util.List;
import java.util.Map.Entry;
+import javax.annotation.Nullable;
+
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationException;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizationOperation;
import org.opendaylight.controller.md.sal.common.impl.util.compat.DataNormalizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
public Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> toNormalizedNode(
final InstanceIdentifier<? extends DataObject> bindingPath, final DataObject bindingObject) {
- return toNormalizedNode(toEntry(bindingPath, bindingObject));
+ return toNormalizedNode(toBindingEntry(bindingPath, bindingObject));
}
.toNormalized(legacyEntry);
LOG.trace("Serialization of {}, Legacy Representation: {}, Normalized Representation: {}", binding,
legacyEntry, normalizedEntry);
- if (Augmentation.class.isAssignableFrom(binding.getKey().getTargetType())) {
+ if (isAugmentation(binding.getKey().getTargetType())) {
for (DataContainerChild<? extends PathArgument, ?> child : ((DataContainerNode<?>) normalizedEntry
.getValue()).getValue()) {
if (child instanceof AugmentationNode) {
ImmutableList<PathArgument> childArgs = ImmutableList.<PathArgument> builder()
- .addAll(normalizedEntry.getKey().getPath()).add(child.getIdentifier()).build();
- org.opendaylight.yangtools.yang.data.api.InstanceIdentifier childPath = new org.opendaylight.yangtools.yang.data.api.InstanceIdentifier(
+ .addAll(normalizedEntry.getKey().getPathArguments()).add(child.getIdentifier()).build();
+ org.opendaylight.yangtools.yang.data.api.InstanceIdentifier childPath = org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.create(
childArgs);
- return new SimpleEntry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>>(
- childPath, child);
+ return toDOMEntry(childPath, child);
}
}
final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
throws DeserializationException {
- PathArgument lastArgument = Iterables.getLast(normalized.getPath());
+ PathArgument lastArgument = Iterables.getLast(normalized.getPathArguments());
// Used instance-identifier codec do not support serialization of last
// path
// argument if it is AugmentationIdentifier (behaviour expected by old
}
int normalizedCount = getAugmentationCount(normalized);
- AugmentationIdentifier lastArgument = (AugmentationIdentifier) Iterables.getLast(normalized.getPath());
+ AugmentationIdentifier lastArgument = (AugmentationIdentifier) Iterables.getLast(normalized.getPathArguments());
// Here we employ small trick - Binding-aware Codec injects an pointer
// to augmentation class
LOG.trace("Looking for candidates to match {}", normalized);
for (QName child : lastArgument.getPossibleChildNames()) {
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier childPath = new org.opendaylight.yangtools.yang.data.api.InstanceIdentifier(
- ImmutableList.<PathArgument> builder().addAll(normalized.getPath()).add(new NodeIdentifier(child))
+ ImmutableList.<PathArgument> builder().addAll(normalized.getPathArguments()).add(new NodeIdentifier(child))
.build());
try {
if (isNotRepresentable(childPath)) {
final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized)
throws DataNormalizationException {
DataNormalizationOperation<?> current = legacyToNormalized.getRootOperation();
- for (PathArgument arg : normalized.getPath()) {
+ for (PathArgument arg : normalized.getPathArguments()) {
current = current.getChild(arg);
}
return current;
}
- private static final Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject> toEntry(
+ private static final Entry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject> toBindingEntry(
final org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject> key,
final DataObject value) {
return new SimpleEntry<org.opendaylight.yangtools.yang.binding.InstanceIdentifier<? extends DataObject>, DataObject>(
key, value);
}
+ private static final Entry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>> toDOMEntry(
+ final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier key,
+ final NormalizedNode<?, ?> value) {
+ return new SimpleEntry<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>>(
+ key, value);
+ }
+
public DataObject toBinding(final InstanceIdentifier<?> path, final NormalizedNode<?, ?> normalizedNode)
throws DeserializationException {
CompositeNode legacy = null;
if (bindingData == null) {
LOG.warn("Failed to deserialize {} to Binding format. Binding path is: {}", normalized, bindingPath);
}
- return Optional.of(toEntry(bindingPath, bindingData));
+ return Optional.of(toBindingEntry(bindingPath, bindingData));
} else {
return Optional.absent();
}
final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier normalized) {
int position = 0;
int foundPosition = -1;
- for (PathArgument arg : normalized.getPath()) {
+ for (PathArgument arg : normalized.getPathArguments()) {
position++;
if (arg instanceof AugmentationIdentifier) {
foundPosition = position;
}
}
if (foundPosition > 0) {
- return new org.opendaylight.yangtools.yang.data.api.InstanceIdentifier(normalized.getPath().subList(0,
- foundPosition));
+ Iterable<PathArgument> shortened = Iterables.limit(normalized.getPathArguments(), foundPosition);
+ return org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.create(shortened);
}
return null;
}
}
private boolean isAugmentationIdentifier(final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier processed) {
- return Iterables.getLast(processed.getPath()) instanceof AugmentationIdentifier;
+ return Iterables.getLast(processed.getPathArguments()) instanceof AugmentationIdentifier;
}
private static int getAugmentationCount(final InstanceIdentifier<?> potential) {
private static int getAugmentationCount(final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier potential) {
int count = 0;
- for(PathArgument arg : potential.getPath()) {
+ for(PathArgument arg : potential.getPathArguments()) {
if(arg instanceof AugmentationIdentifier) {
count++;
}
}
return count;
}
+
+ public Function<Optional<NormalizedNode<?, ?>>, Optional<DataObject>> deserializeFunction(final InstanceIdentifier<?> path) {
+ return new DeserializeFunction(this, path);
+ }
+
+ private static class DeserializeFunction implements Function<Optional<NormalizedNode<?, ?>>, Optional<DataObject>> {
+
+ private final BindingToNormalizedNodeCodec codec;
+ private final InstanceIdentifier<?> path;
+
+ public DeserializeFunction(final BindingToNormalizedNodeCodec codec, final InstanceIdentifier<?> path) {
+ super();
+ this.codec = Preconditions.checkNotNull(codec, "Codec must not be null");
+ this.path = Preconditions.checkNotNull(path, "Path must not be null");
+ }
+
+ @Nullable
+ @Override
+ public Optional<DataObject> apply(@Nullable final Optional<NormalizedNode<?, ?>> normalizedNode) {
+ if (normalizedNode.isPresent()) {
+ final DataObject dataObject;
+ try {
+ dataObject = codec.toBinding(path, normalizedNode.get());
+ } catch (DeserializationException e) {
+ LOG.warn("Failed to create dataobject from node {}", normalizedNode.get(), e);
+ throw new IllegalStateException("Failed to create dataobject", e);
+ }
+
+ if (dataObject != null) {
+ return Optional.of(dataObject);
+ }
+ }
+ return Optional.absent();
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.binding.impl;
+
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.yangtools.concepts.Delegator;
+
+import com.google.common.base.Preconditions;
+
+class BindingTranslatedTransactionChain implements BindingTransactionChain, Delegator<DOMTransactionChain> {
+
+ private final DOMTransactionChain delegate;
+
+ @GuardedBy("this")
+ private final Map<AsyncTransaction<?, ?>, AsyncTransaction<?, ?>> delegateTxToBindingTx = new WeakHashMap<>();
+ private final BindingToNormalizedNodeCodec codec;
+
+ public BindingTranslatedTransactionChain(final DOMDataBroker chainFactory,
+ final BindingToNormalizedNodeCodec codec, final TransactionChainListener listener) {
+ Preconditions.checkNotNull(chainFactory, "DOM Transaction chain factory must not be null");
+ this.delegate = chainFactory.createTransactionChain(new ListenerInvoker(listener));
+ this.codec = codec;
+ }
+
+ @Override
+ public DOMTransactionChain getDelegate() {
+ return delegate;
+ }
+
+ @Override
+ public ReadOnlyTransaction newReadOnlyTransaction() {
+ DOMDataReadOnlyTransaction delegateTx = delegate.newReadOnlyTransaction();
+ ReadOnlyTransaction bindingTx = new BindingDataReadTransactionImpl(delegateTx, codec);
+ putDelegateToBinding(delegateTx, bindingTx);
+ return bindingTx;
+ }
+
+ @Override
+ public ReadWriteTransaction newReadWriteTransaction() {
+ DOMDataReadWriteTransaction delegateTx = delegate.newReadWriteTransaction();
+ ReadWriteTransaction bindingTx = new BindingDataReadWriteTransactionImpl(delegateTx, codec);
+ putDelegateToBinding(delegateTx, bindingTx);
+ return bindingTx;
+ }
+
+ @Override
+ public WriteTransaction newWriteOnlyTransaction() {
+ DOMDataWriteTransaction delegateTx = delegate.newWriteOnlyTransaction();
+ WriteTransaction bindingTx = new BindingDataWriteTransactionImpl<>(delegateTx, codec);
+ putDelegateToBinding(delegateTx, bindingTx);
+ return bindingTx;
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ }
+
+ private synchronized void putDelegateToBinding(final AsyncTransaction<?, ?> domTx,
+ final AsyncTransaction<?, ?> bindingTx) {
+ final Object previous = delegateTxToBindingTx.put(domTx, bindingTx);
+ Preconditions.checkState(previous == null, "DOM Transaction %s has already associated binding transation %s",domTx,previous);
+ }
+
+ private synchronized AsyncTransaction<?, ?> getBindingTransaction(final AsyncTransaction<?, ?> transaction) {
+ return delegateTxToBindingTx.get(transaction);
+ }
+
+ private final class ListenerInvoker implements TransactionChainListener {
+
+ private final TransactionChainListener listener;
+
+ public ListenerInvoker(final TransactionChainListener listener) {
+ this.listener = Preconditions.checkNotNull(listener, "Listener must not be null.");
+ }
+
+ @Override
+ public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
+ final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+ Preconditions.checkState(delegate.equals(chain),
+ "Illegal state - listener for %s was invoked for incorrect chain %s.", delegate, chain);
+ AsyncTransaction<?, ?> bindingTx = getBindingTransaction(transaction);
+ listener.onTransactionChainFailed(chain, bindingTx, cause);
+ }
+
+ @Override
+ public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
+ Preconditions.checkState(delegate.equals(chain),
+ "Illegal state - listener for %s was invoked for incorrect chain %s.", delegate, chain);
+ listener.onTransactionChainSuccessful(BindingTranslatedTransactionChain.this);
+ }
+ }
+
+}
}
private class ForwardedBackwardsCompatibleTransacion extends
- AbstractForwardedTransaction<DOMDataReadWriteTransaction> implements DataModificationTransaction {
+ AbstractReadWriteTransaction implements DataModificationTransaction {
private final ListenerRegistry<DataTransactionListener> listeners = ListenerRegistry.create();
private final Map<InstanceIdentifier<? extends DataObject>, DataObject> updated = new HashMap<>();
public void putOperationalData(final InstanceIdentifier<? extends DataObject> path, final DataObject data) {
boolean previouslyRemoved = posponedRemovedOperational.remove(path);
if(previouslyRemoved) {
- doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
+ doPutWithEnsureParents(LogicalDatastoreType.OPERATIONAL, path, data);
} else {
- doMergeWithEnsureParents(getDelegate(), LogicalDatastoreType.OPERATIONAL, path, data);
+ doMergeWithEnsureParents(LogicalDatastoreType.OPERATIONAL, path, data);
}
}
}
updated.put(path, data);
if(previouslyRemoved) {
- doPutWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
+ doPutWithEnsureParents(LogicalDatastoreType.CONFIGURATION, path, data);
} else {
- doMergeWithEnsureParents(getDelegate(), LogicalDatastoreType.CONFIGURATION, path, data);
+ doMergeWithEnsureParents(LogicalDatastoreType.CONFIGURATION, path, data);
}
}
}
}
- @Override
- public Object getIdentifier() {
- return getDelegate().getIdentifier();
- }
-
private void changeStatus(final TransactionStatus status) {
LOG.trace("Transaction {} changed status to {}", getIdentifier(), status);
this.status = status;
public ListenableFuture<RpcResult<TransactionStatus>> commit() {
for(InstanceIdentifier<? extends DataObject> path : posponedRemovedConfiguration) {
- doDelete(getDelegate(), LogicalDatastoreType.CONFIGURATION, path);
+ doDelete(LogicalDatastoreType.CONFIGURATION, path);
}
for(InstanceIdentifier<? extends DataObject> path : posponedRemovedOperational) {
- doDelete(getDelegate(), LogicalDatastoreType.OPERATIONAL, path);
+ doDelete(LogicalDatastoreType.OPERATIONAL, path);
}
changeStatus(TransactionStatus.SUBMITED);
package org.opendaylight.controller.md.sal.binding.impl;
+import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
-import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ListenableFuture;
-
/**
* The DataBrokerImpl simply defers to the DOMDataBroker for all its operations.
* All transactions and listener registrations are wrapped by the DataBrokerImpl
* Besides this the DataBrokerImpl and it's collaborators also cache data that
* is already transformed from the binding independent to binding aware format
*
- * TODO : All references in this class to CompositeNode should be switched to
- * NormalizedNode once the MappingService is updated
- *
+
*/
public class ForwardedBindingDataBroker extends AbstractForwardedDataBroker implements DataBroker {
@Override
public WriteTransaction newWriteOnlyTransaction() {
- return new BindingDataWriteTransactionImpl<DOMDataWriteTransaction>(getDelegate().newWriteOnlyTransaction(),getCodec());
- }
-
- private abstract class AbstractBindingTransaction<T extends AsyncTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, NormalizedNode<?, ?>>>
- extends AbstractForwardedTransaction<T> implements AsyncTransaction<InstanceIdentifier<?>, DataObject> {
-
- protected AbstractBindingTransaction(final T delegate, final BindingToNormalizedNodeCodec codec) {
- super(delegate, codec);
- }
-
- @Override
- public Object getIdentifier() {
- return getDelegate().getIdentifier();
- }
-
- }
-
-
- private class BindingDataReadTransactionImpl extends AbstractBindingTransaction<DOMDataReadOnlyTransaction> implements
- ReadOnlyTransaction {
-
- protected BindingDataReadTransactionImpl(final DOMDataReadOnlyTransaction delegate,
- final BindingToNormalizedNodeCodec codec) {
- super(delegate, codec);
- }
-
- @Override
- public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
- final InstanceIdentifier<?> path) {
- return doRead(getDelegate(), store, path);
- }
-
- @Override
- public void close() {
- getDelegate().close();
- }
+ return new BindingDataWriteTransactionImpl<>(getDelegate().newWriteOnlyTransaction(),getCodec());
}
- private class BindingDataWriteTransactionImpl<T extends DOMDataWriteTransaction> extends
- AbstractBindingTransaction<T> implements WriteTransaction {
-
- protected BindingDataWriteTransactionImpl(final T delegate, final BindingToNormalizedNodeCodec codec) {
- super(delegate, codec);
-
- }
-
- @Override
- public boolean cancel() {
- return doCancel(getDelegate());
- }
-
- @Override
- public void put(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
- doPut(getDelegate(), store, path, data);
- }
-
- @Override
- public void merge(final LogicalDatastoreType store, final InstanceIdentifier<?> path, final DataObject data) {
- doMerge(getDelegate(), store, path, data);
- }
-
- @Override
- public void delete(final LogicalDatastoreType store, final InstanceIdentifier<?> path) {
- doDelete(getDelegate(), store, path);
- }
-
- @Override
- public ListenableFuture<RpcResult<TransactionStatus>> commit() {
- return doCommit(getDelegate());
- }
- }
-
- private class BindingDataReadWriteTransactionImpl extends
- BindingDataWriteTransactionImpl<DOMDataReadWriteTransaction> implements ReadWriteTransaction {
-
- protected BindingDataReadWriteTransactionImpl(final DOMDataReadWriteTransaction delegate,
- final BindingToNormalizedNodeCodec codec) {
- super(delegate, codec);
- }
-
- @Override
- public ListenableFuture<Optional<DataObject>> read(final LogicalDatastoreType store,
- final InstanceIdentifier<?> path) {
- return doRead(getDelegate(), store, path);
- }
+ @Override
+ public BindingTransactionChain createTransactionChain(final TransactionChainListener listener) {
+ return new BindingTranslatedTransactionChain(getDelegate(), getCodec(), listener);
}
}
import java.util.Map;
import java.util.Set;
-
+/**
+ * Event representing change in RPC routing table.
+ *
+ *
+ * @param <C> Type, which is used to represent Routing context.
+ * @param <P> Type of data tree path, which is used to identify route.
+ */
public interface RouteChange<C,P> {
+ /**
+ *
+ * Returns a map of removed routes in associated routing contexts.
+ * <p>
+ * This map represents routes, which were withdrawn from broker local
+ * routing table and broker may need to forward RPC to other broker
+ * in order to process RPC request.
+ *
+ * @return Map of contexts and removed routes
+ */
Map<C,Set<P>> getRemovals();
+ /**
+ *
+ * Returns a map of announced routes in associated routing contexts.
+ *
+ * This map represents routes, which were announced by broker
+ * and are present in broker's local routing table. This routes
+ * are processed by implementations which are registered
+ * to originating broker.
+ *
+ * @return Map of contexts and announced routes
+ */
Map<C,Set<P>> getAnnouncements();
}
package org.opendaylight.controller.md.sal.common.api.routing;
import java.util.EventListener;
-
+/**
+ *
+ * Listener which is interested in receiving RouteChangeEvents
+ * for its local broker.
+ * <p>
+ * Listener is registerd via {@link RouteChangePublisher#registerRouteChangeListener(RouteChangeListener)}
+ *
+ *
+ * @param <C> Type, which is used to represent Routing context.
+ * @param <P> Type of data tree path, which is used to identify route.
+ */
public interface RouteChangeListener<C,P> extends EventListener {
+ /**
+ * Callback which is invoked if there is an rpc routing table change.
+ *
+ * @param change Event representing change in local RPC routing table.
+ */
void onRouteChange(RouteChange<C, P> change);
}
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+/**
+ * Publishes changes in local RPC routing table to registered listener.
+ *
+ * @param <C> Type, which is used to represent Routing context.
+ * @param <P> Type of data tree path, which is used to identify route.
+ */
public interface RouteChangePublisher<C,P> {
<L extends RouteChangeListener<C,P>> ListenerRegistration<L> registerRouteChangeListener(L listener);
import org.opendaylight.yangtools.concepts.Path;
import org.opendaylight.yangtools.concepts.Registration;
+/**
+ * Base interface for a routed RPC RPC implementation registration.
+ *
+ * @param <C> the context type used for routing
+ * @param <P> the path identifier type
+ * @param <S> the RPC implementation type
+ */
public interface RoutedRegistration<C, P extends Path<P>, S> extends Registration<S> {
+ /**
+ * Registers the RPC implementation associated with this registration for the given path
+ * identifier and context.
+ *
+ * @param context the context used for routing RPCs to this implementation.
+ * @param path the path identifier for which to register.
+ */
void registerPath(C context, P path);
+
+ /**
+ * Unregisters the RPC implementation associated with this registration for the given path
+ * identifier and context.
+ *
+ * @param context the context used for routing RPCs to this implementation.
+ * @param path the path identifier for which to unregister.
+ */
void unregisterPath(C context, P path);
@Override
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+
+public class DataChangeListener extends UntypedActor {
+ @Override public void onReceive(Object message) throws Exception {
+ throw new UnsupportedOperationException("onReceive");
+ }
+
+ public static Props props() {
+ return Props.create(new Creator<DataChangeListener>() {
+ @Override
+ public DataChangeListener create() throws Exception {
+ return new DataChangeListener();
+ }
+
+ });
+
+ }
+}
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
*/
-public class DistributedDataStore implements DOMStore {
- private final ActorRef shardManager;
-
- public DistributedDataStore(ActorSystem actorSystem, String type) {
- shardManager = actorSystem.actorOf(ShardManager.props(type));
- }
-
- @Override
- public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) {
- return new ListenerRegistrationProxy();
- }
-
- @Override
- public DOMStoreTransactionChain createTransactionChain() {
- return new TransactionChainProxy();
- }
-
- @Override
- public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new TransactionProxy();
- }
-
- @Override
- public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new TransactionProxy();
- }
-
- @Override
- public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new TransactionProxy();
- }
+public class DistributedDataStore implements DOMStore, SchemaContextListener {
+
+ private static final Logger
+ LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+
+
+ private final String type;
+ private final ActorContext actorContext;
+
+ public DistributedDataStore(ActorSystem actorSystem, String type) {
+ this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type))), type);
+ }
+
+ public DistributedDataStore(ActorContext actorContext, String type) {
+ this.type = type;
+ this.actorContext = actorContext;
+ }
+
+
+ @Override
+ public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
+ InstanceIdentifier path, L listener,
+ AsyncDataBroker.DataChangeScope scope) {
+
+ ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(DataChangeListener.props());
+
+ Object result = actorContext.executeShardOperation(Shard.DEFAULT_NAME,
+ new RegisterChangeListener(path, dataChangeListenerActor.path(),
+ AsyncDataBroker.DataChangeScope.BASE),
+ ActorContext.ASK_DURATION);
+
+ RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+ return new ListenerRegistrationProxy(reply.getListenerRegistrationPath());
+ }
+
+
+
+ @Override
+ public DOMStoreTransactionChain createTransactionChain() {
+ return new TransactionChainProxy(actorContext);
+ }
+
+ @Override
+ public DOMStoreReadTransaction newReadOnlyTransaction() {
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
+ }
+
+ @Override
+ public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
+ }
+
+ @Override
+ public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
+ }
+
+ @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
+ actorContext.getShardManager().tell(new UpdateSchemaContext(schemaContext), null);
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class ListenerProxy implements AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>{
+ private final ActorSelection listenerRegistrationActor;
+
+ public ListenerProxy(ActorSelection listenerRegistrationActor) {
+ this.listenerRegistrationActor = listenerRegistrationActor;
+ }
+
+ @Override public void onDataChanged(
+ AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+ throw new UnsupportedOperationException("onDataChanged");
+ }
+}
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorPath;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
/**
* The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
*/
public class ListenerRegistrationProxy implements ListenerRegistration {
+ private final ActorPath listenerRegistrationPath;
+
+ public ListenerRegistrationProxy(ActorPath listenerRegistrationPath) {
+
+ this.listenerRegistrationPath = listenerRegistrationPath;
+ }
+
@Override
public Object getInstance() {
throw new UnsupportedOperationException("getInstance");
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;
*/
public class Shard extends UntypedProcessor {
- public static final String DEFAULT_NAME = "default";
+ public static final String DEFAULT_NAME = "default";
- private final ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+ private final ListeningExecutorService storeExecutor =
+ MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
- private final InMemoryDOMDataStore store;
+ private final InMemoryDOMDataStore store;
- private final Map<Modification, DOMStoreThreePhaseCommitCohort> modificationToCohort = new HashMap<>();
+ private final Map<Modification, DOMStoreThreePhaseCommitCohort>
+ modificationToCohort = new HashMap<>();
- private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ private final LoggingAdapter log =
+ Logging.getLogger(getContext().system(), this);
- private Shard(String name){
- store = new InMemoryDOMDataStore(name, storeExecutor);
- }
-
- public static Props props(final String name) {
- return Props.create(new Creator<Shard>() {
+ private Shard(String name) {
+ store = new InMemoryDOMDataStore(name, storeExecutor);
+ }
- @Override
- public Shard create() throws Exception {
- return new Shard(name);
- }
+ public static Props props(final String name) {
+ return Props.create(new Creator<Shard>() {
- });
- }
+ @Override
+ public Shard create() throws Exception {
+ return new Shard(name);
+ }
- @Override
- public void onReceive(Object message) throws Exception {
- if (message instanceof CreateTransactionChain) {
- createTransactionChain();
- } else if (message instanceof RegisterChangeListener) {
- registerChangeListener((RegisterChangeListener) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
- } else if (message instanceof ForwardedCommitTransaction ) {
- handleForwardedCommit((ForwardedCommitTransaction) message);
- } else if (message instanceof Persistent){
- commit((Persistent) message);
+ });
}
- }
-
- private void commit(Persistent message) {
- Modification modification = (Modification) message.payload();
- DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(modification);
- if(cohort == null){
- log.error("Could not find cohort for modification : " + modification);
- return;
+
+ @Override
+ public void onReceive(Object message) throws Exception {
+ if (message instanceof CreateTransactionChain) {
+ createTransactionChain();
+ } else if (message instanceof RegisterChangeListener) {
+ registerChangeListener((RegisterChangeListener) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof ForwardedCommitTransaction) {
+ handleForwardedCommit((ForwardedCommitTransaction) message);
+ } else if (message instanceof Persistent) {
+ commit((Persistent) message);
+ }
}
- final ListenableFuture<Void> future = cohort.commit();
- final ActorRef sender = getSender();
- final ActorRef self = getSelf();
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- sender.tell(new CommitTransactionReply(), self);
- } catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when committing");
+
+ private void commit(Persistent message) {
+ Modification modification = (Modification) message.payload();
+ DOMStoreThreePhaseCommitCohort cohort =
+ modificationToCohort.remove(modification);
+ if (cohort == null) {
+ log.error(
+ "Could not find cohort for modification : " + modification);
+ return;
}
- }
- }, getContext().dispatcher());
- }
-
- private void handleForwardedCommit(ForwardedCommitTransaction message) {
- log.info("received forwarded transaction");
- modificationToCohort.put(message.getModification(), message.getCohort());
- getSelf().forward(Persistent.create(message.getModification()), getContext());
- }
-
- private void updateSchemaContext(UpdateSchemaContext message) {
- store.onGlobalContextUpdated(message.getSchemaContext());
- }
-
- private void registerChangeListener(RegisterChangeListener registerChangeListener) {
- org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration =
- store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope());
- ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(registration));
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
- }
-
- private void createTransactionChain() {
- DOMStoreTransactionChain chain = store.createTransactionChain();
- ActorRef transactionChain = getContext().actorOf(ShardTransactionChain.props(chain));
- getSender().tell(new CreateTransactionChainReply(transactionChain.path()), getSelf());
- }
+ final ListenableFuture<Void> future = cohort.commit();
+ final ActorRef sender = getSender();
+ final ActorRef self = getSelf();
+ future.addListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ future.get();
+ sender.tell(new CommitTransactionReply(), self);
+ } catch (InterruptedException | ExecutionException e) {
+ log.error(e, "An exception happened when committing");
+ }
+ }
+ }, getContext().dispatcher());
+ }
+
+ private void handleForwardedCommit(ForwardedCommitTransaction message) {
+ log.info("received forwarded transaction");
+ modificationToCohort
+ .put(message.getModification(), message.getCohort());
+ getSelf().forward(Persistent.create(message.getModification()),
+ getContext());
+ }
+
+ private void updateSchemaContext(UpdateSchemaContext message) {
+ store.onGlobalContextUpdated(message.getSchemaContext());
+ }
+
+ private void registerChangeListener(
+ RegisterChangeListener registerChangeListener) {
+
+ ActorSelection listenerRegistrationActor = getContext()
+ .system().actorSelection(registerChangeListener.getDataChangeListenerPath());
+
+ AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>
+ listener = new ListenerProxy(listenerRegistrationActor);
+
+ org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+ registration =
+ store.registerChangeListener(registerChangeListener.getPath(),
+ listener, registerChangeListener.getScope());
+ ActorRef listenerRegistration =
+ getContext().actorOf(ListenerRegistration.props(registration));
+ getSender()
+ .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
+ getSelf());
+ }
+
+ private void createTransactionChain() {
+ DOMStoreTransactionChain chain = store.createTransactionChain();
+ ActorRef transactionChain =
+ getContext().actorOf(ShardTransactionChain.props(chain));
+ getSender()
+ .tell(new CreateTransactionChainReply(transactionChain.path()),
+ getSelf());
+ }
}
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import java.util.HashMap;
import java.util.List;
} else {
getSender().tell(new PrimaryNotFound(shardName), getSelf());
}
+ } else if(message instanceof UpdateSchemaContext){
+ // FIXME : Notify all local shards of a context change
+ getContext().system().actorSelection(defaultShardPath).forward(message, getContext());
}
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorPath;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
+ */
+public class ThreePhaseCommitCohortProxy implements
+ DOMStoreThreePhaseCommitCohort{
+
+ private final List<ActorPath> cohortPaths;
+
+ public ThreePhaseCommitCohortProxy(List<ActorPath> cohortPaths) {
+
+ this.cohortPaths = cohortPaths;
+ }
+
+ @Override public ListenableFuture<Boolean> canCommit() {
+ throw new UnsupportedOperationException("canCommit");
+ }
+
+ @Override public ListenableFuture<Void> preCommit() {
+ throw new UnsupportedOperationException("preCommit");
+ }
+
+ @Override public ListenableFuture<Void> abort() {
+ throw new UnsupportedOperationException("abort");
+ }
+
+ @Override public ListenableFuture<Void> commit() {
+ throw new UnsupportedOperationException("commit");
+ }
+
+ public List<ActorPath> getCohortPaths() {
+ return Collections.unmodifiableList(this.cohortPaths);
+ }
+}
package org.opendaylight.controller.cluster.datastore;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
* TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
*/
public class TransactionChainProxy implements DOMStoreTransactionChain{
+ private final ActorContext actorContext;
+
+ public TransactionChainProxy(ActorContext actorContext) {
+ this.actorContext = actorContext;
+ }
+
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- throw new UnsupportedOperationException("newReadOnlyTransaction");
+ return new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY);
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- throw new UnsupportedOperationException("newReadWriteTransaction");
+ return new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.WRITE_ONLY);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- throw new UnsupportedOperationException("newWriteOnlyTransaction");
+ return new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_WRITE);
}
@Override
public void close() {
- throw new UnsupportedOperationException("close");
+ throw new UnsupportedOperationException("close - not sure what to do here?");
}
}
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* TransactionProxy acts as a proxy for one or more transactions that were created on a remote shard
- *
+ * <p>
* Creating a transaction on the consumer side will create one instance of a transaction proxy. If during
* the transaction reads and writes are done on data that belongs to different shards then a separate transaction will
* be created on each of those shards by the TransactionProxy
- *
+ *</p>
+ * <p>
* The TransactionProxy does not make any guarantees about atomicity or order in which the transactions on the various
* shards will be executed.
- *
+ * </p>
*/
public class TransactionProxy implements DOMStoreReadWriteTransaction {
+
+ public enum TransactionType {
+ READ_ONLY,
+ WRITE_ONLY,
+ READ_WRITE
+ }
+
+ private static final AtomicLong counter = new AtomicLong();
+
+ private final TransactionType readOnly;
+ private final ActorContext actorContext;
+ private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
+ private final String identifier;
+
+ public TransactionProxy(
+ ActorContext actorContext,
+ TransactionType readOnly) {
+
+ this.identifier = "transaction-" + counter.getAndIncrement();
+ this.readOnly = readOnly;
+ this.actorContext = actorContext;
+
+ Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION);
+ if(response instanceof CreateTransactionReply){
+ CreateTransactionReply reply = (CreateTransactionReply) response;
+ remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionPath()));
+ }
+ }
+
@Override
- public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(InstanceIdentifier path) {
- throw new UnsupportedOperationException("read");
+ public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
+ final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+
+ Callable<Optional<NormalizedNode<?,?>>> call = new Callable() {
+
+ @Override public Optional<NormalizedNode<?,?>> call() throws Exception {
+ Object response = actorContext
+ .executeRemoteOperation(remoteTransaction, new ReadData(path),
+ ActorContext.ASK_DURATION);
+ if(response instanceof ReadDataReply){
+ ReadDataReply reply = (ReadDataReply) response;
+ //FIXME : A cast should not be required here ???
+ return (Optional<NormalizedNode<?, ?>>) Optional.of(reply.getNormalizedNode());
+ }
+
+ return Optional.absent();
+ }
+ };
+
+ ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
+ future = ListenableFutureTask.create(call);
+
+ //FIXME : Use a thread pool here
+ Executors.newSingleThreadExecutor().submit(future);
+
+ return future;
}
@Override
public void write(InstanceIdentifier path, NormalizedNode<?, ?> data) {
- throw new UnsupportedOperationException("write");
+ final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+ remoteTransaction.tell(new WriteData(path, data), null);
}
@Override
public void merge(InstanceIdentifier path, NormalizedNode<?, ?> data) {
- throw new UnsupportedOperationException("merge");
+ final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+ remoteTransaction.tell(new MergeData(path, data), null);
}
@Override
public void delete(InstanceIdentifier path) {
- throw new UnsupportedOperationException("delete");
+ final ActorSelection remoteTransaction = remoteTransactionFromIdentifier(path);
+ remoteTransaction.tell(new DeleteData(path), null);
}
@Override
public DOMStoreThreePhaseCommitCohort ready() {
- throw new UnsupportedOperationException("ready");
+ List<ActorPath> cohortPaths = new ArrayList<>();
+
+ for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) {
+ Object result = actorContext.executeRemoteOperation(remoteTransaction,
+ new ReadyTransaction(),
+ ActorContext.ASK_DURATION
+ );
+
+ if(result instanceof ReadyTransactionReply){
+ ReadyTransactionReply reply = (ReadyTransactionReply) result;
+ cohortPaths.add(reply.getCohortPath());
+ }
+ }
+
+ return new ThreePhaseCommitCohortProxy(cohortPaths);
}
@Override
public Object getIdentifier() {
- throw new UnsupportedOperationException("getIdentifier");
+ return this.identifier;
}
@Override
public void close() {
- throw new UnsupportedOperationException("close");
+ for(ActorSelection remoteTransaction : remoteTransactionPaths.values()) {
+ remoteTransaction.tell(new CloseTransaction(), null);
+ }
+ }
+
+ private ActorSelection remoteTransactionFromIdentifier(InstanceIdentifier path){
+ String shardName = shardNameFromIdentifier(path);
+ return remoteTransactionPaths.get(shardName);
+ }
+
+ private String shardNameFromIdentifier(InstanceIdentifier path){
+ return Shard.DEFAULT_NAME;
}
}
package org.opendaylight.controller.cluster.datastore.messages;
+import akka.actor.ActorPath;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
public class RegisterChangeListener {
- private final InstanceIdentifier path;
- private final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener;
- private final AsyncDataBroker.DataChangeScope scope;
+ private final InstanceIdentifier path;
+ private final ActorPath dataChangeListenerPath;
+ private final AsyncDataBroker.DataChangeScope scope;
- public RegisterChangeListener(InstanceIdentifier path, AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener, AsyncDataBroker.DataChangeScope scope) {
- this.path = path;
- this.listener = listener;
- this.scope = scope;
- }
+ public RegisterChangeListener(InstanceIdentifier path,
+ ActorPath dataChangeListenerPath,
+ AsyncDataBroker.DataChangeScope scope) {
+ this.path = path;
+ this.dataChangeListenerPath = dataChangeListenerPath;
+ this.scope = scope;
+ }
- public InstanceIdentifier getPath() {
- return path;
- }
+ public InstanceIdentifier getPath() {
+ return path;
+ }
- public AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> getListener() {
- return listener;
- }
- public AsyncDataBroker.DataChangeScope getScope() {
- return scope;
- }
+ public AsyncDataBroker.DataChangeScope getScope() {
+ return scope;
+ }
+
+ public ActorPath getDataChangeListenerPath() {
+ return dataChangeListenerPath;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static akka.pattern.Patterns.ask;
+
+/**
+ * The ActorContext class contains utility methods which could be used by
+ * non-actors (like DistributedDataStore) to work with actors a little more
+ * easily. An ActorContext can be freely passed around to local object instances
+ * but should not be passed to actors especially remote actors
+ */
+public class ActorContext {
+ private static final Logger
+ LOG = LoggerFactory.getLogger(ActorContext.class);
+
+ public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
+ public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
+
+ private final ActorSystem actorSystem;
+ private final ActorRef shardManager;
+
+ public ActorContext(ActorSystem actorSystem, ActorRef shardManager){
+ this.actorSystem = actorSystem;
+ this.shardManager = shardManager;
+ }
+
+ public ActorSystem getActorSystem() {
+ return actorSystem;
+ }
+
+ public ActorRef getShardManager() {
+ return shardManager;
+ }
+
+ public ActorSelection actorSelection(String actorPath){
+ return actorSystem.actorSelection(actorPath);
+ }
+
+ public ActorSelection actorSelection(ActorPath actorPath){
+ return actorSystem.actorSelection(actorPath);
+ }
+
+
+ /**
+ * Finds the primary for a given shard
+ *
+ * @param shardName
+ * @return
+ */
+ public ActorSelection findPrimary(String shardName) {
+ Object result = executeLocalOperation(shardManager,
+ new FindPrimary(shardName), ASK_DURATION);
+
+ if(result instanceof PrimaryFound){
+ PrimaryFound found = (PrimaryFound) result;
+
+ LOG.error("Primary found {}", found.getPrimaryPath());
+
+ return actorSystem.actorSelection(found.getPrimaryPath());
+ }
+ throw new RuntimeException("primary was not found");
+ }
+
+ /**
+ * Executes an operation on a local actor and wait for it's response
+ * @param actor
+ * @param message
+ * @param duration
+ * @return The response of the operation
+ */
+ public Object executeLocalOperation(ActorRef actor, Object message,
+ FiniteDuration duration){
+ Future<Object> future =
+ ask(actor, message, new Timeout(duration));
+
+ try {
+ return Await.result(future, AWAIT_DURATION);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Execute an operation on a remote actor and wait for it's response
+ * @param actor
+ * @param message
+ * @param duration
+ * @return
+ */
+ public Object executeRemoteOperation(ActorSelection actor, Object message,
+ FiniteDuration duration){
+ Future<Object> future =
+ ask(actor, message, new Timeout(duration));
+
+ try {
+ return Await.result(future, AWAIT_DURATION);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Execute an operation on the primary for a given shard
+ * <p>
+ * This method first finds the primary for a given shard ,then sends
+ * the message to the remote shard and waits for a response
+ * </p>
+ * @param shardName
+ * @param message
+ * @param duration
+ * @throws java.lang.RuntimeException when a primary is not found or if the message to the remote shard fails or times out
+ *
+ * @return
+ */
+ public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
+ ActorSelection primary = findPrimary(shardName);
+
+ return executeRemoteOperation(primary, message, duration);
+ }
+
+}
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorRef;
+import akka.actor.Props;
import junit.framework.Assert;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
public class DistributedDataStoreTest extends AbstractActorTest{
private DistributedDataStore distributedDataStore;
+ private MockActorContext mockActorContext;
+ private ActorRef doNothingActorRef;
@org.junit.Before
public void setUp() throws Exception {
- distributedDataStore = new DistributedDataStore(getSystem(), "config");
+ final Props props = Props.create(DoNothingActor.class);
+
+ doNothingActorRef = getSystem().actorOf(props);
+
+ mockActorContext = new MockActorContext(getSystem(), doNothingActorRef);
+ distributedDataStore = new DistributedDataStore(mockActorContext, "config");
+ distributedDataStore.onGlobalContextUpdated(
+ TestModel.createTestContext());
+
+ // Make CreateTransactionReply as the default response. Will need to be
+ // tuned if a specific test requires some other response
+ mockActorContext.setExecuteShardOperationResponse(
+ new CreateTransactionReply(doNothingActorRef.path()));
}
@org.junit.After
@org.junit.Test
public void testRegisterChangeListener() throws Exception {
+ mockActorContext.setExecuteShardOperationResponse(new RegisterChangeListenerReply(doNothingActorRef.path()));
ListenerRegistration registration =
- distributedDataStore.registerChangeListener(InstanceIdentifier.builder().build(), new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+ distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
@Override
public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
throw new UnsupportedOperationException("onDataChanged");
subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
- subject.tell(new RegisterChangeListener(InstanceIdentifier.builder().build(), noOpDataChangeListener() , AsyncDataBroker.DataChangeScope.BASE), getRef());
+ subject.tell(new RegisterChangeListener(TestModel.TEST_PATH, getRef().path() , AsyncDataBroker.DataChangeScope.BASE), getRef());
final String out = new ExpectMsg<String>("match hint") {
// do not put code outside this method, will run afterwards
}
};
}
-}
\ No newline at end of file
+}
--- /dev/null
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.cluster.datastore.messages.MergeData;
+import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.WriteData;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
+import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+
+import java.util.List;
+
+public class TransactionProxyTest extends AbstractActorTest {
+
+ @Test
+ public void testRead() throws Exception {
+ final Props props = Props.create(DoNothingActor.class);
+ final ActorRef actorRef = getSystem().actorOf(props);
+
+ final MockActorContext actorContext = new MockActorContext(this.getSystem());
+ actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteRemoteOperationResponse("message");
+
+ TransactionProxy transactionProxy =
+ new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY);
+
+
+ ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
+ transactionProxy.read(TestModel.TEST_PATH);
+
+ Optional<NormalizedNode<?, ?>> normalizedNodeOptional = read.get();
+
+ Assert.assertFalse(normalizedNodeOptional.isPresent());
+
+ actorContext.setExecuteRemoteOperationResponse(new ReadDataReply(
+ ImmutableNodes.containerNode(TestModel.TEST_QNAME)));
+
+ read = transactionProxy.read(TestModel.TEST_PATH);
+
+ normalizedNodeOptional = read.get();
+
+ Assert.assertTrue(normalizedNodeOptional.isPresent());
+ }
+
+ @Test
+ public void testWrite() throws Exception {
+ final Props props = Props.create(MessageCollectorActor.class);
+ final ActorRef actorRef = getSystem().actorOf(props);
+
+ final MockActorContext actorContext = new MockActorContext(this.getSystem());
+ actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteRemoteOperationResponse("message");
+
+ TransactionProxy transactionProxy =
+ new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY);
+
+ transactionProxy.write(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+
+ ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+ Object messages = testContext
+ .executeLocalOperation(actorRef, "messages",
+ ActorContext.ASK_DURATION);
+
+ Assert.assertNotNull(messages);
+
+ Assert.assertTrue(messages instanceof List);
+
+ List<Object> listMessages = (List<Object>) messages;
+
+ Assert.assertEquals(1, listMessages.size());
+
+ Assert.assertTrue(listMessages.get(0) instanceof WriteData);
+ }
+
+ @Test
+ public void testMerge() throws Exception {
+ final Props props = Props.create(MessageCollectorActor.class);
+ final ActorRef actorRef = getSystem().actorOf(props);
+
+ final MockActorContext actorContext = new MockActorContext(this.getSystem());
+ actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteRemoteOperationResponse("message");
+
+ TransactionProxy transactionProxy =
+ new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY);
+
+ transactionProxy.merge(TestModel.TEST_PATH,
+ ImmutableNodes.containerNode(TestModel.NAME_QNAME));
+
+ ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+ Object messages = testContext
+ .executeLocalOperation(actorRef, "messages",
+ ActorContext.ASK_DURATION);
+
+ Assert.assertNotNull(messages);
+
+ Assert.assertTrue(messages instanceof List);
+
+ List<Object> listMessages = (List<Object>) messages;
+
+ Assert.assertEquals(1, listMessages.size());
+
+ Assert.assertTrue(listMessages.get(0) instanceof MergeData);
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ final Props props = Props.create(MessageCollectorActor.class);
+ final ActorRef actorRef = getSystem().actorOf(props);
+
+ final MockActorContext actorContext = new MockActorContext(this.getSystem());
+ actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteRemoteOperationResponse("message");
+
+ TransactionProxy transactionProxy =
+ new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY);
+
+ transactionProxy.delete(TestModel.TEST_PATH);
+
+ ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+ Object messages = testContext
+ .executeLocalOperation(actorRef, "messages",
+ ActorContext.ASK_DURATION);
+
+ Assert.assertNotNull(messages);
+
+ Assert.assertTrue(messages instanceof List);
+
+ List<Object> listMessages = (List<Object>) messages;
+
+ Assert.assertEquals(1, listMessages.size());
+
+ Assert.assertTrue(listMessages.get(0) instanceof DeleteData);
+ }
+
+ @Test
+ public void testReady() throws Exception {
+ final Props props = Props.create(DoNothingActor.class);
+ final ActorRef doNothingActorRef = getSystem().actorOf(props);
+
+ final MockActorContext actorContext = new MockActorContext(this.getSystem());
+ actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(doNothingActorRef.path()));
+ actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
+
+ TransactionProxy transactionProxy =
+ new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY);
+
+
+ DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
+
+ Assert.assertTrue(ready instanceof ThreePhaseCommitCohortProxy);
+
+ ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
+
+ Assert.assertTrue("No cohort paths returned", proxy.getCohortPaths().size() > 0);
+
+ }
+
+ @Test
+ public void testGetIdentifier(){
+ final Props props = Props.create(DoNothingActor.class);
+ final ActorRef doNothingActorRef = getSystem().actorOf(props);
+
+ final MockActorContext actorContext = new MockActorContext(this.getSystem());
+ actorContext.setExecuteShardOperationResponse(
+ new CreateTransactionReply(doNothingActorRef.path()));
+
+ TransactionProxy transactionProxy =
+ new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY);
+
+ Assert.assertNotNull(transactionProxy.getIdentifier());
+ }
+
+ @Test
+ public void testClose(){
+ final Props props = Props.create(MessageCollectorActor.class);
+ final ActorRef actorRef = getSystem().actorOf(props);
+
+ final MockActorContext actorContext = new MockActorContext(this.getSystem());
+ actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+ actorContext.setExecuteRemoteOperationResponse("message");
+
+ TransactionProxy transactionProxy =
+ new TransactionProxy(actorContext,
+ TransactionProxy.TransactionType.READ_ONLY);
+
+ transactionProxy.close();
+
+ ActorContext testContext = new ActorContext(getSystem(), getSystem().actorOf(Props.create(DoNothingActor.class)));
+ Object messages = testContext
+ .executeLocalOperation(actorRef, "messages",
+ ActorContext.ASK_DURATION);
+
+ Assert.assertNotNull(messages);
+
+ Assert.assertTrue(messages instanceof List);
+
+ List<Object> listMessages = (List<Object>) messages;
+
+ Assert.assertEquals(1, listMessages.size());
+
+ Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.UntypedActor;
+
+public class DoNothingActor extends UntypedActor {
+
+ @Override public void onReceive(Object message) throws Exception {
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+import akka.actor.UntypedActor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * MessageCollectorActor collects messages as it receives them. It can send
+ * those collected messages to any sender which sends it the "messages" message
+ * <p>
+ * This class would be useful as a mock to test whether messages were sent
+ * to a remote actor or not.
+ * </p>
+ */
+public class MessageCollectorActor extends UntypedActor {
+ private List<Object> messages = new ArrayList<>();
+
+ @Override public void onReceive(Object message) throws Exception {
+ if(message instanceof String){
+ if("messages".equals(message)){
+ getSender().tell(messages, getSelf());
+ }
+ } else {
+ messages.add(message);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.utils;
+
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import scala.concurrent.duration.FiniteDuration;
+
+public class MockActorContext extends ActorContext {
+
+ private Object executeShardOperationResponse;
+ private Object executeRemoteOperationResponse;
+ private Object executeLocalOperationResponse;
+
+ public MockActorContext(ActorSystem actorSystem) {
+ super(actorSystem, null);
+ }
+
+ public MockActorContext(ActorSystem actorSystem, ActorRef shardManager) {
+ super(actorSystem, shardManager);
+ }
+
+
+ @Override public Object executeShardOperation(String shardName,
+ Object message, FiniteDuration duration) {
+ return executeShardOperationResponse;
+ }
+
+ @Override public Object executeRemoteOperation(ActorSelection actor,
+ Object message, FiniteDuration duration) {
+ return executeRemoteOperationResponse;
+ }
+
+ @Override public ActorSelection findPrimary(String shardName) {
+ return null;
+ }
+
+ public void setExecuteShardOperationResponse(Object response){
+ executeShardOperationResponse = response;
+ }
+
+ public void setExecuteRemoteOperationResponse(Object response){
+ executeRemoteOperationResponse = response;
+ }
+
+ public void setExecuteLocalOperationResponse(
+ Object executeLocalOperationResponse) {
+ this.executeLocalOperationResponse = executeLocalOperationResponse;
+ }
+
+
+}