Added implementation of DataBrokerImplRuntimeMXBean as DataBrokerRuntimeMXBeanImpl;
Modified DataBrokerImplModule with DataBrokerRuntimeMXBeanImpl;
Added "submitted" leaf into opendaylight-md-sal-common.yang;
Signed-off-by: Lukas Sedlak <lsedlak@cisco.com>
Change-Id: I5494c7175b37c14e39d6b2b3d87fccd3358f1a57
-/**
-* Generated file
-
-* Generated from: yang module name: opendaylight-sal-binding-broker-impl yang module local name: binding-broker-impl
-* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
-* Generated at: Wed Nov 20 17:33:01 CET 2013
-*
-* Do not modify this file unless it is present under src/main directory
-*/
-package org.opendaylight.controller.config.yang.md.sal.binding.impl;
-
-import org.opendaylight.controller.sal.binding.impl.BindingAwareBrokerImpl;
-import org.osgi.framework.BundleContext;
-
-/**
-*
-*/
-public final class BindingBrokerImplModule extends org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractBindingBrokerImplModule {
-
- private BundleContext bundleContext;
-
- public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
- super(identifier, dependencyResolver);
- }
-
- public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, BindingBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {
- super(identifier, dependencyResolver, oldModule, oldInstance);
- }
-
- @Override
- public void validate(){
- super.validate();
- // Add custom validation for module attributes here.
- }
-
- @Override
- public java.lang.AutoCloseable createInstance() {
- BindingAwareBrokerImpl broker = new BindingAwareBrokerImpl(getBundleContext());
- broker.setDataBroker(getDataBrokerDependency());
- broker.setNotifyBroker(getNotificationServiceDependency());
- broker.start();
- return broker;
- }
-
- public BundleContext getBundleContext() {
- return bundleContext;
- }
-
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-}
+/**\r
+* Generated file\r
+\r
+* Generated from: yang module name: opendaylight-sal-binding-broker-impl yang module local name: binding-broker-impl\r
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator\r
+* Generated at: Wed Nov 20 17:33:01 CET 2013\r
+*\r
+* Do not modify this file unless it is present under src/main directory\r
+*/\r
+package org.opendaylight.controller.config.yang.md.sal.binding.impl;\r
+\r
+import org.opendaylight.controller.sal.binding.impl.BindingAwareBrokerImpl;\r
+import org.osgi.framework.BundleContext;\r
+\r
+import com.google.common.base.Preconditions;\r
+\r
+/**\r
+*\r
+*/\r
+public final class BindingBrokerImplModule extends org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractBindingBrokerImplModule {\r
+\r
+ private BundleContext bundleContext;\r
+\r
+ public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {\r
+ super(identifier, dependencyResolver);\r
+ }\r
+\r
+ public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, BindingBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {\r
+ super(identifier, dependencyResolver, oldModule, oldInstance);\r
+ }\r
+\r
+ @Override\r
+ public void validate(){\r
+ super.validate();\r
+ }\r
+\r
+ @Override\r
+ public java.lang.AutoCloseable createInstance() {\r
+ BindingAwareBrokerImpl broker = new BindingAwareBrokerImpl(getBundleContext());\r
+ broker.setDataBroker(getDataBrokerDependency());\r
+ broker.setNotifyBroker(getNotificationServiceDependency());\r
+ broker.start();\r
+ return broker;\r
+ }\r
+\r
+ public BundleContext getBundleContext() {\r
+ return bundleContext;\r
+ }\r
+\r
+ public void setBundleContext(BundleContext bundleContext) {\r
+ this.bundleContext = bundleContext;\r
+ }\r
+}\r
-/**
- * Generated file
-
- * Generated from: yang module name: opendaylight-sal-binding-broker-impl yang module local name: binding-data-broker
- * Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
- * Generated at: Wed Nov 20 17:33:01 CET 2013
- *
- * Do not modify this file unless it is present under src/main directory
- */
-package org.opendaylight.controller.config.yang.md.sal.binding.impl;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
-import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentDataServiceConnector;
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentMappingService;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.data.DataProviderService;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.osgi.framework.BundleContext;
-
-import com.google.common.util.concurrent.MoreExecutors;
-
-/**
-*
-*/
-public final class DataBrokerImplModule extends
- org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractDataBrokerImplModule {
-
- private BundleContext bundleContext;
-
- public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
- org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
- super(identifier, dependencyResolver);
- }
-
- public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
- org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
- DataBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {
- super(identifier, dependencyResolver, oldModule, oldInstance);
- }
-
- @Override
- public void validate() {
- super.validate();
- }
-
- @Override
- public java.lang.AutoCloseable createInstance() {
- DataBrokerImpl dataBindingBroker = new DataBrokerImpl();
-
- // FIXME: obtain via dependency management
- ExecutorService executor = Executors.newCachedThreadPool();
- ExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
- dataBindingBroker.setExecutor(listeningExecutor);
-
-
-
- Broker domBroker = getDomBrokerDependency();
- BindingIndependentMappingService mappingService = getMappingServiceDependency();
-
- if (domBroker != null && mappingService != null) {
- BindingIndependentDataServiceConnector runtimeMapping = new BindingIndependentDataServiceConnector();
- runtimeMapping.setMappingService(mappingService);
- runtimeMapping.setBaDataService(dataBindingBroker);
- domBroker.registerProvider(runtimeMapping, getBundleContext());
- }
-
- return dataBindingBroker;
- }
-
- public BundleContext getBundleContext() {
- return bundleContext;
- }
-
- public void setBundleContext(BundleContext bundleContext2) {
- this.bundleContext = bundleContext2;
- }
-}
+/**\r
+ * Generated file\r
+\r
+ * Generated from: yang module name: opendaylight-sal-binding-broker-impl yang module local name: binding-data-broker\r
+ * Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator\r
+ * Generated at: Wed Nov 20 17:33:01 CET 2013\r
+ *\r
+ * Do not modify this file unless it is present under src/main directory\r
+ */\r
+package org.opendaylight.controller.config.yang.md.sal.binding.impl;\r
+\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.ScheduledExecutorService;\r
+\r
+import org.opendaylight.controller.config.yang.md.sal.binding.statistics.DataBrokerRuntimeMXBeanImpl;\r
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;\r
+import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;\r
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentDataServiceConnector;\r
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentMappingService;\r
+import org.opendaylight.controller.sal.core.api.Broker;\r
+import org.opendaylight.controller.sal.core.api.data.DataProviderService;\r
+import org.opendaylight.yangtools.yang.binding.DataObject;\r
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
+import org.osgi.framework.BundleContext;\r
+\r
+import com.google.common.util.concurrent.MoreExecutors;\r
+\r
+/**\r
+*\r
+*/\r
+public final class DataBrokerImplModule extends\r
+ org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractDataBrokerImplModule {\r
+\r
+ private BundleContext bundleContext;\r
+\r
+ public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,\r
+ org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {\r
+ super(identifier, dependencyResolver);\r
+ }\r
+\r
+ public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,\r
+ org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,\r
+ DataBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {\r
+ super(identifier, dependencyResolver, oldModule, oldInstance);\r
+ }\r
+\r
+ @Override\r
+ public void validate() {\r
+ super.validate();\r
+ }\r
+\r
+ @Override\r
+ public java.lang.AutoCloseable createInstance() {\r
+ DataBrokerRuntimeMXBeanImpl dataBindingBroker = new DataBrokerRuntimeMXBeanImpl();\r
+ \r
+ // FIXME: obtain via dependency management\r
+ ExecutorService executor = Executors.newCachedThreadPool();\r
+ ExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);\r
+ dataBindingBroker.setExecutor(listeningExecutor);\r
+\r
+ Broker domBroker = getDomBrokerDependency();\r
+ BindingIndependentMappingService mappingService = getMappingServiceDependency();\r
+ \r
+ if (domBroker != null && mappingService != null) {\r
+ BindingIndependentDataServiceConnector runtimeMapping = new BindingIndependentDataServiceConnector();\r
+ runtimeMapping.setMappingService(mappingService);\r
+ runtimeMapping.setBaDataService(dataBindingBroker);\r
+ domBroker.registerProvider(runtimeMapping, getBundleContext());\r
+ }\r
+ getRootRuntimeBeanRegistratorWrapper().register(dataBindingBroker);\r
+ return dataBindingBroker;\r
+ }\r
+\r
+ public BundleContext getBundleContext() {\r
+ return bundleContext;\r
+ }\r
+\r
+ public void setBundleContext(BundleContext bundleContext2) {\r
+ this.bundleContext = bundleContext2;\r
+ }\r
+}\r
--- /dev/null
+package org.opendaylight.controller.config.yang.md.sal.binding.statistics;\r
+\r
+import org.opendaylight.controller.config.yang.md.sal.binding.impl.DataBrokerImplRuntimeMXBean;\r
+import org.opendaylight.controller.config.yang.md.sal.binding.impl.Transactions;\r
+import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;\r
+\r
+public class DataBrokerRuntimeMXBeanImpl extends DataBrokerImpl implements DataBrokerImplRuntimeMXBean {\r
+ \r
+ private Transactions transactions = new Transactions();\r
+ \r
+ @Override\r
+ public Transactions getTransactions() {\r
+ transactions.setCreated(getCreatedTransactionsCount().get());\r
+ transactions.setSubmitted(getSubmittedTransactionsCount().get());\r
+ transactions.setSuccessful(getFinishedTransactionsCount().get());\r
+ transactions.setFailed(getFailedTransactionsCount().get());\r
+ return transactions;\r
+ }\r
+}\r
-package org.opendaylight.controller.sal.binding.impl;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
-import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataBroker;
-import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
-import org.opendaylight.controller.sal.binding.impl.util.BindingAwareDataReaderRouter;
-import org.opendaylight.controller.sal.common.DataStoreIdentifier;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.DataRoot;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
-
-public class DataBrokerImpl extends AbstractDataBroker<InstanceIdentifier<? extends DataObject>, DataObject, DataChangeListener> implements
- DataProviderService, AutoCloseable {
-
-
-
- private final AtomicLong nextTransaction = new AtomicLong();
-
- public DataBrokerImpl() {
- setDataReadRouter(new BindingAwareDataReaderRouter());
- }
-
- @Override
- public DataTransactionImpl beginTransaction() {
- String transactionId = "BA-" + nextTransaction.getAndIncrement();
- return new DataTransactionImpl(transactionId,this);
- }
-
- @Override
- @Deprecated
- public <T extends DataRoot> T getData(DataStoreIdentifier store, Class<T> rootType) {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- @Deprecated
- public <T extends DataRoot> T getData(DataStoreIdentifier store, T filter) {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- @Deprecated
- public <T extends DataRoot> T getCandidateData(DataStoreIdentifier store, Class<T> rootType) {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- @Deprecated
- public <T extends DataRoot> T getCandidateData(DataStoreIdentifier store, T filter) {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- @Deprecated
- public RpcResult<DataRoot> editCandidateData(DataStoreIdentifier store, DataRoot changeSet) {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- @Deprecated
- public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- @Deprecated
- public DataObject getData(InstanceIdentifier<? extends DataObject> data) {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- @Deprecated
- public DataObject getConfigurationData(InstanceIdentifier<?> data) {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- @Deprecated
- public void registerChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener changeListener) {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- @Deprecated
- public void unregisterChangeListener(InstanceIdentifier<? extends DataObject> path,
- DataChangeListener changeListener) {
- throw new UnsupportedOperationException("Deprecated");
- }
-
- @Override
- public void close() throws Exception {
-
- }
+package org.opendaylight.controller.sal.binding.impl;\r
+\r
+import java.util.concurrent.Future;\r
+import java.util.concurrent.atomic.AtomicLong;\r
+\r
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataBroker;\r
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;\r
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;\r
+import org.opendaylight.controller.sal.binding.impl.util.BindingAwareDataReaderRouter;\r
+import org.opendaylight.controller.sal.common.DataStoreIdentifier;\r
+import org.opendaylight.yangtools.yang.binding.DataObject;\r
+import org.opendaylight.yangtools.yang.binding.DataRoot;\r
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
+import org.opendaylight.yangtools.yang.common.RpcResult;\r
+\r
+\r
+public class DataBrokerImpl extends AbstractDataBroker<InstanceIdentifier<? extends DataObject>, DataObject, DataChangeListener> implements\r
+ DataProviderService, AutoCloseable {\r
+\r
+ private final AtomicLong nextTransaction = new AtomicLong();\r
+ private final AtomicLong createdTransactionsCount = new AtomicLong();\r
+ \r
+ public AtomicLong getCreatedTransactionsCount() {\r
+ return createdTransactionsCount;\r
+ }\r
+\r
+ public DataBrokerImpl() {\r
+ setDataReadRouter(new BindingAwareDataReaderRouter());\r
+ }\r
+\r
+ @Override\r
+ public DataTransactionImpl beginTransaction() {\r
+ String transactionId = "BA-" + nextTransaction.getAndIncrement();\r
+ createdTransactionsCount.getAndIncrement();\r
+ return new DataTransactionImpl(transactionId,this);\r
+ }\r
+\r
+ @Override\r
+ @Deprecated\r
+ public <T extends DataRoot> T getData(DataStoreIdentifier store, Class<T> rootType) {\r
+ throw new UnsupportedOperationException("Deprecated");\r
+ }\r
+\r
+ @Override\r
+ @Deprecated\r
+ public <T extends DataRoot> T getData(DataStoreIdentifier store, T filter) {\r
+ throw new UnsupportedOperationException("Deprecated");\r
+ }\r
+\r
+ @Override\r
+ @Deprecated\r
+ public <T extends DataRoot> T getCandidateData(DataStoreIdentifier store, Class<T> rootType) {\r
+ throw new UnsupportedOperationException("Deprecated");\r
+ }\r
+\r
+ @Override\r
+ @Deprecated\r
+ public <T extends DataRoot> T getCandidateData(DataStoreIdentifier store, T filter) {\r
+ throw new UnsupportedOperationException("Deprecated");\r
+ }\r
+\r
+ @Override\r
+ @Deprecated\r
+ public RpcResult<DataRoot> editCandidateData(DataStoreIdentifier store, DataRoot changeSet) {\r
+ throw new UnsupportedOperationException("Deprecated");\r
+ }\r
+\r
+ @Override\r
+ @Deprecated\r
+ public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {\r
+ throw new UnsupportedOperationException("Deprecated");\r
+ }\r
+\r
+ @Override\r
+ @Deprecated\r
+ public DataObject getData(InstanceIdentifier<? extends DataObject> data) {\r
+ throw new UnsupportedOperationException("Deprecated");\r
+ }\r
+\r
+ @Override\r
+ @Deprecated\r
+ public DataObject getConfigurationData(InstanceIdentifier<?> data) {\r
+ throw new UnsupportedOperationException("Deprecated");\r
+ }\r
+\r
+ @Override\r
+ @Deprecated\r
+ public void registerChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener changeListener) {\r
+ throw new UnsupportedOperationException("Deprecated");\r
+ }\r
+\r
+ @Override\r
+ @Deprecated\r
+ public void unregisterChangeListener(InstanceIdentifier<? extends DataObject> path,\r
+ DataChangeListener changeListener) {\r
+ throw new UnsupportedOperationException("Deprecated");\r
+ }\r
+ \r
+ @Override\r
+ public void close() throws Exception {\r
+ \r
+ }\r
}
\ No newline at end of file
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.binding.impl
-
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService
-import org.opendaylight.yangtools.yang.binding.Notification
-import com.google.common.collect.Multimap
-import org.opendaylight.controller.sal.binding.api.NotificationListener
-import com.google.common.collect.HashMultimap
-import java.util.concurrent.ExecutorService
-import java.util.Collection
-import org.opendaylight.yangtools.concepts.Registration
-import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory
-import org.opendaylight.yangtools.concepts.ListenerRegistration
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
-import java.util.Collections
-import org.slf4j.LoggerFactory
-import java.util.concurrent.Callable
-
-class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
-
- val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;
-
- @Property
- var ExecutorService executor;
-
- new(ExecutorService executor) {
- listeners = HashMultimap.create()
- this.executor = executor;
- }
-
- @Deprecated
- override <T extends Notification> addNotificationListener(Class<T> notificationType,
- NotificationListener<T> listener) {
- listeners.put(notificationType, listener)
- }
-
- @Deprecated
- override <T extends Notification> removeNotificationListener(Class<T> notificationType,
- NotificationListener<T> listener) {
- listeners.remove(notificationType, listener)
- }
-
- override notify(Notification notification) {
- publish(notification)
- }
-
- def getNotificationTypes(Notification notification) {
- notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]
- }
-
- @SuppressWarnings("unchecked")
- private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {
- listeners.forEach[(it as NotificationListener).onNotification(notification)]
- }
-
- @Deprecated
- override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
- throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");
-
- }
-
- @Deprecated
- override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
- throw new UnsupportedOperationException(
- "Deprecated method. Use RegisterNotificationListener returned value to close registration.")
- }
-
- @Deprecated
- override notify(Notification notification, ExecutorService service) {
- publish(notification, service)
- }
-
- override publish(Notification notification) {
- publish(notification, executor)
- }
-
- override publish(Notification notification, ExecutorService service) {
- val allTypes = notification.notificationTypes
-
- var Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
- for (type : allTypes) {
- listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)
- }
- val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;
- executor.invokeAll(tasks);
- }
-
- override <T extends Notification> registerNotificationListener(Class<T> notificationType,
- NotificationListener<T> listener) {
- val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
- listeners.put(notificationType, listener);
- return reg;
- }
-
- override registerNotificationListener(
- org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
- val invoker = BindingAwareBrokerImpl.generator.invokerFactory.invokerFor(listener);
- for (notifyType : invoker.supportedNotifications) {
- listeners.put(notifyType, invoker.invocationProxy)
- }
- val registration = new GeneratedListenerRegistration(listener, invoker,this);
- return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;
- }
-
- protected def unregisterListener(GenericNotificationRegistration<?> reg) {
- listeners.remove(reg.type, reg.instance);
- }
-
- protected def unregisterListener(GeneratedListenerRegistration reg) {
- for (notifyType : reg.invoker.supportedNotifications) {
- listeners.remove(notifyType, reg.invoker.invocationProxy)
- }
- }
-
- override close() {
- //FIXME: implement properly.
- }
-
-}
-
-class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {
-
- @Property
- val Class<T> type;
-
- var NotificationBrokerImpl notificationBroker;
-
- public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {
- super(instance);
- _type = type;
- notificationBroker = broker;
- }
-
- override protected removeRegistration() {
- notificationBroker.unregisterListener(this);
- notificationBroker = null;
- }
-}
-
-class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {
-
- @Property
- val NotificationInvoker invoker;
-
- var NotificationBrokerImpl notificationBroker;
-
-
- new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {
- super(instance);
- _invoker = invoker;
- notificationBroker = broker;
- }
-
- override protected removeRegistration() {
- notificationBroker.unregisterListener(this);
- notificationBroker = null;
- invoker.close();
- }
-}
-
-@Data
-class NotifyTask implements Callable<Object> {
-
- private static val log = LoggerFactory.getLogger(NotifyTask);
-
- val NotificationListener listener;
- val Notification notification;
-
- override call() {
- try {
- log.info("Delivering notification {} to {}",notification,listener);
- listener.onNotification(notification);
- log.info("Notification delivered {} to {}",notification,listener);
- } catch (Exception e) {
- log.error("Unhandled exception thrown by listener: {}", listener, e);
- }
- return null;
- }
-
-}
+/*\r
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+package org.opendaylight.controller.sal.binding.impl\r
+\r
+import com.google.common.collect.HashMultimap\r
+import com.google.common.collect.Multimap\r
+import java.util.Collection\r
+import java.util.Collections\r
+import java.util.concurrent.Callable\r
+import java.util.concurrent.ExecutorService\r
+import org.opendaylight.controller.sal.binding.api.NotificationListener\r
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService\r
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker\r
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
+import org.opendaylight.yangtools.concepts.ListenerRegistration\r
+import org.opendaylight.yangtools.concepts.Registration\r
+import org.opendaylight.yangtools.yang.binding.Notification\r
+import org.slf4j.LoggerFactory\r
+\r
+class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {\r
+\r
+ val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;\r
+\r
+ @Property\r
+ var ExecutorService executor;\r
+\r
+ new(ExecutorService executor) {\r
+ listeners = HashMultimap.create()\r
+ this.executor = executor;\r
+ }\r
+\r
+ @Deprecated\r
+ override <T extends Notification> addNotificationListener(Class<T> notificationType,\r
+ NotificationListener<T> listener) {\r
+ listeners.put(notificationType, listener)\r
+ }\r
+\r
+ @Deprecated\r
+ override <T extends Notification> removeNotificationListener(Class<T> notificationType,\r
+ NotificationListener<T> listener) {\r
+ listeners.remove(notificationType, listener)\r
+ }\r
+\r
+ override notify(Notification notification) {\r
+ publish(notification)\r
+ }\r
+\r
+ def getNotificationTypes(Notification notification) {\r
+ notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]\r
+ }\r
+\r
+ @SuppressWarnings("unchecked")\r
+ private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {\r
+ listeners.forEach[(it as NotificationListener).onNotification(notification)]\r
+ }\r
+\r
+ @Deprecated\r
+ override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
+ throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");\r
+\r
+ }\r
+\r
+ @Deprecated\r
+ override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
+ throw new UnsupportedOperationException(\r
+ "Deprecated method. Use RegisterNotificationListener returned value to close registration.")\r
+ }\r
+\r
+ @Deprecated\r
+ override notify(Notification notification, ExecutorService service) {\r
+ publish(notification, service)\r
+ }\r
+\r
+ override publish(Notification notification) {\r
+ publish(notification, executor)\r
+ }\r
+\r
+ override publish(Notification notification, ExecutorService service) {\r
+ val allTypes = notification.notificationTypes\r
+\r
+ var Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();\r
+ for (type : allTypes) {\r
+ listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)\r
+ }\r
+ val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;\r
+ executor.invokeAll(tasks);\r
+ }\r
+\r
+ override <T extends Notification> registerNotificationListener(Class<T> notificationType,\r
+ NotificationListener<T> listener) {\r
+ val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);\r
+ listeners.put(notificationType, listener);\r
+ return reg;\r
+ }\r
+\r
+ override registerNotificationListener(\r
+ org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
+ val invoker = BindingAwareBrokerImpl.generator.invokerFactory.invokerFor(listener);\r
+ for (notifyType : invoker.supportedNotifications) {\r
+ listeners.put(notifyType, invoker.invocationProxy)\r
+ }\r
+ val registration = new GeneratedListenerRegistration(listener, invoker,this);\r
+ return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;\r
+ }\r
+\r
+ protected def unregisterListener(GenericNotificationRegistration<?> reg) {\r
+ listeners.remove(reg.type, reg.instance);\r
+ }\r
+\r
+ protected def unregisterListener(GeneratedListenerRegistration reg) {\r
+ for (notifyType : reg.invoker.supportedNotifications) {\r
+ listeners.remove(notifyType, reg.invoker.invocationProxy)\r
+ }\r
+ }\r
+ \r
+ override close() {\r
+ //FIXME: implement properly.\r
+ }\r
+ \r
+}\r
+\r
+class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {\r
+\r
+ @Property\r
+ val Class<T> type;\r
+\r
+ var NotificationBrokerImpl notificationBroker;\r
+\r
+ public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {\r
+ super(instance);\r
+ _type = type;\r
+ notificationBroker = broker;\r
+ }\r
+\r
+ override protected removeRegistration() {\r
+ notificationBroker.unregisterListener(this);\r
+ notificationBroker = null;\r
+ }\r
+}\r
+\r
+class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {\r
+\r
+ @Property\r
+ val NotificationInvoker invoker;\r
+ \r
+ var NotificationBrokerImpl notificationBroker;\r
+ \r
+\r
+ new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {\r
+ super(instance);\r
+ _invoker = invoker;\r
+ notificationBroker = broker;\r
+ }\r
+\r
+ override protected removeRegistration() {\r
+ notificationBroker.unregisterListener(this);\r
+ notificationBroker = null;\r
+ invoker.close();\r
+ }\r
+}\r
+\r
+@Data\r
+class NotifyTask implements Callable<Object> {\r
+\r
+ private static val log = LoggerFactory.getLogger(NotifyTask);\r
+\r
+ val NotificationListener listener;\r
+ val Notification notification;\r
+\r
+ override call() {\r
+ try {\r
+ log.info("Delivering notification {} to {}",notification,listener);\r
+ listener.onNotification(notification);\r
+ log.info("Notification delivered {} to {}",notification,listener);\r
+ } catch (Exception e) {\r
+ log.error("Unhandled exception thrown by listener: {}", listener, e);\r
+ }\r
+ return null;\r
+ }\r
+\r
+}\r
-package org.opendaylight.controller.md.sal.common.impl.service
-
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus
-import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
-import org.opendaylight.yangtools.concepts.ListenerRegistration
-import com.google.common.collect.Multimap
-import static com.google.common.base.Preconditions.*;
-import java.util.List
-import com.google.common.collect.HashMultimap
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Callable
-import org.opendaylight.yangtools.yang.common.RpcResult
-import java.util.Collections
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
-import java.util.ArrayList
-import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
-import java.util.Arrays
-import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
-import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
-import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
-import org.opendaylight.controller.sal.common.util.Rpcs
-import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
-import java.util.concurrent.Future
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
-import org.opendaylight.yangtools.concepts.Path
-import org.slf4j.LoggerFactory
-import java.util.HashSet
-import java.util.Collection
-import com.google.common.collect.FluentIterable;
-import java.util.Set
-import com.google.common.collect.ImmutableList
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
-import org.opendaylight.controller.md.sal.common.api.RegistrationListener
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
-
-abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
-DataReader<P, D>, //
-DataChangePublisher<P, D, DCL>, //
-DataProvisionService<P, D> {
-
- private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
-
- @Property
- var ExecutorService executor;
-
- @Property
- var AbstractDataReadRouter<P, D> dataReadRouter;
-
- Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
- Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
-
- val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
- public new() {
- }
-
- protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
- HashSet<P> paths) {
- return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
- .transformAndConcat[value] //
- .transform[instance].toList()
- }
-
- override final readConfigurationData(P path) {
- return dataReadRouter.readConfigurationData(path);
- }
-
- override final readOperationalData(P path) {
- return dataReadRouter.readOperationalData(path);
- }
-
- override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
- val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
- commitHandlers.put(path, registration)
- LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
- for(listener : commitHandlerRegistrationListeners) {
- try {
- listener.instance.onRegister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
- }
- }
- return registration;
- }
-
- override final def registerDataChangeListener(P path, DCL listener) {
- val reg = new DataChangeListenerRegistration(path, listener, this);
- listeners.put(path, reg);
- val initialConfig = dataReadRouter.readConfigurationData(path);
- val initialOperational = dataReadRouter.readOperationalData(path);
- val event = createInitialListenerEvent(path,initialConfig,initialOperational);
- listener.onDataChanged(event);
- return reg;
- }
-
- final def registerDataReader(P path, DataReader<P, D> reader) {
-
- val confReg = dataReadRouter.registerConfigurationReader(path, reader);
- val dataReg = dataReadRouter.registerOperationalReader(path, reader);
-
- return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
- }
-
- override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
- val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
-
- return ret;
- }
-
- protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
- return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
-
- }
-
- protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
- listeners.remove(registration.path, registration);
- }
-
- protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
- commitHandlers.remove(registration.path, registration);
-
- LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
- for(listener : commitHandlerRegistrationListeners) {
- try {
- listener.instance.onUnregister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
- }
- }
- }
-
- protected final def getActiveCommitHandlers() {
- return commitHandlers.entries;
- }
-
- protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
- HashSet<P> paths) {
- return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
- val operationalState = readOperationalData(key)
- val configurationState = readConfigurationData(key)
- return new ListenerStateCapture(key, value, operationalState, configurationState)
- ].toList()
- }
-
- protected def boolean isAffectedBy(P key, Set<P> paths) {
- if (paths.contains(key)) {
- return true;
- }
- for (path : paths) {
- if (key.contains(path)) {
- return true;
- }
- }
-
- return false;
- }
-
- package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
- checkNotNull(transaction);
- transaction.changeStatus(TransactionStatus.SUBMITED);
- val task = new TwoPhaseCommit(transaction, this);
- return executor.submit(task);
- }
-
-}
-
-@Data
-package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
-
- @Property
- P path;
-
- @Property
- Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
-
- @Property
- D initialOperationalState;
-
- @Property
- D initialConfigurationState;
-}
-
-package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
-
- AbstractDataBroker<P, D, DCL> dataBroker;
-
- @Property
- val P path;
-
- new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
- super(instance)
- dataBroker = broker;
- _path = path;
- }
-
- override protected removeRegistration() {
- dataBroker.removeListener(this);
- dataBroker = null;
- }
-
-}
-
-package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
-implements DataCommitHandlerRegistration<P, D> {
-
- AbstractDataBroker<P, D, ?> dataBroker;
-
- @Property
- val P path;
-
- new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
- super(instance)
- dataBroker = broker;
- _path = path;
- }
-
- override protected removeRegistration() {
- dataBroker.removeCommitHandler(this);
- dataBroker = null;
- }
-}
-
-package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
-
- private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
-
- val AbstractDataTransaction<P, D> transaction;
- val AbstractDataBroker<P, D, DCL> dataBroker;
-
- new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
- this.transaction = transaction;
- this.dataBroker = broker;
- }
-
- override call() throws Exception {
-
- // get affected paths
- val affectedPaths = new HashSet<P>();
-
- affectedPaths.addAll(transaction.createdConfigurationData.keySet);
- affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
- affectedPaths.addAll(transaction.removedConfigurationData);
-
- affectedPaths.addAll(transaction.createdOperationalData.keySet);
- affectedPaths.addAll(transaction.updatedOperationalData.keySet);
- affectedPaths.addAll(transaction.removedOperationalData);
-
- val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
-
- val transactionId = transaction.identifier;
-
- log.info("Transaction: {} Started.",transactionId);
- // requesting commits
- val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
- val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
- try {
- for (handler : commitHandlers) {
- handlerTransactions.add(handler.requestCommit(transaction));
- }
- } catch (Exception e) {
- log.error("Transaction: {} Request Commit failed", transactionId,e);
- return rollback(handlerTransactions, e);
- }
- val List<RpcResult<Void>> results = new ArrayList();
- try {
- for (subtransaction : handlerTransactions) {
- results.add(subtransaction.finish());
- }
- listeners.publishDataChangeEvent();
- } catch (Exception e) {
- log.error("Transaction: {} Finish Commit failed",transactionId, e);
- return rollback(handlerTransactions, e);
- }
- log.info("Transaction: {} Finished succesfully.",transactionId);
- return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
-
- }
-
- def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
- for (listenerSet : listeners) {
- val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
- val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-
- val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
- listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
- for (listener : listenerSet.listeners) {
- try {
- listener.instance.onDataChanged(changeEvent);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
- for (transaction : transactions) {
- transaction.rollback()
- }
-
- // FIXME return encountered error.
- return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
- }
-}
-
-public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
-
- @Property
- private val Object identifier;
-
- var TransactionStatus status;
-
- var AbstractDataBroker<P, D, ?> broker;
-
- protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
- super(dataBroker);
- _identifier = identifier;
- broker = dataBroker;
- status = TransactionStatus.NEW;
-
- //listeners = new ListenerRegistry<>();
- }
-
- override commit() {
- return broker.commit(this);
- }
-
- override readConfigurationData(P path) {
- return broker.readConfigurationData(path);
- }
-
- override readOperationalData(P path) {
- return broker.readOperationalData(path);
- }
-
- override hashCode() {
- return identifier.hashCode;
- }
-
- override equals(Object obj) {
- if (this === obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- val other = (obj as AbstractDataTransaction<P,D>);
- if (broker == null) {
- if (other.broker != null)
- return false;
- } else if (!broker.equals(other.broker))
- return false;
- if (identifier == null) {
- if (other.identifier != null)
- return false;
- } else if (!identifier.equals(other.identifier))
- return false;
- return true;
- }
-
- override TransactionStatus getStatus() {
- return status;
- }
-
- protected abstract def void onStatusChange(TransactionStatus status);
-
- public def changeStatus(TransactionStatus status) {
- this.status = status;
- onStatusChange(status);
- }
-
-}
+package org.opendaylight.controller.md.sal.common.impl.service\r
+\r
+import com.google.common.collect.FluentIterable\r
+import com.google.common.collect.HashMultimap\r
+import com.google.common.collect.ImmutableList\r
+import com.google.common.collect.Multimap\r
+import java.util.ArrayList\r
+import java.util.Arrays\r
+import java.util.Collection\r
+import java.util.Collections\r
+import java.util.HashSet\r
+import java.util.List\r
+import java.util.Set\r
+import java.util.concurrent.Callable\r
+import java.util.concurrent.ExecutorService\r
+import java.util.concurrent.Future\r
+import java.util.concurrent.atomic.AtomicLong\r
+import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
+import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
+import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
+import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
+import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
+import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
+import org.opendaylight.controller.sal.common.util.Rpcs\r
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
+import org.opendaylight.yangtools.concepts.ListenerRegistration\r
+import org.opendaylight.yangtools.concepts.Path\r
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
+import org.opendaylight.yangtools.yang.common.RpcResult\r
+import org.slf4j.LoggerFactory\r
+\r
+import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
+
+abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
+DataReader<P, D>, //\r
+DataChangePublisher<P, D, DCL>, //\r
+DataProvisionService<P, D> {\r
+\r
+ private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);\r
+\r
+ @Property\r
+ var ExecutorService executor;\r
+\r
+ @Property\r
+ var AbstractDataReadRouter<P, D> dataReadRouter;\r
+ \r
+ @Property\r
+ private val AtomicLong submittedTransactionsCount = new AtomicLong;\r
+ \r
+ @Property\r
+ private val AtomicLong failedTransactionsCount = new AtomicLong\r
+ \r
+ @Property\r
+ private val AtomicLong finishedTransactionsCount = new AtomicLong\r
+\r
+ Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();\r
+ Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();\r
+ \r
+ val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
+ public new() {\r
+ }\r
+\r
+ protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
+ HashSet<P> paths) {\r
+ return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
+ .transformAndConcat[value] //\r
+ .transform[instance].toList()\r
+ }\r
+\r
+ override final readConfigurationData(P path) {\r
+ return dataReadRouter.readConfigurationData(path);\r
+ }\r
+\r
+ override final readOperationalData(P path) {\r
+ return dataReadRouter.readOperationalData(path);\r
+ }\r
+\r
+ override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {\r
+ val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
+ commitHandlers.put(path, registration)\r
+ LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);\r
+ for(listener : commitHandlerRegistrationListeners) {\r
+ try {\r
+ listener.instance.onRegister(registration);\r
+ } catch (Exception e) {\r
+ LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
+ }\r
+ }\r
+ return registration;\r
+ }\r
+\r
+ override final def registerDataChangeListener(P path, DCL listener) {\r
+ val reg = new DataChangeListenerRegistration(path, listener, this);\r
+ listeners.put(path, reg);\r
+ val initialConfig = dataReadRouter.readConfigurationData(path);\r
+ val initialOperational = dataReadRouter.readOperationalData(path);\r
+ val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
+ listener.onDataChanged(event);\r
+ return reg;\r
+ }\r
+\r
+ final def registerDataReader(P path, DataReader<P, D> reader) {\r
+\r
+ val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
+ val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
+\r
+ return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));\r
+ }\r
+ \r
+ override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
+ val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
+ \r
+ return ret;\r
+ }\r
+ \r
+ protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {\r
+ return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);\r
+ \r
+ }\r
+\r
+ protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {\r
+ listeners.remove(registration.path, registration);\r
+ }\r
+\r
+ protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
+ commitHandlers.remove(registration.path, registration);\r
+ \r
+ LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
+ for(listener : commitHandlerRegistrationListeners) {\r
+ try {\r
+ listener.instance.onUnregister(registration);\r
+ } catch (Exception e) {\r
+ LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
+ }\r
+ }\r
+ }\r
+\r
+ protected final def getActiveCommitHandlers() {\r
+ return commitHandlers.entries;\r
+ }\r
+\r
+ protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
+ HashSet<P> paths) {\r
+ return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
+ val operationalState = readOperationalData(key)\r
+ val configurationState = readConfigurationData(key)\r
+ return new ListenerStateCapture(key, value, operationalState, configurationState)\r
+ ].toList()\r
+ }\r
+\r
+ protected def boolean isAffectedBy(P key, Set<P> paths) {\r
+ if (paths.contains(key)) {\r
+ return true;\r
+ }\r
+ for (path : paths) {\r
+ if (key.contains(path)) {\r
+ return true;\r
+ }\r
+ }\r
+\r
+ return false;\r
+ }\r
+\r
+ package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
+ checkNotNull(transaction);\r
+ transaction.changeStatus(TransactionStatus.SUBMITED);\r
+ val task = new TwoPhaseCommit(transaction, this);\r
+ submittedTransactionsCount.andIncrement;\r
+ return executor.submit(task);\r
+ }\r
+\r
+}\r
+\r
+@Data\r
+package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
+\r
+ @Property\r
+ P path;\r
+\r
+ @Property\r
+ Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
+\r
+ @Property\r
+ D initialOperationalState;\r
+\r
+ @Property\r
+ D initialConfigurationState;\r
+}\r
+\r
+package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
+\r
+ AbstractDataBroker<P, D, DCL> dataBroker;\r
+\r
+ @Property\r
+ val P path;\r
+\r
+ new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
+ super(instance)\r
+ dataBroker = broker;\r
+ _path = path;\r
+ }\r
+\r
+ override protected removeRegistration() {\r
+ dataBroker.removeListener(this);\r
+ dataBroker = null;\r
+ }\r
+\r
+}\r
+\r
+package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
+extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
+implements DataCommitHandlerRegistration<P, D> {\r
+\r
+ AbstractDataBroker<P, D, ?> dataBroker;\r
+\r
+ @Property\r
+ val P path;\r
+\r
+ new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
+ super(instance)\r
+ dataBroker = broker;\r
+ _path = path;\r
+ }\r
+\r
+ override protected removeRegistration() {\r
+ dataBroker.removeCommitHandler(this);\r
+ dataBroker = null;\r
+ }\r
+}\r
+\r
+package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
+\r
+ private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
+\r
+ val AbstractDataTransaction<P, D> transaction;\r
+ val AbstractDataBroker<P, D, DCL> dataBroker;\r
+ \r
+ new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
+ this.transaction = transaction;\r
+ this.dataBroker = broker;\r
+ }\r
+\r
+ override call() throws Exception {\r
+\r
+ // get affected paths\r
+ val affectedPaths = new HashSet<P>();\r
+\r
+ affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
+ affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
+ affectedPaths.addAll(transaction.removedConfigurationData);\r
+\r
+ affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
+ affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
+ affectedPaths.addAll(transaction.removedOperationalData);\r
+\r
+ val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
+\r
+ val transactionId = transaction.identifier;\r
+\r
+ log.info("Transaction: {} Started.",transactionId);\r
+ // requesting commits\r
+ val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
+ val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
+ try {\r
+ for (handler : commitHandlers) {\r
+ handlerTransactions.add(handler.requestCommit(transaction));\r
+ }\r
+ } catch (Exception e) {\r
+ log.error("Transaction: {} Request Commit failed", transactionId,e);\r
+ dataBroker.failedTransactionsCount.andIncrement\r
+ return rollback(handlerTransactions, e);\r
+ }\r
+ val List<RpcResult<Void>> results = new ArrayList();\r
+ try {\r
+ for (subtransaction : handlerTransactions) {\r
+ results.add(subtransaction.finish());\r
+ }\r
+ listeners.publishDataChangeEvent();\r
+ } catch (Exception e) {\r
+ log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
+ dataBroker.failedTransactionsCount.andIncrement\r
+ return rollback(handlerTransactions, e);\r
+ }\r
+ log.info("Transaction: {} Finished successfully.",transactionId);\r
+ dataBroker.finishedTransactionsCount.andIncrement;\r
+ return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
+\r
+ }\r
+\r
+ def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
+ for (listenerSet : listeners) {\r
+ val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);\r
+ val updatedOperational = dataBroker.readOperationalData(listenerSet.path);\r
+\r
+ val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,\r
+ listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);\r
+ for (listener : listenerSet.listeners) {\r
+ try {\r
+ listener.instance.onDataChanged(changeEvent);\r
+\r
+ } catch (Exception e) {\r
+ e.printStackTrace();\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
+ for (transaction : transactions) {\r
+ transaction.rollback()\r
+ }\r
+\r
+ // FIXME return encountered error.\r
+ return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());\r
+ }\r
+}\r
+\r
+public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
+\r
+ @Property\r
+ private val Object identifier;\r
+\r
+ var TransactionStatus status;\r
+\r
+ var AbstractDataBroker<P, D, ?> broker;\r
+\r
+ protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {\r
+ super(dataBroker);\r
+ _identifier = identifier;\r
+ broker = dataBroker;\r
+ status = TransactionStatus.NEW;\r
+\r
+ //listeners = new ListenerRegistry<>();\r
+ }\r
+\r
+ override commit() {\r
+ return broker.commit(this);\r
+ }\r
+\r
+ override readConfigurationData(P path) {\r
+ return broker.readConfigurationData(path);\r
+ }\r
+\r
+ override readOperationalData(P path) {\r
+ return broker.readOperationalData(path);\r
+ }\r
+\r
+ override hashCode() {\r
+ return identifier.hashCode;\r
+ }\r
+\r
+ override equals(Object obj) {\r
+ if (this === obj)\r
+ return true;\r
+ if (obj == null)\r
+ return false;\r
+ if (getClass() != obj.getClass())\r
+ return false;\r
+ val other = (obj as AbstractDataTransaction<P,D>);\r
+ if (broker == null) {\r
+ if (other.broker != null)\r
+ return false;\r
+ } else if (!broker.equals(other.broker))\r
+ return false;\r
+ if (identifier == null) {\r
+ if (other.identifier != null)\r
+ return false;\r
+ } else if (!identifier.equals(other.identifier))\r
+ return false;\r
+ return true;\r
+ }\r
+\r
+ override TransactionStatus getStatus() {\r
+ return status;\r
+ }\r
+\r
+ protected abstract def void onStatusChange(TransactionStatus status);\r
+\r
+ public def changeStatus(TransactionStatus status) {\r
+ this.status = status;\r
+ onStatusChange(status);\r
+ }\r
+\r
+}\r
-module opendaylight-md-sal-common {
- yang-version 1;
- namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:common";
- prefix "md-sal-common";
-
- description
- "Common definition for MD-SAL.";
-
- revision "2013-10-28" {
- description
- "Initial revision";
- }
-
- grouping rpc-routing-table {
-
- leaf routing-context {
- type string;
- }
- list routes {
- leaf path {
- type string;
- }
- leaf destination {
- type string;
- }
- }
-
- }
-
- grouping rpc-router {
- leaf module {
- type string;
- }
- container routing-tables {
- list routing-table {
- uses rpc-routing-table;
- }
- }
- }
-
- grouping rpc-state {
- list rpc-router {
- uses rpc-router;
- }
- }
-
- grouping notification-state {
- container notifications {
- leaf published {
- type uint32;
- }
- }
- }
-
- grouping data-state {
- container transactions {
- leaf created {
- type uint32;
- }
- leaf successful {
- type uint32;
- }
- leaf failed {
- type uint32;
- }
- }
- }
+module opendaylight-md-sal-common {\r
+ yang-version 1;\r
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:common";\r
+ prefix "md-sal-common";\r
+ \r
+ description\r
+ "Common definition for MD-SAL.";\r
+ \r
+ revision "2013-10-28" {\r
+ description\r
+ "Initial revision";\r
+ }\r
+ \r
+ grouping rpc-routing-table {\r
+ \r
+ leaf routing-context {\r
+ type string;\r
+ }\r
+ list routes {\r
+ leaf path {\r
+ type string;\r
+ }\r
+ leaf destination {\r
+ type string;\r
+ }\r
+ }\r
+ \r
+ }\r
+\r
+ grouping rpc-router {\r
+ leaf module {\r
+ type string;\r
+ }\r
+ container routing-tables {\r
+ list routing-table {\r
+ uses rpc-routing-table;\r
+ }\r
+ }\r
+ }\r
+ \r
+ grouping rpc-state {\r
+ list rpc-router {\r
+ uses rpc-router;\r
+ }\r
+ }\r
+ \r
+ grouping notification-state {\r
+ container notifications {\r
+ leaf published {\r
+ type uint32;\r
+ }\r
+ }\r
+ }\r
+ \r
+ grouping data-state {\r
+ container transactions {\r
+ leaf created {\r
+ type uint32;\r
+ }\r
+ leaf submitted {\r
+ type uint32;\r
+ }\r
+ leaf successful {\r
+ type uint32;\r
+ }\r
+ leaf failed {\r
+ type uint32;\r
+ }\r
+ }\r
+ }\r
}
\ No newline at end of file