Added support for binding-independent RPCs
[controller.git] / opendaylight / sal / yang-prototype / sal / sal-broker-impl / src / main / java / org / opendaylight / controller / sal / core / impl / DataBrokerModule.java
diff --git a/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/DataBrokerModule.java b/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/DataBrokerModule.java
new file mode 100644 (file)
index 0000000..852f6b6
--- /dev/null
@@ -0,0 +1,419 @@
+/*\r
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+package org.opendaylight.controller.sal.core.impl;\r
+\r
+import java.util.ArrayList;\r
+import java.util.Collections;\r
+import java.util.HashSet;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Future;\r
+\r
+import org.opendaylight.controller.sal.common.DataStoreIdentifier;\r
+import org.opendaylight.controller.sal.common.util.Rpcs;\r
+import org.opendaylight.controller.sal.core.api.BrokerService;\r
+import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession;\r
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;\r
+import org.opendaylight.controller.sal.core.api.Consumer.ConsumerFunctionality;\r
+import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;\r
+import org.opendaylight.controller.sal.core.api.data.DataBrokerService;\r
+import org.opendaylight.controller.sal.core.api.data.DataCommitHandler;\r
+import org.opendaylight.controller.sal.core.api.data.DataProviderService;\r
+import org.opendaylight.controller.sal.core.api.data.DataValidator;\r
+import org.opendaylight.controller.sal.core.api.data.DataCommitHandler.CommitTransaction;\r
+import org.opendaylight.controller.sal.core.api.data.DataProviderService.DataRefresher;\r
+import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
+import org.opendaylight.controller.yang.common.RpcError;\r
+import org.opendaylight.controller.yang.common.RpcResult;\r
+import org.opendaylight.controller.yang.data.api.CompositeNode;\r
+import org.opendaylight.controller.yang.data.api.CompositeNodeModification;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+import com.google.common.collect.ImmutableSet;\r
+\r
+public class DataBrokerModule implements BrokerModule {\r
+\r
+    private static final Logger log = LoggerFactory\r
+            .getLogger(DataBrokerModule.class);\r
+\r
+    private static final Set<Class<? extends ProviderFunctionality>> SUPPORTED_PROVIDER_FUNCTIONALITY = ImmutableSet\r
+            .of((Class<? extends ProviderFunctionality>) DataValidator.class,\r
+                    DataRefresher.class, DataCommitHandler.class);\r
+\r
+    private static final Set<Class<? extends BrokerService>> PROVIDED_SESSION_SERVICES = ImmutableSet\r
+            .of((Class<? extends BrokerService>) DataBrokerService.class,\r
+                    DataProviderService.class);\r
+\r
+    private Map<DataStoreIdentifier, StoreContext> storeContext;\r
+\r
+    private ExecutorService executor;\r
+    \r
+    private SequentialCommitHandlerCoordinator coordinator = new SequentialCommitHandlerCoordinator();\r
+\r
+    @Override\r
+    public Set<Class<? extends BrokerService>> getProvidedServices() {\r
+        return PROVIDED_SESSION_SERVICES;\r
+    }\r
+\r
+    @Override\r
+    public Set<Class<? extends ProviderFunctionality>> getSupportedProviderFunctionality() {\r
+        return SUPPORTED_PROVIDER_FUNCTIONALITY;\r
+    }\r
+\r
+    @Override\r
+    public Set<Class<? extends ConsumerFunctionality>> getSupportedConsumerFunctionality() {\r
+        return Collections.emptySet();\r
+    }\r
+\r
+    @Override\r
+    public <T extends BrokerService> T getServiceForSession(Class<T> service,\r
+            ConsumerSession session) {\r
+        if (DataProviderService.class.equals(service)\r
+                && session instanceof ProviderSession) {\r
+            @SuppressWarnings("unchecked")\r
+            T ret = (T) newDataProviderService(session);\r
+            return ret;\r
+        } else if (DataBrokerService.class.equals(service)) {\r
+\r
+            @SuppressWarnings("unchecked")\r
+            T ret = (T) newDataConsumerService(session);\r
+            return ret;\r
+        }\r
+\r
+        throw new IllegalArgumentException(\r
+                "The requested session-specific service is not provided by this module.");\r
+    }\r
+\r
+    private DataProviderService newDataProviderService(ConsumerSession session) {\r
+        return new DataProviderSession();\r
+    }\r
+\r
+    private DataBrokerService newDataConsumerService(ConsumerSession session) {\r
+        return new DataConsumerSession();\r
+    }\r
+\r
+    private StoreContext context(DataStoreIdentifier store) {\r
+        return storeContext.get(store);\r
+    }\r
+\r
+    private static class StoreContext {\r
+        private Set<DataCommitHandler> commitHandlers = Collections\r
+                .synchronizedSet(new HashSet<DataCommitHandler>());\r
+        private Set<DataValidator> validators = Collections\r
+                .synchronizedSet(new HashSet<DataValidator>());\r
+        private Set<DataRefresher> refreshers = Collections\r
+                .synchronizedSet(new HashSet<DataRefresher>());\r
+    }\r
+\r
+    private class DataConsumerSession implements DataBrokerService {\r
+\r
+        @Override\r
+        public CompositeNode getData(DataStoreIdentifier store) {\r
+            // TODO Implement this method\r
+            throw new UnsupportedOperationException("Not implemented");\r
+        }\r
+\r
+        @Override\r
+        public CompositeNode getData(DataStoreIdentifier store,\r
+                CompositeNode filter) {\r
+            // TODO Implement this method\r
+            throw new UnsupportedOperationException("Not implemented");\r
+        }\r
+\r
+        @Override\r
+        public CompositeNode getCandidateData(DataStoreIdentifier store) {\r
+            // TODO Implement this method\r
+            throw new UnsupportedOperationException("Not implemented");\r
+        }\r
+\r
+        @Override\r
+        public CompositeNode getCandidateData(DataStoreIdentifier store,\r
+                CompositeNode filter) {\r
+            // TODO Implement this method\r
+            throw new UnsupportedOperationException("Not implemented");\r
+        }\r
+\r
+        @Override\r
+        public RpcResult<CompositeNode> editCandidateData(\r
+                DataStoreIdentifier store, CompositeNodeModification changeSet) {\r
+            // TODO Implement this method\r
+            throw new UnsupportedOperationException("Not implemented");\r
+        }\r
+\r
+        @Override\r
+        public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {\r
+            // TODO Implement this method\r
+            throw new UnsupportedOperationException("Not implemented");\r
+        }\r
+\r
+        @Override\r
+        public void closeSession() {\r
+            // TODO Implement this method\r
+            throw new UnsupportedOperationException("Not implemented");\r
+        }\r
+\r
+        @Override\r
+        public Set<DataStoreIdentifier> getDataStores() {\r
+            // TODO Auto-generated method stub\r
+            return null;\r
+        }\r
+\r
+    }\r
+\r
+    private class DataProviderSession extends DataConsumerSession implements\r
+            DataProviderService {\r
+\r
+        private Set<DataCommitHandler> providerCommitHandlers = new HashSet<DataCommitHandler>();\r
+        private Set<DataValidator> providerValidators = new HashSet<DataValidator>();\r
+        private Set<DataRefresher> providerRefreshers = new HashSet<DataRefresher>();\r
+\r
+        @Override\r
+        public void addValidator(DataStoreIdentifier store,\r
+                DataValidator validator) {\r
+            if (validator == null)\r
+                throw new IllegalArgumentException(\r
+                        "Validator should not be null");\r
+\r
+            providerValidators.add(validator);\r
+            context(store).validators.add(validator);\r
+        }\r
+\r
+        @Override\r
+        public void removeValidator(DataStoreIdentifier store,\r
+                DataValidator validator) {\r
+            if (validator == null)\r
+                throw new IllegalArgumentException(\r
+                        "Validator should not be null");\r
+\r
+            providerValidators.remove(validator);\r
+            context(store).validators.remove(validator);\r
+        }\r
+\r
+        @Override\r
+        public void addCommitHandler(DataStoreIdentifier store,\r
+                DataCommitHandler provider) {\r
+            if (provider == null)\r
+                throw new IllegalArgumentException(\r
+                        "CommitHandler should not be null");\r
+\r
+            providerCommitHandlers.add(provider);\r
+            context(store).commitHandlers.add(provider);\r
+        }\r
+\r
+        @Override\r
+        public void removeCommitHandler(DataStoreIdentifier store,\r
+                DataCommitHandler provider) {\r
+            if (provider == null)\r
+                throw new IllegalArgumentException(\r
+                        "CommitHandler should not be null");\r
+\r
+            providerCommitHandlers.remove(provider);\r
+            context(store).commitHandlers.remove(provider);\r
+        }\r
+\r
+        @Override\r
+        public void addRefresher(DataStoreIdentifier store,\r
+                DataRefresher provider) {\r
+            if (provider == null)\r
+                throw new IllegalArgumentException(\r
+                        "Refresher should not be null");\r
+\r
+            providerRefreshers.add(provider);\r
+            context(store).refreshers.add(provider);\r
+        }\r
+\r
+        @Override\r
+        public void removeRefresher(DataStoreIdentifier store,\r
+                DataRefresher provider) {\r
+            if (provider == null)\r
+                throw new IllegalArgumentException(\r
+                        "Refresher should not be null");\r
+\r
+            providerRefreshers.remove(provider);\r
+            context(store).refreshers.remove(provider);\r
+        }\r
+\r
+    }\r
+\r
+    private class SequentialCommitHandlerCoordinator implements\r
+            DataCommitHandler {\r
+\r
+        @Override\r
+        public RpcResult<CommitTransaction> requestCommit(\r
+                DataStoreIdentifier store) {\r
+            List<RpcError> errors = new ArrayList<RpcError>();\r
+            Set<CommitTransaction> transactions = new HashSet<DataCommitHandler.CommitTransaction>();\r
+            boolean successful = true;\r
+\r
+            for (DataCommitHandler commitHandler : context(store).commitHandlers) {\r
+                try {\r
+                    RpcResult<CommitTransaction> partialResult = commitHandler\r
+                            .requestCommit(store);\r
+                    successful = partialResult.isSuccessful() & successful;\r
+                    if (partialResult.isSuccessful()) {\r
+                        transactions.add(partialResult.getResult());\r
+                    }\r
+\r
+                    errors.addAll(partialResult.getErrors());\r
+                } catch (Exception e) {\r
+                    log.error("Uncaught exception prevented commit request."\r
+                            + e.getMessage(), e);\r
+                    successful = false;\r
+                    // FIXME: Add RPC Error with exception.\r
+                }\r
+                if (successful == false)\r
+                    break;\r
+            }\r
+            CommitTransaction transaction = new SequentialCommitTransaction(\r
+                    store, transactions);\r
+            return Rpcs.getRpcResult(successful, transaction, errors);\r
+        }\r
+\r
+        @Override\r
+        public Set<DataStoreIdentifier> getSupportedDataStores() {\r
+            return Collections.emptySet();\r
+        }\r
+    }\r
+\r
+    private class SequentialCommitTransaction implements CommitTransaction {\r
+\r
+        final Set<CommitTransaction> transactions;\r
+        final DataStoreIdentifier store;\r
+\r
+        public SequentialCommitTransaction(DataStoreIdentifier s,\r
+                Set<CommitTransaction> t) {\r
+            transactions = t;\r
+            store = s;\r
+        }\r
+\r
+        @Override\r
+        public RpcResult<Void> finish() {\r
+            List<RpcError> errors = new ArrayList<RpcError>();\r
+            boolean successful = true;\r
+\r
+            for (CommitTransaction commitHandler : transactions) {\r
+                try {\r
+                    RpcResult<Void> partialResult = commitHandler.finish();\r
+                    successful = partialResult.isSuccessful() & successful;\r
+                    errors.addAll(partialResult.getErrors());\r
+                } catch (Exception e) {\r
+                    log.error(\r
+                            "Uncaught exception prevented finishing of commit."\r
+                                    + e.getMessage(), e);\r
+                    successful = false;\r
+                    // FIXME: Add RPC Error with exception.\r
+                }\r
+                if (successful == false)\r
+                    break;\r
+            }\r
+\r
+            return Rpcs.getRpcResult(successful, null, errors);\r
+        }\r
+\r
+        @Override\r
+        public RpcResult<Void> rollback() {\r
+            List<RpcError> errors = new ArrayList<RpcError>();\r
+            boolean successful = true;\r
+\r
+            for (CommitTransaction commitHandler : transactions) {\r
+                try {\r
+                    RpcResult<Void> partialResult = commitHandler.rollback();\r
+                    successful = partialResult.isSuccessful() & successful;\r
+                    errors.addAll(partialResult.getErrors());\r
+                } catch (Exception e) {\r
+                    log.error(\r
+                            "Uncaught exception prevented rollback of commit."\r
+                                    + e.getMessage(), e);\r
+                    successful = false;\r
+                    // FIXME: Add RPC Error with exception.\r
+                }\r
+                if (successful == false)\r
+                    break;\r
+            }\r
+\r
+            return Rpcs.getRpcResult(successful, null, errors);\r
+        }\r
+\r
+        @Override\r
+        public DataStoreIdentifier getDataStore() {\r
+            return this.store;\r
+        }\r
+\r
+        @Override\r
+        public DataCommitHandler getHandler() {\r
+            return coordinator;\r
+        }\r
+    }\r
+\r
+    private class ValidationCoordinator implements DataValidator {\r
+\r
+        private final DataStoreIdentifier store;\r
+\r
+        ValidationCoordinator(DataStoreIdentifier store) {\r
+            this.store = store;\r
+        }\r
+\r
+        @Override\r
+        public RpcResult<Void> validate(CompositeNode toValidate) {\r
+            List<RpcError> errors = new ArrayList<RpcError>();\r
+            boolean successful = true;\r
+\r
+            for (DataValidator validator : context(store).validators) {\r
+                try {\r
+                    RpcResult<Void> partialResult = validator\r
+                            .validate(toValidate);\r
+                    successful = partialResult.isSuccessful() & successful;\r
+                    errors.addAll(partialResult.getErrors());\r
+                } catch (Exception e) {\r
+                    log.error(\r
+                            "Uncaught exception prevented validation."\r
+                                    + e.getMessage(), e);\r
+                    successful = false;\r
+                    // FIXME: Add RPC Error with exception.\r
+                }\r
+                if (successful == false)\r
+                    break;\r
+            }\r
+\r
+            return Rpcs.getRpcResult(successful, null, errors);\r
+        }\r
+\r
+        @Override\r
+        public Set<DataStoreIdentifier> getSupportedDataStores() {\r
+            return Collections.emptySet();\r
+        }\r
+\r
+    }\r
+\r
+    private class DataRefreshCoordinator implements DataRefresher {\r
+\r
+        private final DataStoreIdentifier store;\r
+\r
+        DataRefreshCoordinator(DataStoreIdentifier store) {\r
+            this.store = store;\r
+        }\r
+\r
+        @Override\r
+        public void refreshData() {\r
+\r
+            for (DataRefresher refresher : context(store).refreshers) {\r
+                try {\r
+                    refresher.refreshData();\r
+                } catch (Exception e) {\r
+                    log.error(\r
+                            "Uncaught exception during refresh of data: "\r
+                                    + e.getMessage(), e);\r
+                }\r
+\r
+            }\r
+        }\r
+    }\r
+}\r