Added transactions statistics support. 35/3335/2
authorLukas Sedlak <lsedlak@cisco.com>
Mon, 2 Dec 2013 11:38:03 +0000 (12:38 +0100)
committerLukas Sedlak <lsedlak@cisco.com>
Mon, 2 Dec 2013 12:51:06 +0000 (13:51 +0100)
Added implementation of DataBrokerImplRuntimeMXBean as DataBrokerRuntimeMXBeanImpl;
Modified DataBrokerImplModule with DataBrokerRuntimeMXBeanImpl;
Added "submitted" leaf into opendaylight-md-sal-common.yang;

Signed-off-by: Lukas Sedlak <lsedlak@cisco.com>
Change-Id: I5494c7175b37c14e39d6b2b3d87fccd3358f1a57

opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/BindingBrokerImplModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/statistics/DataBrokerRuntimeMXBeanImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java
opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend
opendaylight/md-sal/sal-dom-api/src/main/yang/opendaylight-md-sal-common.yang

index bce1f61..48c33ad 100644 (file)
@@ -1,52 +1,53 @@
-/**
-* Generated file
-
-* Generated from: yang module name: opendaylight-sal-binding-broker-impl  yang module local name: binding-broker-impl
-* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
-* Generated at: Wed Nov 20 17:33:01 CET 2013
-*
-* Do not modify this file unless it is present under src/main directory
-*/
-package org.opendaylight.controller.config.yang.md.sal.binding.impl;
-
-import org.opendaylight.controller.sal.binding.impl.BindingAwareBrokerImpl;
-import org.osgi.framework.BundleContext;
-
-/**
-*
-*/
-public final class BindingBrokerImplModule extends org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractBindingBrokerImplModule {
-
-    private BundleContext bundleContext;
-
-    public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
-        super(identifier, dependencyResolver);
-    }
-
-    public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, BindingBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {
-        super(identifier, dependencyResolver, oldModule, oldInstance);
-    }
-
-    @Override
-    public void validate(){
-        super.validate();
-        // Add custom validation for module attributes here.
-    }
-
-    @Override
-    public java.lang.AutoCloseable createInstance() {
-        BindingAwareBrokerImpl broker = new BindingAwareBrokerImpl(getBundleContext());
-        broker.setDataBroker(getDataBrokerDependency());
-        broker.setNotifyBroker(getNotificationServiceDependency());
-        broker.start();
-        return broker;
-    }
-
-    public BundleContext getBundleContext() {
-        return bundleContext;
-    }
-
-    public void setBundleContext(BundleContext bundleContext) {
-        this.bundleContext = bundleContext;
-    }
-}
+/**\r
+* Generated file\r
+\r
+* Generated from: yang module name: opendaylight-sal-binding-broker-impl  yang module local name: binding-broker-impl\r
+* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator\r
+* Generated at: Wed Nov 20 17:33:01 CET 2013\r
+*\r
+* Do not modify this file unless it is present under src/main directory\r
+*/\r
+package org.opendaylight.controller.config.yang.md.sal.binding.impl;\r
+\r
+import org.opendaylight.controller.sal.binding.impl.BindingAwareBrokerImpl;\r
+import org.osgi.framework.BundleContext;\r
+\r
+import com.google.common.base.Preconditions;\r
+\r
+/**\r
+*\r
+*/\r
+public final class BindingBrokerImplModule extends org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractBindingBrokerImplModule {\r
+\r
+    private BundleContext bundleContext;\r
+\r
+    public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {\r
+        super(identifier, dependencyResolver);\r
+    }\r
+\r
+    public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, BindingBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {\r
+        super(identifier, dependencyResolver, oldModule, oldInstance);\r
+    }\r
+\r
+    @Override\r
+    public void validate(){\r
+        super.validate();\r
+    }\r
+\r
+    @Override\r
+    public java.lang.AutoCloseable createInstance() {\r
+        BindingAwareBrokerImpl broker = new BindingAwareBrokerImpl(getBundleContext());\r
+        broker.setDataBroker(getDataBrokerDependency());\r
+        broker.setNotifyBroker(getNotificationServiceDependency());\r
+        broker.start();\r
+        return broker;\r
+    }\r
+\r
+    public BundleContext getBundleContext() {\r
+        return bundleContext;\r
+    }\r
+\r
+    public void setBundleContext(BundleContext bundleContext) {\r
+        this.bundleContext = bundleContext;\r
+    }\r
+}\r
index 8cbfbb6..74b6ad8 100644 (file)
@@ -1,83 +1,82 @@
-/**
- * Generated file
-
- * Generated from: yang module name: opendaylight-sal-binding-broker-impl  yang module local name: binding-data-broker
- * Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator
- * Generated at: Wed Nov 20 17:33:01 CET 2013
- *
- * Do not modify this file unless it is present under src/main directory
- */
-package org.opendaylight.controller.config.yang.md.sal.binding.impl;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
-import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentDataServiceConnector;
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentMappingService;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.data.DataProviderService;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.osgi.framework.BundleContext;
-
-import com.google.common.util.concurrent.MoreExecutors;
-
-/**
-*
-*/
-public final class DataBrokerImplModule extends
-        org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractDataBrokerImplModule {
-
-    private BundleContext bundleContext;
-
-    public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
-            org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
-        super(identifier, dependencyResolver);
-    }
-
-    public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
-            org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
-            DataBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {
-        super(identifier, dependencyResolver, oldModule, oldInstance);
-    }
-
-    @Override
-    public void validate() {
-        super.validate();
-    }
-
-    @Override
-    public java.lang.AutoCloseable createInstance() {
-        DataBrokerImpl dataBindingBroker = new DataBrokerImpl();
-        
-        // FIXME: obtain via dependency management
-        ExecutorService executor = Executors.newCachedThreadPool();
-        ExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
-        dataBindingBroker.setExecutor(listeningExecutor);
-
-        
-        
-        Broker domBroker = getDomBrokerDependency();
-        BindingIndependentMappingService mappingService = getMappingServiceDependency();
-        
-        if (domBroker != null && mappingService != null) {
-            BindingIndependentDataServiceConnector runtimeMapping = new BindingIndependentDataServiceConnector();
-            runtimeMapping.setMappingService(mappingService);
-            runtimeMapping.setBaDataService(dataBindingBroker);
-            domBroker.registerProvider(runtimeMapping, getBundleContext());
-        }
-
-        return dataBindingBroker;
-    }
-
-    public BundleContext getBundleContext() {
-        return bundleContext;
-    }
-
-    public void setBundleContext(BundleContext bundleContext2) {
-        this.bundleContext = bundleContext2;
-    }
-}
+/**\r
+ * Generated file\r
+\r
+ * Generated from: yang module name: opendaylight-sal-binding-broker-impl  yang module local name: binding-data-broker\r
+ * Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator\r
+ * Generated at: Wed Nov 20 17:33:01 CET 2013\r
+ *\r
+ * Do not modify this file unless it is present under src/main directory\r
+ */\r
+package org.opendaylight.controller.config.yang.md.sal.binding.impl;\r
+\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.ScheduledExecutorService;\r
+\r
+import org.opendaylight.controller.config.yang.md.sal.binding.statistics.DataBrokerRuntimeMXBeanImpl;\r
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;\r
+import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;\r
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentDataServiceConnector;\r
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentMappingService;\r
+import org.opendaylight.controller.sal.core.api.Broker;\r
+import org.opendaylight.controller.sal.core.api.data.DataProviderService;\r
+import org.opendaylight.yangtools.yang.binding.DataObject;\r
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
+import org.osgi.framework.BundleContext;\r
+\r
+import com.google.common.util.concurrent.MoreExecutors;\r
+\r
+/**\r
+*\r
+*/\r
+public final class DataBrokerImplModule extends\r
+        org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractDataBrokerImplModule {\r
+\r
+    private BundleContext bundleContext;\r
+\r
+    public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,\r
+            org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {\r
+        super(identifier, dependencyResolver);\r
+    }\r
+\r
+    public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,\r
+            org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,\r
+            DataBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {\r
+        super(identifier, dependencyResolver, oldModule, oldInstance);\r
+    }\r
+\r
+    @Override\r
+    public void validate() {\r
+        super.validate();\r
+    }\r
+\r
+    @Override\r
+    public java.lang.AutoCloseable createInstance() {\r
+        DataBrokerRuntimeMXBeanImpl dataBindingBroker = new DataBrokerRuntimeMXBeanImpl();\r
+        \r
+        // FIXME: obtain via dependency management\r
+        ExecutorService executor = Executors.newCachedThreadPool();\r
+        ExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);\r
+        dataBindingBroker.setExecutor(listeningExecutor);\r
+\r
+        Broker domBroker = getDomBrokerDependency();\r
+        BindingIndependentMappingService mappingService = getMappingServiceDependency();\r
+        \r
+        if (domBroker != null && mappingService != null) {\r
+            BindingIndependentDataServiceConnector runtimeMapping = new BindingIndependentDataServiceConnector();\r
+            runtimeMapping.setMappingService(mappingService);\r
+            runtimeMapping.setBaDataService(dataBindingBroker);\r
+            domBroker.registerProvider(runtimeMapping, getBundleContext());\r
+        }\r
+        getRootRuntimeBeanRegistratorWrapper().register(dataBindingBroker);\r
+        return dataBindingBroker;\r
+    }\r
+\r
+    public BundleContext getBundleContext() {\r
+        return bundleContext;\r
+    }\r
+\r
+    public void setBundleContext(BundleContext bundleContext2) {\r
+        this.bundleContext = bundleContext2;\r
+    }\r
+}\r
diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/statistics/DataBrokerRuntimeMXBeanImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/statistics/DataBrokerRuntimeMXBeanImpl.java
new file mode 100644 (file)
index 0000000..a1a24eb
--- /dev/null
@@ -0,0 +1,19 @@
+package org.opendaylight.controller.config.yang.md.sal.binding.statistics;\r
+\r
+import org.opendaylight.controller.config.yang.md.sal.binding.impl.DataBrokerImplRuntimeMXBean;\r
+import org.opendaylight.controller.config.yang.md.sal.binding.impl.Transactions;\r
+import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl;\r
+\r
+public class DataBrokerRuntimeMXBeanImpl extends DataBrokerImpl implements DataBrokerImplRuntimeMXBean {\r
+    \r
+    private Transactions transactions = new Transactions();\r
+    \r
+    @Override\r
+    public Transactions getTransactions() {\r
+        transactions.setCreated(getCreatedTransactionsCount().get());\r
+        transactions.setSubmitted(getSubmittedTransactionsCount().get());\r
+        transactions.setSuccessful(getFinishedTransactionsCount().get());\r
+        transactions.setFailed(getFailedTransactionsCount().get());\r
+        return transactions;\r
+    }\r
+}\r
index 2f4510d..02c59b2 100644 (file)
-package org.opendaylight.controller.sal.binding.impl;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
-import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataBroker;
-import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
-import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
-import org.opendaylight.controller.sal.binding.impl.util.BindingAwareDataReaderRouter;
-import org.opendaylight.controller.sal.common.DataStoreIdentifier;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.DataRoot;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
-
-public class DataBrokerImpl extends AbstractDataBroker<InstanceIdentifier<? extends DataObject>, DataObject, DataChangeListener> implements
-        DataProviderService, AutoCloseable {
-
-    
-    
-    private final AtomicLong nextTransaction = new AtomicLong();
-    
-    public DataBrokerImpl() {
-        setDataReadRouter(new BindingAwareDataReaderRouter());
-    }
-
-    @Override
-    public DataTransactionImpl beginTransaction() {
-        String transactionId = "BA-" + nextTransaction.getAndIncrement();
-        return new DataTransactionImpl(transactionId,this);
-    }
-
-    @Override
-    @Deprecated
-    public <T extends DataRoot> T getData(DataStoreIdentifier store, Class<T> rootType) {
-        throw new UnsupportedOperationException("Deprecated");
-    }
-
-    @Override
-    @Deprecated
-    public <T extends DataRoot> T getData(DataStoreIdentifier store, T filter) {
-        throw new UnsupportedOperationException("Deprecated");
-    }
-
-    @Override
-    @Deprecated
-    public <T extends DataRoot> T getCandidateData(DataStoreIdentifier store, Class<T> rootType) {
-        throw new UnsupportedOperationException("Deprecated");
-    }
-
-    @Override
-    @Deprecated
-    public <T extends DataRoot> T getCandidateData(DataStoreIdentifier store, T filter) {
-        throw new UnsupportedOperationException("Deprecated");
-    }
-
-    @Override
-    @Deprecated
-    public RpcResult<DataRoot> editCandidateData(DataStoreIdentifier store, DataRoot changeSet) {
-        throw new UnsupportedOperationException("Deprecated");
-    }
-
-    @Override
-    @Deprecated
-    public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {
-        throw new UnsupportedOperationException("Deprecated");
-    }
-
-    @Override
-    @Deprecated
-    public DataObject getData(InstanceIdentifier<? extends DataObject> data) {
-        throw new UnsupportedOperationException("Deprecated");
-    }
-
-    @Override
-    @Deprecated
-    public DataObject getConfigurationData(InstanceIdentifier<?> data) {
-        throw new UnsupportedOperationException("Deprecated");
-    }
-
-    @Override
-    @Deprecated
-    public void registerChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener changeListener) {
-        throw new UnsupportedOperationException("Deprecated");
-    }
-
-    @Override
-    @Deprecated
-    public void unregisterChangeListener(InstanceIdentifier<? extends DataObject> path,
-            DataChangeListener changeListener) {
-        throw new UnsupportedOperationException("Deprecated");
-    }
-    
-    @Override
-    public void close() throws Exception {
-        
-    }
+package org.opendaylight.controller.sal.binding.impl;\r
+\r
+import java.util.concurrent.Future;\r
+import java.util.concurrent.atomic.AtomicLong;\r
+\r
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataBroker;\r
+import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;\r
+import org.opendaylight.controller.sal.binding.api.data.DataProviderService;\r
+import org.opendaylight.controller.sal.binding.impl.util.BindingAwareDataReaderRouter;\r
+import org.opendaylight.controller.sal.common.DataStoreIdentifier;\r
+import org.opendaylight.yangtools.yang.binding.DataObject;\r
+import org.opendaylight.yangtools.yang.binding.DataRoot;\r
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;\r
+import org.opendaylight.yangtools.yang.common.RpcResult;\r
+\r
+\r
+public class DataBrokerImpl extends AbstractDataBroker<InstanceIdentifier<? extends DataObject>, DataObject, DataChangeListener> implements\r
+        DataProviderService, AutoCloseable {\r
+\r
+    private final AtomicLong nextTransaction = new AtomicLong();\r
+    private final AtomicLong createdTransactionsCount = new AtomicLong();\r
+    \r
+    public AtomicLong getCreatedTransactionsCount() {\r
+        return createdTransactionsCount;\r
+    }\r
+\r
+    public DataBrokerImpl() {\r
+        setDataReadRouter(new BindingAwareDataReaderRouter());\r
+    }\r
+\r
+    @Override\r
+    public DataTransactionImpl beginTransaction() {\r
+        String transactionId = "BA-" + nextTransaction.getAndIncrement();\r
+        createdTransactionsCount.getAndIncrement();\r
+        return new DataTransactionImpl(transactionId,this);\r
+    }\r
+\r
+    @Override\r
+    @Deprecated\r
+    public <T extends DataRoot> T getData(DataStoreIdentifier store, Class<T> rootType) {\r
+        throw new UnsupportedOperationException("Deprecated");\r
+    }\r
+\r
+    @Override\r
+    @Deprecated\r
+    public <T extends DataRoot> T getData(DataStoreIdentifier store, T filter) {\r
+        throw new UnsupportedOperationException("Deprecated");\r
+    }\r
+\r
+    @Override\r
+    @Deprecated\r
+    public <T extends DataRoot> T getCandidateData(DataStoreIdentifier store, Class<T> rootType) {\r
+        throw new UnsupportedOperationException("Deprecated");\r
+    }\r
+\r
+    @Override\r
+    @Deprecated\r
+    public <T extends DataRoot> T getCandidateData(DataStoreIdentifier store, T filter) {\r
+        throw new UnsupportedOperationException("Deprecated");\r
+    }\r
+\r
+    @Override\r
+    @Deprecated\r
+    public RpcResult<DataRoot> editCandidateData(DataStoreIdentifier store, DataRoot changeSet) {\r
+        throw new UnsupportedOperationException("Deprecated");\r
+    }\r
+\r
+    @Override\r
+    @Deprecated\r
+    public Future<RpcResult<Void>> commit(DataStoreIdentifier store) {\r
+        throw new UnsupportedOperationException("Deprecated");\r
+    }\r
+\r
+    @Override\r
+    @Deprecated\r
+    public DataObject getData(InstanceIdentifier<? extends DataObject> data) {\r
+        throw new UnsupportedOperationException("Deprecated");\r
+    }\r
+\r
+    @Override\r
+    @Deprecated\r
+    public DataObject getConfigurationData(InstanceIdentifier<?> data) {\r
+        throw new UnsupportedOperationException("Deprecated");\r
+    }\r
+\r
+    @Override\r
+    @Deprecated\r
+    public void registerChangeListener(InstanceIdentifier<? extends DataObject> path, DataChangeListener changeListener) {\r
+        throw new UnsupportedOperationException("Deprecated");\r
+    }\r
+\r
+    @Override\r
+    @Deprecated\r
+    public void unregisterChangeListener(InstanceIdentifier<? extends DataObject> path,\r
+            DataChangeListener changeListener) {\r
+        throw new UnsupportedOperationException("Deprecated");\r
+    }\r
+    \r
+    @Override\r
+    public void close() throws Exception {\r
+        \r
+    }\r
 }
