Merge "BUG 3057 - notify added event source by topics created before"
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / restconf / impl / BrokerFacade.java
index 1cc1f783d676ed69f68c959d1024a58349dc51d9..6a3adcccd6576b8f7ac766eb521264b05a04f6b8 100644 (file)
@@ -7,44 +7,63 @@
  */
 package org.opendaylight.controller.sal.restconf.impl;
 
-import java.util.concurrent.Future;
-
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
 import javax.ws.rs.core.Response.Status;
-
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
 import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;
-import org.opendaylight.controller.sal.core.api.data.DataBrokerService;
-import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
-import org.opendaylight.controller.sal.core.api.data.DataModificationTransaction;
-import org.opendaylight.controller.sal.core.api.mount.MountInstance;
-import org.opendaylight.controller.sal.rest.impl.RestconfProvider;
+import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorTag;
+import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.controller.sal.streams.listeners.ListenerAdapter;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BrokerFacade implements DataReader<InstanceIdentifier, CompositeNode> {
-    private final static Logger LOG = LoggerFactory.getLogger( BrokerFacade.class );
+public class BrokerFacade {
+    private final static Logger LOG = LoggerFactory.getLogger(BrokerFacade.class);
 
     private final static BrokerFacade INSTANCE = new BrokerFacade();
-
-    private volatile DataBrokerService dataService;
+    private volatile DOMRpcService rpcService;
     private volatile ConsumerSession context;
+    private DOMDataBroker domDataBroker;
 
     private BrokerFacade() {
     }
 
-    public void setContext( final ConsumerSession context ) {
-        this.context = context;
+    public void setRpcService(final DOMRpcService router) {
+        rpcService = router;
     }
 
-    public void setDataService( final DataBrokerService dataService ) {
-        this.dataService = dataService;
+    public void setContext(final ConsumerSession context) {
+        this.context = context;
     }
 
     public static BrokerFacade getInstance() {
@@ -52,150 +71,229 @@ public class BrokerFacade implements DataReader<InstanceIdentifier, CompositeNod
     }
 
     private void checkPreconditions() {
-        if( context == null || dataService == null ) {
-            ResponseException _responseException = new ResponseException( Status.SERVICE_UNAVAILABLE,
-                    RestconfProvider.NOT_INITALIZED_MSG );
-            throw _responseException;
+        if (context == null || domDataBroker == null) {
+            throw new RestconfDocumentedException(Status.SERVICE_UNAVAILABLE);
         }
     }
 
-    @Override
-    public CompositeNode readConfigurationData( final InstanceIdentifier path ) {
-        this.checkPreconditions();
-
-        LOG.trace( "Read Configuration via Restconf: {}", path );
-
-        return dataService.readConfigurationData( path );
+    // READ configuration
+    public NormalizedNode<?, ?> readConfigurationData(final YangInstanceIdentifier path) {
+        checkPreconditions();
+        return readDataViaTransaction(domDataBroker.newReadOnlyTransaction(), CONFIGURATION, path);
     }
 
-    public CompositeNode readConfigurationDataBehindMountPoint( final MountInstance mountPoint,
-                                                                final InstanceIdentifier path ) {
-        this.checkPreconditions();
-
-        LOG.trace( "Read Configuration via Restconf: {}", path );
-
-        return mountPoint.readConfigurationData( path );
+    public NormalizedNode<?, ?> readConfigurationData(final DOMMountPoint mountPoint, final YangInstanceIdentifier path) {
+        final Optional<DOMDataBroker> domDataBrokerService = mountPoint.getService(DOMDataBroker.class);
+        if (domDataBrokerService.isPresent()) {
+            return readDataViaTransaction(domDataBrokerService.get().newReadOnlyTransaction(), CONFIGURATION, path);
+        }
+        throw new RestconfDocumentedException("DOM data broker service isn't available for mount point.");
     }
 
-    @Override
-    public CompositeNode readOperationalData( final InstanceIdentifier path ) {
-        this.checkPreconditions();
-
-        BrokerFacade.LOG.trace( "Read Operational via Restconf: {}", path );
+    // READ operational
+    public NormalizedNode<?, ?> readOperationalData(final YangInstanceIdentifier path) {
+        checkPreconditions();
+        return readDataViaTransaction(domDataBroker.newReadOnlyTransaction(), OPERATIONAL, path);
+    }
 
-        return dataService.readOperationalData( path );
+    public NormalizedNode<?, ?> readOperationalData(final DOMMountPoint mountPoint, final YangInstanceIdentifier path) {
+        final Optional<DOMDataBroker> domDataBrokerService = mountPoint.getService(DOMDataBroker.class);
+        if (domDataBrokerService.isPresent()) {
+            return readDataViaTransaction(domDataBrokerService.get().newReadOnlyTransaction(), OPERATIONAL, path);
+        }
+        throw new RestconfDocumentedException("DOM data broker service isn't available for mount point.");
     }
 
-    public CompositeNode readOperationalDataBehindMountPoint( final MountInstance mountPoint,
-                                                              final InstanceIdentifier path ) {
-        this.checkPreconditions();
+    // PUT configuration
+    public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPut(
+            final SchemaContext globalSchema, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
+        checkPreconditions();
+        return putDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
+    }
 
-        BrokerFacade.LOG.trace( "Read Operational via Restconf: {}", path );
+    public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPut(
+            final DOMMountPoint mountPoint, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
+        final Optional<DOMDataBroker> domDataBrokerService = mountPoint.getService(DOMDataBroker.class);
+        if (domDataBrokerService.isPresent()) {
+            return putDataViaTransaction(domDataBrokerService.get().newReadWriteTransaction(), CONFIGURATION, path,
+                    payload, mountPoint.getSchemaContext());
+        }
+        throw new RestconfDocumentedException("DOM data broker service isn't available for mount point.");
+    }
 
-        return mountPoint.readOperationalData( path );
+    // POST configuration
+    public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPost(
+            final SchemaContext globalSchema, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
+        checkPreconditions();
+        return postDataViaTransaction(domDataBroker.newReadWriteTransaction(), CONFIGURATION, path, payload, globalSchema);
     }
 
-    public RpcResult<CompositeNode> invokeRpc( final QName type, final CompositeNode payload ) {
-        this.checkPreconditions();
+    public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataPost(
+            final DOMMountPoint mountPoint, final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload) {
+        final Optional<DOMDataBroker> domDataBrokerService = mountPoint.getService(DOMDataBroker.class);
+        if (domDataBrokerService.isPresent()) {
+            return postDataViaTransaction(domDataBrokerService.get().newReadWriteTransaction(), CONFIGURATION, path,
+                    payload, mountPoint.getSchemaContext());
+        }
+        throw new RestconfDocumentedException("DOM data broker service isn't available for mount point.");
+    }
 
-        final Future<RpcResult<CompositeNode>> future = context.rpc( type, payload );
+    // DELETE configuration
+    public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataDelete(
+            final YangInstanceIdentifier path) {
+        checkPreconditions();
+        return deleteDataViaTransaction(domDataBroker.newWriteOnlyTransaction(), CONFIGURATION, path);
+    }
 
-        try {
-            return future.get();
+    public CheckedFuture<Void, TransactionCommitFailedException> commitConfigurationDataDelete(
+            final DOMMountPoint mountPoint, final YangInstanceIdentifier path) {
+        final Optional<DOMDataBroker> domDataBrokerService = mountPoint.getService(DOMDataBroker.class);
+        if (domDataBrokerService.isPresent()) {
+            return deleteDataViaTransaction(domDataBrokerService.get().newWriteOnlyTransaction(), CONFIGURATION, path);
         }
-        catch( Exception e ) {
-            throw new ResponseException( e, "Error invoking RPC " + type );
+        throw new RestconfDocumentedException("DOM data broker service isn't available for mount point.");
+    }
+
+    // RPC
+    public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(final SchemaPath type, final NormalizedNode<?, ?> input) {
+        checkPreconditions();
+        if (rpcService == null) {
+            throw new RestconfDocumentedException(Status.SERVICE_UNAVAILABLE);
         }
+        return rpcService.invokeRpc(type, input);
     }
 
-    public Future<RpcResult<TransactionStatus>> commitConfigurationDataPut( final InstanceIdentifier path,
-                                                                            final CompositeNode payload ) {
-        this.checkPreconditions();
+    public void registerToListenDataChanges(final LogicalDatastoreType datastore, final DataChangeScope scope,
+            final ListenerAdapter listener) {
+        checkPreconditions();
 
-        final DataModificationTransaction transaction = dataService.beginTransaction();
-        BrokerFacade.LOG.trace( "Put Configuration via Restconf: {}", path );
-        transaction.putConfigurationData( path, payload );
-        return transaction.commit();
-    }
+        if (listener.isListening()) {
+            return;
+        }
 
-    public Future<RpcResult<TransactionStatus>> commitConfigurationDataPutBehindMountPoint(
-            final MountInstance mountPoint, final InstanceIdentifier path, final CompositeNode payload ) {
-        this.checkPreconditions();
+        final YangInstanceIdentifier path = listener.getPath();
+        final ListenerRegistration<DOMDataChangeListener> registration = domDataBroker.registerDataChangeListener(
+                datastore, path, listener, scope);
 
-        final DataModificationTransaction transaction = mountPoint.beginTransaction();
-        BrokerFacade.LOG.trace( "Put Configuration via Restconf: {}", path );
-        transaction.putConfigurationData( path, payload );
-        return transaction.commit();
+        listener.setRegistration(registration);
     }
 
-    public Future<RpcResult<TransactionStatus>> commitConfigurationDataPost( final InstanceIdentifier path,
-                                                                             final CompositeNode payload) {
-        this.checkPreconditions();
-
-        final DataModificationTransaction transaction = dataService.beginTransaction();
-        /* check for available Node in Configuration DataStore by path */
-        CompositeNode availableNode = transaction.readConfigurationData( path );
-        if (availableNode != null) {
-            String errMsg = "Post Configuration via Restconf was not executed because data already exists";
-            BrokerFacade.LOG.warn((new StringBuilder(errMsg)).append(" : ").append(path).toString());
-            // FIXME: return correct ietf-restconf:errors -> follow specification
-            // (http://tools.ietf.org/html/draft-bierman-netconf-restconf-03#page-48)
-            throw new ResponseException(Status.CONFLICT, errMsg);
+    private NormalizedNode<?, ?> readDataViaTransaction(final DOMDataReadTransaction transaction,
+            final LogicalDatastoreType datastore, final YangInstanceIdentifier path) {
+        LOG.trace("Read " + datastore.name() + " via Restconf: {}", path);
+        final ListenableFuture<Optional<NormalizedNode<?, ?>>> listenableFuture = transaction.read(datastore, path);
+        if (listenableFuture != null) {
+            Optional<NormalizedNode<?, ?>> optional;
+            try {
+                LOG.debug("Reading result data from transaction.");
+                optional = listenableFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RestconfDocumentedException("Problem to get data from transaction.", e.getCause());
+
+            }
+            if (optional != null) {
+                if (optional.isPresent()) {
+                    return optional.get();
+                }
+            }
         }
-        BrokerFacade.LOG.trace( "Post Configuration via Restconf: {}", path );
-        transaction.putConfigurationData( path, payload );
-        return transaction.commit();
-    }
-
-    public Future<RpcResult<TransactionStatus>> commitConfigurationDataPostBehindMountPoint(
-            final MountInstance mountPoint, final InstanceIdentifier path, final CompositeNode payload ) {
-        this.checkPreconditions();
-
-        final DataModificationTransaction transaction = mountPoint.beginTransaction();
-        /* check for available Node in Configuration DataStore by path */
-        CompositeNode availableNode = transaction.readConfigurationData( path );
-        if (availableNode != null) {
-            String errMsg = "Post Configuration via Restconf was not executed because data already exists";
-            BrokerFacade.LOG.warn((new StringBuilder(errMsg)).append(" : ").append(path).toString());
-            // FIXME: return correct ietf-restconf:errors -> follow specification
-            // (http://tools.ietf.org/html/draft-bierman-netconf-restconf-03#page-48)
-            throw new ResponseException(Status.CONFLICT, errMsg);
+        return null;
+    }
+
+    private CheckedFuture<Void, TransactionCommitFailedException> postDataViaTransaction(
+            final DOMDataReadWriteTransaction rWTransaction, final LogicalDatastoreType datastore,
+            final YangInstanceIdentifier parentPath, final NormalizedNode<?, ?> payload, final SchemaContext schemaContext) {
+        // FIXME: This is doing correct post for container and list children
+        //        not sure if this will work for choice case
+        if(payload instanceof MapNode) {
+            final YangInstanceIdentifier mapPath = parentPath.node(payload.getIdentifier());
+            final NormalizedNode<?, ?> emptySubtree = ImmutableNodes.fromInstanceId(schemaContext, mapPath);
+            rWTransaction.merge(datastore, YangInstanceIdentifier.create(emptySubtree.getIdentifier()), emptySubtree);
+            ensureParentsByMerge(datastore, mapPath, rWTransaction, schemaContext);
+            for(final MapEntryNode child : ((MapNode) payload).getValue()) {
+                final YangInstanceIdentifier childPath = mapPath.node(child.getIdentifier());
+                checkItemDoesNotExists(rWTransaction, datastore, childPath);
+                rWTransaction.put(datastore, childPath, child);
+            }
+        } else {
+            final YangInstanceIdentifier path;
+            if(payload instanceof MapEntryNode) {
+                path = parentPath.node(payload.getNodeType()).node(payload.getIdentifier());
+            } else {
+                path = parentPath.node(payload.getIdentifier());
+            }
+            checkItemDoesNotExists(rWTransaction,datastore, path);
+            ensureParentsByMerge(datastore, path, rWTransaction, schemaContext);
+            rWTransaction.put(datastore, path, payload);
         }
-        BrokerFacade.LOG.trace( "Post Configuration via Restconf: {}", path );
-        transaction.putConfigurationData( path, payload );
-        return transaction.commit();
+        return rWTransaction.submit();
     }
 
-    public Future<RpcResult<TransactionStatus>> commitConfigurationDataDelete( final InstanceIdentifier path ) {
-        this.checkPreconditions();
+    private void checkItemDoesNotExists(final DOMDataReadWriteTransaction rWTransaction,final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+        final ListenableFuture<Boolean> futureDatastoreData = rWTransaction.exists(store, path);
+        try {
+            if (futureDatastoreData.get()) {
+                final String errMsg = "Post Configuration via Restconf was not executed because data already exists";
+                LOG.debug(errMsg + ":{}", path);
+                rWTransaction.cancel();
+                throw new RestconfDocumentedException("Data already exists for path: " + path, ErrorType.PROTOCOL,
+                        ErrorTag.DATA_EXISTS);
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.trace("It wasn't possible to get data loaded from datastore at path " + path);
+        }
 
-        final DataModificationTransaction transaction = dataService.beginTransaction();
-        LOG.info( "Delete Configuration via Restconf: {}", path );
-        transaction.removeConfigurationData( path );
-        return transaction.commit();
     }
 
-    public Future<RpcResult<TransactionStatus>> commitConfigurationDataDeleteBehindMountPoint(
-                                          final MountInstance mountPoint, final InstanceIdentifier path ) {
-        this.checkPreconditions();
+    private CheckedFuture<Void, TransactionCommitFailedException> putDataViaTransaction(
+            final DOMDataReadWriteTransaction writeTransaction, final LogicalDatastoreType datastore,
+            final YangInstanceIdentifier path, final NormalizedNode<?, ?> payload, final SchemaContext schemaContext) {
+        LOG.trace("Put " + datastore.name() + " via Restconf: {}", path);
+        ensureParentsByMerge(datastore, path, writeTransaction, schemaContext);
+        writeTransaction.put(datastore, path, payload);
+        return writeTransaction.submit();
+    }
+
+    private CheckedFuture<Void, TransactionCommitFailedException> deleteDataViaTransaction(
+            final DOMDataWriteTransaction writeTransaction, final LogicalDatastoreType datastore,
+            final YangInstanceIdentifier path) {
+        LOG.trace("Delete " + datastore.name() + " via Restconf: {}", path);
+        writeTransaction.delete(datastore, path);
+        return writeTransaction.submit();
+    }
 
-        final DataModificationTransaction transaction = mountPoint.beginTransaction();
-        LOG.info( "Delete Configuration via Restconf: {}", path );
-        transaction.removeConfigurationData( path );
-        return transaction.commit();
+    public void setDomDataBroker(final DOMDataBroker domDataBroker) {
+        this.domDataBroker = domDataBroker;
     }
 
-    public void registerToListenDataChanges( final ListenerAdapter listener ) {
-        this.checkPreconditions();
+    private void ensureParentsByMerge(final LogicalDatastoreType store,
+                                      final YangInstanceIdentifier normalizedPath, final DOMDataReadWriteTransaction rwTx, final SchemaContext schemaContext) {
+        final List<PathArgument> normalizedPathWithoutChildArgs = new ArrayList<>();
+        YangInstanceIdentifier rootNormalizedPath = null;
+
+        final Iterator<PathArgument> it = normalizedPath.getPathArguments().iterator();
+
+        while(it.hasNext()) {
+            final PathArgument pathArgument = it.next();
+            if(rootNormalizedPath == null) {
+                rootNormalizedPath = YangInstanceIdentifier.create(pathArgument);
+            }
+
+            // Skip last element, its not a parent
+            if(it.hasNext()) {
+                normalizedPathWithoutChildArgs.add(pathArgument);
+            }
+        }
 
-        if( listener.isListening() ) {
+        // No parent structure involved, no need to ensure parents
+        if(normalizedPathWithoutChildArgs.isEmpty()) {
             return;
         }
 
-        InstanceIdentifier path = listener.getPath();
-        final ListenerRegistration<DataChangeListener> registration =
-                                             dataService.registerDataChangeListener( path, listener );
+        Preconditions.checkArgument(rootNormalizedPath != null, "Empty path received");
 
-        listener.setRegistration( registration );
+        final NormalizedNode<?, ?> parentStructure =
+                ImmutableNodes.fromInstanceId(schemaContext, YangInstanceIdentifier.create(normalizedPathWithoutChildArgs));
+        rwTx.merge(store, rootNormalizedPath, parentStructure);
     }
 }