\ No newline at end of file
index cf339ee..e8b3850 100644 (file)
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.binding.impl
-
-import org.opendaylight.controller.sal.binding.api.NotificationProviderService
-import org.opendaylight.yangtools.yang.binding.Notification
-import com.google.common.collect.Multimap
-import org.opendaylight.controller.sal.binding.api.NotificationListener
-import com.google.common.collect.HashMultimap
-import java.util.concurrent.ExecutorService
-import java.util.Collection
-import org.opendaylight.yangtools.concepts.Registration
-import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory
-import org.opendaylight.yangtools.concepts.ListenerRegistration
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
-import java.util.Collections
-import org.slf4j.LoggerFactory
-import java.util.concurrent.Callable
-
-class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
-
-    val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;
-
-    @Property
-    var ExecutorService executor;
-
-    new(ExecutorService executor) {
-        listeners = HashMultimap.create()
-        this.executor = executor;
-    }
-
-    @Deprecated
-    override <T extends Notification> addNotificationListener(Class<T> notificationType,
-        NotificationListener<T> listener) {
-        listeners.put(notificationType, listener)
-    }
-
-    @Deprecated
-    override <T extends Notification> removeNotificationListener(Class<T> notificationType,
-        NotificationListener<T> listener) {
-        listeners.remove(notificationType, listener)
-    }
-
-    override notify(Notification notification) {
-        publish(notification)
-    }
-
-    def getNotificationTypes(Notification notification) {
-        notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]
-    }
-
-    @SuppressWarnings("unchecked")
-    private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {
-        listeners.forEach[(it as NotificationListener).onNotification(notification)]
-    }
-
-    @Deprecated
-    override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
-        throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");
-
-    }
-
-    @Deprecated
-    override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
-        throw new UnsupportedOperationException(
-            "Deprecated method. Use RegisterNotificationListener returned value to close registration.")
-    }
-
-    @Deprecated
-    override notify(Notification notification, ExecutorService service) {
-        publish(notification, service)
-    }
-
-    override publish(Notification notification) {
-        publish(notification, executor)
-    }
-
-    override publish(Notification notification, ExecutorService service) {
-        val allTypes = notification.notificationTypes
-
-        var Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();
-        for (type : allTypes) {
-            listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)
-        }
-        val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;
-        executor.invokeAll(tasks);
-    }
-
-    override <T extends Notification> registerNotificationListener(Class<T> notificationType,
-        NotificationListener<T> listener) {
-        val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);
-        listeners.put(notificationType, listener);
-        return reg;
-    }
-
-    override registerNotificationListener(
-        org.opendaylight.yangtools.yang.binding.NotificationListener listener) {
-        val invoker = BindingAwareBrokerImpl.generator.invokerFactory.invokerFor(listener);
-        for (notifyType : invoker.supportedNotifications) {
-            listeners.put(notifyType, invoker.invocationProxy)
-        }
-        val registration = new GeneratedListenerRegistration(listener, invoker,this);
-        return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;
-    }
-
-    protected def unregisterListener(GenericNotificationRegistration<?> reg) {
-        listeners.remove(reg.type, reg.instance);
-    }
-
-    protected def unregisterListener(GeneratedListenerRegistration reg) {
-        for (notifyType : reg.invoker.supportedNotifications) {
-            listeners.remove(notifyType, reg.invoker.invocationProxy)
-        }
-    }
-    
-    override close()  {
-        //FIXME: implement properly.
-    }
-    
-}
-
-class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {
-
-    @Property
-    val Class<T> type;
-
-    var NotificationBrokerImpl notificationBroker;
-
-    public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {
-        super(instance);
-        _type = type;
-        notificationBroker = broker;
-    }
-
-    override protected removeRegistration() {
-        notificationBroker.unregisterListener(this);
-        notificationBroker = null;
-    }
-}
-
-class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {
-
-    @Property
-    val NotificationInvoker invoker;
-    
-    var NotificationBrokerImpl notificationBroker;
-    
-
-    new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {
-        super(instance);
-        _invoker = invoker;
-        notificationBroker = broker;
-    }
-
-    override protected removeRegistration() {
-        notificationBroker.unregisterListener(this);
-        notificationBroker = null;
-        invoker.close();
-    }
-}
-
-@Data
-class NotifyTask implements Callable<Object> {
-
-    private static val log = LoggerFactory.getLogger(NotifyTask);
-
-    val NotificationListener listener;
-    val Notification notification;
-
-    override call() {
-        try {
-            log.info("Delivering notification {} to {}",notification,listener);
-            listener.onNotification(notification);
-            log.info("Notification delivered {} to {}",notification,listener);
-        } catch (Exception e) {
-            log.error("Unhandled exception thrown by listener: {}", listener, e);
-        }
-        return null;
-    }
-
-}
+/*\r
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.\r
+ *\r
+ * This program and the accompanying materials are made available under the\r
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
+ * and is available at http://www.eclipse.org/legal/epl-v10.html\r
+ */\r
+package org.opendaylight.controller.sal.binding.impl\r
+\r
+import com.google.common.collect.HashMultimap\r
+import com.google.common.collect.Multimap\r
+import java.util.Collection\r
+import java.util.Collections\r
+import java.util.concurrent.Callable\r
+import java.util.concurrent.ExecutorService\r
+import org.opendaylight.controller.sal.binding.api.NotificationListener\r
+import org.opendaylight.controller.sal.binding.api.NotificationProviderService\r
+import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker\r
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
+import org.opendaylight.yangtools.concepts.ListenerRegistration\r
+import org.opendaylight.yangtools.concepts.Registration\r
+import org.opendaylight.yangtools.yang.binding.Notification\r
+import org.slf4j.LoggerFactory\r
+\r
+class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {\r
+\r
+    val Multimap<Class<? extends Notification>, NotificationListener<?>> listeners;\r
+\r
+    @Property\r
+    var ExecutorService executor;\r
+\r
+    new(ExecutorService executor) {\r
+        listeners = HashMultimap.create()\r
+        this.executor = executor;\r
+    }\r
+\r
+    @Deprecated\r
+    override <T extends Notification> addNotificationListener(Class<T> notificationType,\r
+        NotificationListener<T> listener) {\r
+        listeners.put(notificationType, listener)\r
+    }\r
+\r
+    @Deprecated\r
+    override <T extends Notification> removeNotificationListener(Class<T> notificationType,\r
+        NotificationListener<T> listener) {\r
+        listeners.remove(notificationType, listener)\r
+    }\r
+\r
+    override notify(Notification notification) {\r
+        publish(notification)\r
+    }\r
+\r
+    def getNotificationTypes(Notification notification) {\r
+        notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)]\r
+    }\r
+\r
+    @SuppressWarnings("unchecked")\r
+    private def notifyAll(Collection<NotificationListener<?>> listeners, Notification notification) {\r
+        listeners.forEach[(it as NotificationListener).onNotification(notification)]\r
+    }\r
+\r
+    @Deprecated\r
+    override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
+        throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead.");\r
+\r
+    }\r
+\r
+    @Deprecated\r
+    override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
+        throw new UnsupportedOperationException(\r
+            "Deprecated method. Use RegisterNotificationListener returned value to close registration.")\r
+    }\r
+\r
+    @Deprecated\r
+    override notify(Notification notification, ExecutorService service) {\r
+        publish(notification, service)\r
+    }\r
+\r
+    override publish(Notification notification) {\r
+        publish(notification, executor)\r
+    }\r
+\r
+    override publish(Notification notification, ExecutorService service) {\r
+        val allTypes = notification.notificationTypes\r
+\r
+        var Iterable<NotificationListener<?>> listenerToNotify = Collections.emptySet();\r
+        for (type : allTypes) {\r
+            listenerToNotify = listenerToNotify + listeners.get(type as Class<? extends Notification>)\r
+        }\r
+        val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet;\r
+        executor.invokeAll(tasks);\r
+    }\r
+\r
+    override <T extends Notification> registerNotificationListener(Class<T> notificationType,\r
+        NotificationListener<T> listener) {\r
+        val reg = new GenericNotificationRegistration<T>(notificationType, listener, this);\r
+        listeners.put(notificationType, listener);\r
+        return reg;\r
+    }\r
+\r
+    override registerNotificationListener(\r
+        org.opendaylight.yangtools.yang.binding.NotificationListener listener) {\r
+        val invoker = BindingAwareBrokerImpl.generator.invokerFactory.invokerFor(listener);\r
+        for (notifyType : invoker.supportedNotifications) {\r
+            listeners.put(notifyType, invoker.invocationProxy)\r
+        }\r
+        val registration = new GeneratedListenerRegistration(listener, invoker,this);\r
+        return registration as Registration<org.opendaylight.yangtools.yang.binding.NotificationListener>;\r
+    }\r
+\r
+    protected def unregisterListener(GenericNotificationRegistration<?> reg) {\r
+        listeners.remove(reg.type, reg.instance);\r
+    }\r
+\r
+    protected def unregisterListener(GeneratedListenerRegistration reg) {\r
+        for (notifyType : reg.invoker.supportedNotifications) {\r
+            listeners.remove(notifyType, reg.invoker.invocationProxy)\r
+        }\r
+    }\r
+    \r
+    override close()  {\r
+        //FIXME: implement properly.\r
+    }\r
+    \r
+}\r
+\r
+class GenericNotificationRegistration<T extends Notification> extends AbstractObjectRegistration<NotificationListener<T>> implements ListenerRegistration<NotificationListener<T>> {\r
+\r
+    @Property\r
+    val Class<T> type;\r
+\r
+    var NotificationBrokerImpl notificationBroker;\r
+\r
+    public new(Class<T> type, NotificationListener<T> instance, NotificationBrokerImpl broker) {\r
+        super(instance);\r
+        _type = type;\r
+        notificationBroker = broker;\r
+    }\r
+\r
+    override protected removeRegistration() {\r
+        notificationBroker.unregisterListener(this);\r
+        notificationBroker = null;\r
+    }\r
+}\r
+\r
+class GeneratedListenerRegistration extends AbstractObjectRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> implements ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> {\r
+\r
+    @Property\r
+    val NotificationInvoker invoker;\r
+    \r
+    var NotificationBrokerImpl notificationBroker;\r
+    \r
+\r
+    new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) {\r
+        super(instance);\r
+        _invoker = invoker;\r
+        notificationBroker = broker;\r
+    }\r
+\r
+    override protected removeRegistration() {\r
+        notificationBroker.unregisterListener(this);\r
+        notificationBroker = null;\r
+        invoker.close();\r
+    }\r
+}\r
+\r
+@Data\r
+class NotifyTask implements Callable<Object> {\r
+\r
+    private static val log = LoggerFactory.getLogger(NotifyTask);\r
+\r
+    val NotificationListener listener;\r
+    val Notification notification;\r
+\r
+    override call() {\r
+        try {\r
+            log.info("Delivering notification {} to {}",notification,listener);\r
+            listener.onNotification(notification);\r
+            log.info("Notification delivered {} to {}",notification,listener);\r
+        } catch (Exception e) {\r
+            log.error("Unhandled exception thrown by listener: {}", listener, e);\r
+        }\r
+        return null;\r
+    }\r
+\r
+}\r
index 5f4f815..da47438 100644 (file)
-package org.opendaylight.controller.md.sal.common.impl.service
-
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus
-import org.opendaylight.controller.md.sal.common.api.data.DataReader
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration
-import org.opendaylight.yangtools.concepts.ListenerRegistration
-import com.google.common.collect.Multimap
-import static com.google.common.base.Preconditions.*;
-import java.util.List
-import com.google.common.collect.HashMultimap
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Callable
-import org.opendaylight.yangtools.yang.common.RpcResult
-import java.util.Collections
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction
-import java.util.ArrayList
-import org.opendaylight.yangtools.concepts.CompositeObjectRegistration
-import java.util.Arrays
-import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService
-import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory
-import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener
-import org.opendaylight.controller.sal.common.util.Rpcs
-import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification
-import java.util.concurrent.Future
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter
-import org.opendaylight.yangtools.concepts.Path
-import org.slf4j.LoggerFactory
-import java.util.HashSet
-import java.util.Collection
-import com.google.common.collect.FluentIterable;
-import java.util.Set
-import com.google.common.collect.ImmutableList
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration
-import org.opendaylight.controller.md.sal.common.api.RegistrationListener
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
-
-abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //
-DataReader<P, D>, //
-DataChangePublisher<P, D, DCL>, //
-DataProvisionService<P, D> {
-
-    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);
-
-    @Property
-    var ExecutorService executor;
-
-    @Property
-    var AbstractDataReadRouter<P, D> dataReadRouter;
-
-    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();
-    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();
-    
-    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();
-    public new() {
-    }
-
-    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(
-        HashSet<P> paths) {
-        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
-        .transformAndConcat[value] //
-        .transform[instance].toList()
-    }
-
-    override final readConfigurationData(P path) {
-        return dataReadRouter.readConfigurationData(path);
-    }
-
-    override final readOperationalData(P path) {
-        return dataReadRouter.readOperationalData(path);
-    }
-
-    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
-        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
-        commitHandlers.put(path, registration)
-        LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);
-        for(listener : commitHandlerRegistrationListeners) {
-            try {
-                listener.instance.onRegister(registration);
-            } catch (Exception e) {
-                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
-            }
-        }
-        return registration;
-    }
-
-    override final def registerDataChangeListener(P path, DCL listener) {
-        val reg = new DataChangeListenerRegistration(path, listener, this);
-        listeners.put(path, reg);
-        val initialConfig = dataReadRouter.readConfigurationData(path);
-        val initialOperational = dataReadRouter.readOperationalData(path);
-        val event = createInitialListenerEvent(path,initialConfig,initialOperational);
-        listener.onDataChanged(event);
-        return reg;
-    }
-
-    final def registerDataReader(P path, DataReader<P, D> reader) {
-
-        val confReg = dataReadRouter.registerConfigurationReader(path, reader);
-        val dataReg = dataReadRouter.registerOperationalReader(path, reader);
-
-        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
-    }
-    
-    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
-        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);
-        
-        return ret;
-    }
-    
-    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
-        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);
-        
-    }
-
-    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
-        listeners.remove(registration.path, registration);
-    }
-
-    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {
-        commitHandlers.remove(registration.path, registration);
-        
-         LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
-        for(listener : commitHandlerRegistrationListeners) {
-            try {
-                listener.instance.onUnregister(registration);
-            } catch (Exception e) {
-                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
-            }
-        }
-    }
-
-    protected final def getActiveCommitHandlers() {
-        return commitHandlers.entries;
-    }
-
-    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(
-        HashSet<P> paths) {
-        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
-            val operationalState = readOperationalData(key)
-            val configurationState = readConfigurationData(key)
-            return new ListenerStateCapture(key, value, operationalState, configurationState)
-        ].toList()
-    }
-
-    protected def boolean isAffectedBy(P key, Set<P> paths) {
-        if (paths.contains(key)) {
-            return true;
-        }
-        for (path : paths) {
-            if (key.contains(path)) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {
-        checkNotNull(transaction);
-        transaction.changeStatus(TransactionStatus.SUBMITED);
-        val task = new TwoPhaseCommit(transaction, this);
-        return executor.submit(task);
-    }
-
-}
-
-@Data
-package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
-
-    @Property
-    P path;
-
-    @Property
-    Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;
-
-    @Property
-    D initialOperationalState;
-
-    @Property
-    D initialConfigurationState;
-}
-
-package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
-
-    AbstractDataBroker<P, D, DCL> dataBroker;
-
-    @Property
-    val P path;
-
-    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {
-        super(instance)
-        dataBroker = broker;
-        _path = path;
-    }
-
-    override protected removeRegistration() {
-        dataBroker.removeListener(this);
-        dataBroker = null;
-    }
-
-}
-
-package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
-implements DataCommitHandlerRegistration<P, D> {
-
-    AbstractDataBroker<P, D, ?> dataBroker;
-
-    @Property
-    val P path;
-
-    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {
-        super(instance)
-        dataBroker = broker;
-        _path = path;
-    }
-
-    override protected removeRegistration() {
-        dataBroker.removeCommitHandler(this);
-        dataBroker = null;
-    }
-}
-
-package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {
-
-    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);
-
-    val AbstractDataTransaction<P, D> transaction;
-    val AbstractDataBroker<P, D, DCL> dataBroker;
-
-    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {
-        this.transaction = transaction;
-        this.dataBroker = broker;
-    }
-
-    override call() throws Exception {
-
-        // get affected paths
-        val affectedPaths = new HashSet<P>();
-
-        affectedPaths.addAll(transaction.createdConfigurationData.keySet);
-        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
-        affectedPaths.addAll(transaction.removedConfigurationData);
-
-        affectedPaths.addAll(transaction.createdOperationalData.keySet);
-        affectedPaths.addAll(transaction.updatedOperationalData.keySet);
-        affectedPaths.addAll(transaction.removedOperationalData);
-
-        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
-
-        val transactionId = transaction.identifier;
-
-        log.info("Transaction: {} Started.",transactionId);
-        // requesting commits
-        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);
-        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();
-        try {
-            for (handler : commitHandlers) {
-                handlerTransactions.add(handler.requestCommit(transaction));
-            }
-        } catch (Exception e) {
-            log.error("Transaction: {} Request Commit failed", transactionId,e);
-            return rollback(handlerTransactions, e);
-        }
-        val List<RpcResult<Void>> results = new ArrayList();
-        try {
-            for (subtransaction : handlerTransactions) {
-                results.add(subtransaction.finish());
-            }
-            listeners.publishDataChangeEvent();
-        } catch (Exception e) {
-            log.error("Transaction: {} Finish Commit failed",transactionId, e);
-            return rollback(handlerTransactions, e);
-        }
-        log.info("Transaction: {} Finished succesfully.",transactionId);
-        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());
-
-    }
-
-    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
-        for (listenerSet : listeners) {
-            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
-            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-
-            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
-                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
-            for (listener : listenerSet.listeners) {
-                try {
-                    listener.instance.onDataChanged(changeEvent);
-
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-
-    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {
-        for (transaction : transactions) {
-            transaction.rollback()
-        }
-
-        // FIXME return encountered error.
-        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());
-    }
-}
-
-public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {
-
-    @Property
-    private val Object identifier;
-
-    var TransactionStatus status;
-
-    var AbstractDataBroker<P, D, ?> broker;
-
-    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {
-        super(dataBroker);
-        _identifier = identifier;
-        broker = dataBroker;
-        status = TransactionStatus.NEW;
-
-    //listeners = new ListenerRegistry<>();
-    }
-
-    override commit() {
-        return broker.commit(this);
-    }
-
-    override readConfigurationData(P path) {
-        return broker.readConfigurationData(path);
-    }
-
-    override readOperationalData(P path) {
-        return broker.readOperationalData(path);
-    }
-
-    override hashCode() {
-        return identifier.hashCode;
-    }
-
-    override equals(Object obj) {
-        if (this === obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        val other = (obj as AbstractDataTransaction<P,D>);
-        if (broker == null) {
-            if (other.broker != null)
-                return false;
-        } else if (!broker.equals(other.broker))
-            return false;
-        if (identifier == null) {
-            if (other.identifier != null)
-                return false;
-        } else if (!identifier.equals(other.identifier))
-            return false;
-        return true;
-    }
-
-    override TransactionStatus getStatus() {
-        return status;
-    }
-
-    protected abstract def void onStatusChange(TransactionStatus status);
-
-    public def changeStatus(TransactionStatus status) {
-        this.status = status;
-        onStatusChange(status);
-    }
-
-}
+package org.opendaylight.controller.md.sal.common.impl.service\r
+\r
+import com.google.common.collect.FluentIterable\r
+import com.google.common.collect.HashMultimap\r
+import com.google.common.collect.ImmutableList\r
+import com.google.common.collect.Multimap\r
+import java.util.ArrayList\r
+import java.util.Arrays\r
+import java.util.Collection\r
+import java.util.Collections\r
+import java.util.HashSet\r
+import java.util.List\r
+import java.util.Set\r
+import java.util.concurrent.Callable\r
+import java.util.concurrent.ExecutorService\r
+import java.util.concurrent.Future\r
+import java.util.concurrent.atomic.AtomicLong\r
+import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
+import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
+import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
+import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
+import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
+import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
+import org.opendaylight.controller.sal.common.util.Rpcs\r
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
+import org.opendaylight.yangtools.concepts.ListenerRegistration\r
+import org.opendaylight.yangtools.concepts.Path\r
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
+import org.opendaylight.yangtools.yang.common.RpcResult\r
+import org.slf4j.LoggerFactory\r
+\r
+import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
+
+abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
+DataReader<P, D>, //\r
+DataChangePublisher<P, D, DCL>, //\r
+DataProvisionService<P, D> {\r
+\r
+    private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);\r
+\r
+    @Property\r
+    var ExecutorService executor;\r
+\r
+    @Property\r
+    var AbstractDataReadRouter<P, D> dataReadRouter;\r
+    \r
+    @Property\r
+    private val AtomicLong submittedTransactionsCount = new AtomicLong;\r
+    \r
+    @Property\r
+    private val AtomicLong failedTransactionsCount = new AtomicLong\r
+    \r
+    @Property\r
+    private val AtomicLong finishedTransactionsCount = new AtomicLong\r
+\r
+    Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = HashMultimap.create();\r
+    Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = HashMultimap.create();\r
+    \r
+    val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
+    public new() {\r
+    }\r
+\r
+    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
+        HashSet<P> paths) {\r
+        return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
+        .transformAndConcat[value] //\r
+        .transform[instance].toList()\r
+    }\r
+\r
+    override final readConfigurationData(P path) {\r
+        return dataReadRouter.readConfigurationData(path);\r
+    }\r
+\r
+    override final readOperationalData(P path) {\r
+        return dataReadRouter.readOperationalData(path);\r
+    }\r
+\r
+    override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {\r
+        val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
+        commitHandlers.put(path, registration)\r
+        LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path);\r
+        for(listener : commitHandlerRegistrationListeners) {\r
+            try {\r
+                listener.instance.onRegister(registration);\r
+            } catch (Exception e) {\r
+                LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
+            }\r
+        }\r
+        return registration;\r
+    }\r
+\r
+    override final def registerDataChangeListener(P path, DCL listener) {\r
+        val reg = new DataChangeListenerRegistration(path, listener, this);\r
+        listeners.put(path, reg);\r
+        val initialConfig = dataReadRouter.readConfigurationData(path);\r
+        val initialOperational = dataReadRouter.readOperationalData(path);\r
+        val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
+        listener.onDataChanged(event);\r
+        return reg;\r
+    }\r
+\r
+    final def registerDataReader(P path, DataReader<P, D> reader) {\r
+\r
+        val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
+        val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
+\r
+        return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));\r
+    }\r
+    \r
+    override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
+        val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
+        \r
+        return ret;\r
+    }\r
+    \r
+    protected  def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {\r
+        return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);\r
+        \r
+    }\r
+\r
+    protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {\r
+        listeners.remove(registration.path, registration);\r
+    }\r
+\r
+    protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
+        commitHandlers.remove(registration.path, registration);\r
+        \r
+         LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
+        for(listener : commitHandlerRegistrationListeners) {\r
+            try {\r
+                listener.instance.onUnregister(registration);\r
+            } catch (Exception e) {\r
+                LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
+            }\r
+        }\r
+    }\r
+\r
+    protected final def getActiveCommitHandlers() {\r
+        return commitHandlers.entries;\r
+    }\r
+\r
+    protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
+        HashSet<P> paths) {\r
+        return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
+            val operationalState = readOperationalData(key)\r
+            val configurationState = readConfigurationData(key)\r
+            return new ListenerStateCapture(key, value, operationalState, configurationState)\r
+        ].toList()\r
+    }\r
+\r
+    protected def boolean isAffectedBy(P key, Set<P> paths) {\r
+        if (paths.contains(key)) {\r
+            return true;\r
+        }\r
+        for (path : paths) {\r
+            if (key.contains(path)) {\r
+                return true;\r
+            }\r
+        }\r
+\r
+        return false;\r
+    }\r
+\r
+    package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
+        checkNotNull(transaction);\r
+        transaction.changeStatus(TransactionStatus.SUBMITED);\r
+        val task = new TwoPhaseCommit(transaction, this);\r
+        submittedTransactionsCount.andIncrement;\r
+        return executor.submit(task);\r
+    }\r
+\r
+}\r
+\r
+@Data\r
+package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
+\r
+    @Property\r
+    P path;\r
+\r
+    @Property\r
+    Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
+\r
+    @Property\r
+    D initialOperationalState;\r
+\r
+    @Property\r
+    D initialConfigurationState;\r
+}\r
+\r
+package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
+\r
+    AbstractDataBroker<P, D, DCL> dataBroker;\r
+\r
+    @Property\r
+    val P path;\r
+\r
+    new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
+        super(instance)\r
+        dataBroker = broker;\r
+        _path = path;\r
+    }\r
+\r
+    override protected removeRegistration() {\r
+        dataBroker.removeListener(this);\r
+        dataBroker = null;\r
+    }\r
+\r
+}\r
+\r
+package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
+extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
+implements DataCommitHandlerRegistration<P, D> {\r
+\r
+    AbstractDataBroker<P, D, ?> dataBroker;\r
+\r
+    @Property\r
+    val P path;\r
+\r
+    new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
+        super(instance)\r
+        dataBroker = broker;\r
+        _path = path;\r
+    }\r
+\r
+    override protected removeRegistration() {\r
+        dataBroker.removeCommitHandler(this);\r
+        dataBroker = null;\r
+    }\r
+}\r
+\r
+package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
+\r
+    private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
+\r
+    val AbstractDataTransaction<P, D> transaction;\r
+    val AbstractDataBroker<P, D, DCL> dataBroker;\r
+    \r
+    new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
+        this.transaction = transaction;\r
+        this.dataBroker = broker;\r
+    }\r
+\r
+    override call() throws Exception {\r
+\r
+        // get affected paths\r
+        val affectedPaths = new HashSet<P>();\r
+\r
+        affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
+        affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
+        affectedPaths.addAll(transaction.removedConfigurationData);\r
+\r
+        affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
+        affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
+        affectedPaths.addAll(transaction.removedOperationalData);\r
+\r
+        val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
+\r
+        val transactionId = transaction.identifier;\r
+\r
+        log.info("Transaction: {} Started.",transactionId);\r
+        // requesting commits\r
+        val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
+        val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
+        try {\r
+            for (handler : commitHandlers) {\r
+                handlerTransactions.add(handler.requestCommit(transaction));\r
+            }\r
+        } catch (Exception e) {\r
+            log.error("Transaction: {} Request Commit failed", transactionId,e);\r
+            dataBroker.failedTransactionsCount.andIncrement\r
+            return rollback(handlerTransactions, e);\r
+        }\r
+        val List<RpcResult<Void>> results = new ArrayList();\r
+        try {\r
+            for (subtransaction : handlerTransactions) {\r
+                results.add(subtransaction.finish());\r
+            }\r
+            listeners.publishDataChangeEvent();\r
+        } catch (Exception e) {\r
+            log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
+            dataBroker.failedTransactionsCount.andIncrement\r
+            return rollback(handlerTransactions, e);\r
+        }\r
+        log.info("Transaction: {} Finished successfully.",transactionId);\r
+        dataBroker.finishedTransactionsCount.andIncrement;\r
+        return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
+\r
+    }\r
+\r
+    def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
+        for (listenerSet : listeners) {\r
+            val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);\r
+            val updatedOperational = dataBroker.readOperationalData(listenerSet.path);\r
+\r
+            val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,\r
+                listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);\r
+            for (listener : listenerSet.listeners) {\r
+                try {\r
+                    listener.instance.onDataChanged(changeEvent);\r
+\r
+                } catch (Exception e) {\r
+                    e.printStackTrace();\r
+                }\r
+            }\r
+        }\r
+    }\r
+\r
+    def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
+        for (transaction : transactions) {\r
+            transaction.rollback()\r
+        }\r
+\r
+        // FIXME return encountered error.\r
+        return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());\r
+    }\r
+}\r
+\r
+public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
+\r
+    @Property\r
+    private val Object identifier;\r
+\r
+    var TransactionStatus status;\r
+\r
+    var AbstractDataBroker<P, D, ?> broker;\r
+\r
+    protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {\r
+        super(dataBroker);\r
+        _identifier = identifier;\r
+        broker = dataBroker;\r
+        status = TransactionStatus.NEW;\r
+\r
+    //listeners = new ListenerRegistry<>();\r
+    }\r
+\r
+    override commit() {\r
+        return broker.commit(this);\r
+    }\r
+\r
+    override readConfigurationData(P path) {\r
+        return broker.readConfigurationData(path);\r
+    }\r
+\r
+    override readOperationalData(P path) {\r
+        return broker.readOperationalData(path);\r
+    }\r
+\r
+    override hashCode() {\r
+        return identifier.hashCode;\r
+    }\r
+\r
+    override equals(Object obj) {\r
+        if (this === obj)\r
+            return true;\r
+        if (obj == null)\r
+            return false;\r
+        if (getClass() != obj.getClass())\r
+            return false;\r
+        val other = (obj as AbstractDataTransaction<P,D>);\r
+        if (broker == null) {\r
+            if (other.broker != null)\r
+                return false;\r
+        } else if (!broker.equals(other.broker))\r
+            return false;\r
+        if (identifier == null) {\r
+            if (other.identifier != null)\r
+                return false;\r
+        } else if (!identifier.equals(other.identifier))\r
+            return false;\r
+        return true;\r
+    }\r
+\r
+    override TransactionStatus getStatus() {\r
+        return status;\r
+    }\r
+\r
+    protected abstract def void onStatusChange(TransactionStatus status);\r
+\r
+    public def changeStatus(TransactionStatus status) {\r
+        this.status = status;\r
+        onStatusChange(status);\r
+    }\r
+\r
+}\r
index 004574d..d963da1 100644 (file)
@@ -1,68 +1,71 @@
-module opendaylight-md-sal-common {
-       yang-version 1;
-    namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:common";
-    prefix "md-sal-common";
-    
-    description
-        "Common definition for MD-SAL.";
-    revision "2013-10-28" {
-        description
-            "Initial revision";
-    }
-    
-    grouping rpc-routing-table {
-        
-        leaf routing-context {
-            type string;
-        }
-        list routes {
-            leaf path {
-                type string;
-            }
-            leaf destination {
-                type string;
-            }
-        }
-    
-    }
-
-    grouping rpc-router {
-        leaf module {
-            type string;
-        }
-        container routing-tables {
-            list routing-table {
-                uses rpc-routing-table;
-            }
-        }
-    }
-    
-    grouping rpc-state {
-        list rpc-router {
-            uses rpc-router;
-        }
-    }
-    
-    grouping notification-state {
-        container notifications {
-            leaf published {
-                type uint32;
-            }
-        }
-    }
-    
-    grouping data-state {
-        container transactions {
-            leaf created {
-                type uint32;
-            }
-            leaf successful {
-                type uint32;
-            }
-            leaf failed {
-                type uint32;
-            }
-        }
-    }
+module opendaylight-md-sal-common {\r
+       yang-version 1;\r
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:common";\r
+    prefix "md-sal-common";\r
+    \r
+    description\r
+        "Common definition for MD-SAL.";\r
\r
+    revision "2013-10-28" {\r
+        description\r
+            "Initial revision";\r
+    }\r
+    \r
+    grouping rpc-routing-table {\r
+        \r
+        leaf routing-context {\r
+            type string;\r
+        }\r
+        list routes {\r
+            leaf path {\r
+                type string;\r
+            }\r
+            leaf destination {\r
+                type string;\r
+            }\r
+        }\r
+    \r
+    }\r
+\r
+    grouping rpc-router {\r
+        leaf module {\r
+            type string;\r
+        }\r
+        container routing-tables {\r
+            list routing-table {\r
+                uses rpc-routing-table;\r
+            }\r
+        }\r
+    }\r
+    \r
+    grouping rpc-state {\r
+        list rpc-router {\r
+            uses rpc-router;\r
+        }\r
+    }\r
+    \r
+    grouping notification-state {\r
+        container notifications {\r
+            leaf published {\r
+                type uint32;\r
+            }\r
+        }\r
+    }\r
+    \r
+    grouping data-state {\r
+        container transactions {\r
+            leaf created {\r
+                type uint32;\r
+            }\r
+            leaf submitted {\r
+                type uint32;\r
+            }\r
+            leaf successful {\r
+                type uint32;\r
+            }\r
+            leaf failed {\r
+                type uint32;\r
+            }\r
+        }\r
+    }\r
 }
\ No newline at end of file

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.