From e5b6f6c1b1f677207cc7d85eeb816b3abe27297c Mon Sep 17 00:00:00 2001 From: Lukas Sedlak Date: Mon, 2 Dec 2013 12:38:03 +0100 Subject: [PATCH] Added transactions statistics support. Added implementation of DataBrokerImplRuntimeMXBean as DataBrokerRuntimeMXBeanImpl; Modified DataBrokerImplModule with DataBrokerRuntimeMXBeanImpl; Added "submitted" leaf into opendaylight-md-sal-common.yang; Signed-off-by: Lukas Sedlak Change-Id: I5494c7175b37c14e39d6b2b3d87fccd3358f1a57 --- .../binding/impl/BindingBrokerImplModule.java | 105 +-- .../binding/impl/DataBrokerImplModule.java | 165 ++-- .../DataBrokerRuntimeMXBeanImpl.java | 19 + .../sal/binding/impl/DataBrokerImpl.java | 203 ++--- .../binding/impl/NotificationBrokerImpl.xtend | 374 +++++---- .../impl/service/AbstractDataBroker.xtend | 768 +++++++++--------- .../main/yang/opendaylight-md-sal-common.yang | 137 ++-- 7 files changed, 903 insertions(+), 868 deletions(-) create mode 100644 opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/statistics/DataBrokerRuntimeMXBeanImpl.java diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/BindingBrokerImplModule.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/BindingBrokerImplModule.java index bce1f61cf5..48c33ad0fc 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/BindingBrokerImplModule.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/BindingBrokerImplModule.java @@ -1,52 +1,53 @@ -/** -* Generated file - -* Generated from: yang module name: opendaylight-sal-binding-broker-impl yang module local name: binding-broker-impl -* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator -* Generated at: Wed Nov 20 17:33:01 CET 2013 -* -* Do not modify this file unless it is present under src/main directory -*/ -package org.opendaylight.controller.config.yang.md.sal.binding.impl; - -import org.opendaylight.controller.sal.binding.impl.BindingAwareBrokerImpl; -import org.osgi.framework.BundleContext; - -/** -* -*/ -public final class BindingBrokerImplModule extends org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractBindingBrokerImplModule { - - private BundleContext bundleContext; - - public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { - super(identifier, dependencyResolver); - } - - public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, BindingBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) { - super(identifier, dependencyResolver, oldModule, oldInstance); - } - - @Override - public void validate(){ - super.validate(); - // Add custom validation for module attributes here. - } - - @Override - public java.lang.AutoCloseable createInstance() { - BindingAwareBrokerImpl broker = new BindingAwareBrokerImpl(getBundleContext()); - broker.setDataBroker(getDataBrokerDependency()); - broker.setNotifyBroker(getNotificationServiceDependency()); - broker.start(); - return broker; - } - - public BundleContext getBundleContext() { - return bundleContext; - } - - public void setBundleContext(BundleContext bundleContext) { - this.bundleContext = bundleContext; - } -} +/** +* Generated file + +* Generated from: yang module name: opendaylight-sal-binding-broker-impl yang module local name: binding-broker-impl +* Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator +* Generated at: Wed Nov 20 17:33:01 CET 2013 +* +* Do not modify this file unless it is present under src/main directory +*/ +package org.opendaylight.controller.config.yang.md.sal.binding.impl; + +import org.opendaylight.controller.sal.binding.impl.BindingAwareBrokerImpl; +import org.osgi.framework.BundleContext; + +import com.google.common.base.Preconditions; + +/** +* +*/ +public final class BindingBrokerImplModule extends org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractBindingBrokerImplModule { + + private BundleContext bundleContext; + + public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { + super(identifier, dependencyResolver); + } + + public BindingBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, BindingBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) { + super(identifier, dependencyResolver, oldModule, oldInstance); + } + + @Override + public void validate(){ + super.validate(); + } + + @Override + public java.lang.AutoCloseable createInstance() { + BindingAwareBrokerImpl broker = new BindingAwareBrokerImpl(getBundleContext()); + broker.setDataBroker(getDataBrokerDependency()); + broker.setNotifyBroker(getNotificationServiceDependency()); + broker.start(); + return broker; + } + + public BundleContext getBundleContext() { + return bundleContext; + } + + public void setBundleContext(BundleContext bundleContext) { + this.bundleContext = bundleContext; + } +} diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java index 8cbfbb6da2..74b6ad8a23 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java @@ -1,83 +1,82 @@ -/** - * Generated file - - * Generated from: yang module name: opendaylight-sal-binding-broker-impl yang module local name: binding-data-broker - * Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator - * Generated at: Wed Nov 20 17:33:01 CET 2013 - * - * Do not modify this file unless it is present under src/main directory - */ -package org.opendaylight.controller.config.yang.md.sal.binding.impl; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - -import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter; -import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl; -import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentDataServiceConnector; -import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentMappingService; -import org.opendaylight.controller.sal.core.api.Broker; -import org.opendaylight.controller.sal.core.api.data.DataProviderService; -import org.opendaylight.yangtools.yang.binding.DataObject; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.osgi.framework.BundleContext; - -import com.google.common.util.concurrent.MoreExecutors; - -/** -* -*/ -public final class DataBrokerImplModule extends - org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractDataBrokerImplModule { - - private BundleContext bundleContext; - - public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, - org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { - super(identifier, dependencyResolver); - } - - public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, - org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, - DataBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) { - super(identifier, dependencyResolver, oldModule, oldInstance); - } - - @Override - public void validate() { - super.validate(); - } - - @Override - public java.lang.AutoCloseable createInstance() { - DataBrokerImpl dataBindingBroker = new DataBrokerImpl(); - - // FIXME: obtain via dependency management - ExecutorService executor = Executors.newCachedThreadPool(); - ExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor); - dataBindingBroker.setExecutor(listeningExecutor); - - - - Broker domBroker = getDomBrokerDependency(); - BindingIndependentMappingService mappingService = getMappingServiceDependency(); - - if (domBroker != null && mappingService != null) { - BindingIndependentDataServiceConnector runtimeMapping = new BindingIndependentDataServiceConnector(); - runtimeMapping.setMappingService(mappingService); - runtimeMapping.setBaDataService(dataBindingBroker); - domBroker.registerProvider(runtimeMapping, getBundleContext()); - } - - return dataBindingBroker; - } - - public BundleContext getBundleContext() { - return bundleContext; - } - - public void setBundleContext(BundleContext bundleContext2) { - this.bundleContext = bundleContext2; - } -} +/** + * Generated file + + * Generated from: yang module name: opendaylight-sal-binding-broker-impl yang module local name: binding-data-broker + * Generated by: org.opendaylight.controller.config.yangjmxgenerator.plugin.JMXGenerator + * Generated at: Wed Nov 20 17:33:01 CET 2013 + * + * Do not modify this file unless it is present under src/main directory + */ +package org.opendaylight.controller.config.yang.md.sal.binding.impl; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.opendaylight.controller.config.yang.md.sal.binding.statistics.DataBrokerRuntimeMXBeanImpl; +import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter; +import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl; +import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentDataServiceConnector; +import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentMappingService; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.data.DataProviderService; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.osgi.framework.BundleContext; + +import com.google.common.util.concurrent.MoreExecutors; + +/** +* +*/ +public final class DataBrokerImplModule extends + org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractDataBrokerImplModule { + + private BundleContext bundleContext; + + public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, + org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) { + super(identifier, dependencyResolver); + } + + public DataBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, + org.opendaylight.controller.config.api.DependencyResolver dependencyResolver, + DataBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) { + super(identifier, dependencyResolver, oldModule, oldInstance); + } + + @Override + public void validate() { + super.validate(); + } + + @Override + public java.lang.AutoCloseable createInstance() { + DataBrokerRuntimeMXBeanImpl dataBindingBroker = new DataBrokerRuntimeMXBeanImpl(); + + // FIXME: obtain via dependency management + ExecutorService executor = Executors.newCachedThreadPool(); + ExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor); + dataBindingBroker.setExecutor(listeningExecutor); + + Broker domBroker = getDomBrokerDependency(); + BindingIndependentMappingService mappingService = getMappingServiceDependency(); + + if (domBroker != null && mappingService != null) { + BindingIndependentDataServiceConnector runtimeMapping = new BindingIndependentDataServiceConnector(); + runtimeMapping.setMappingService(mappingService); + runtimeMapping.setBaDataService(dataBindingBroker); + domBroker.registerProvider(runtimeMapping, getBundleContext()); + } + getRootRuntimeBeanRegistratorWrapper().register(dataBindingBroker); + return dataBindingBroker; + } + + public BundleContext getBundleContext() { + return bundleContext; + } + + public void setBundleContext(BundleContext bundleContext2) { + this.bundleContext = bundleContext2; + } +} diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/statistics/DataBrokerRuntimeMXBeanImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/statistics/DataBrokerRuntimeMXBeanImpl.java new file mode 100644 index 0000000000..a1a24ebc8a --- /dev/null +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/statistics/DataBrokerRuntimeMXBeanImpl.java @@ -0,0 +1,19 @@ +package org.opendaylight.controller.config.yang.md.sal.binding.statistics; + +import org.opendaylight.controller.config.yang.md.sal.binding.impl.DataBrokerImplRuntimeMXBean; +import org.opendaylight.controller.config.yang.md.sal.binding.impl.Transactions; +import org.opendaylight.controller.sal.binding.impl.DataBrokerImpl; + +public class DataBrokerRuntimeMXBeanImpl extends DataBrokerImpl implements DataBrokerImplRuntimeMXBean { + + private Transactions transactions = new Transactions(); + + @Override + public Transactions getTransactions() { + transactions.setCreated(getCreatedTransactionsCount().get()); + transactions.setSubmitted(getSubmittedTransactionsCount().get()); + transactions.setSuccessful(getFinishedTransactionsCount().get()); + transactions.setFailed(getFailedTransactionsCount().get()); + return transactions; + } +} diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java index 2f4510d5b4..02c59b2779 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java @@ -1,102 +1,103 @@ -package org.opendaylight.controller.sal.binding.impl; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; - -import org.opendaylight.controller.md.sal.common.api.data.DataReader; -import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataBroker; -import org.opendaylight.controller.sal.binding.api.data.DataChangeListener; -import org.opendaylight.controller.sal.binding.api.data.DataProviderService; -import org.opendaylight.controller.sal.binding.impl.util.BindingAwareDataReaderRouter; -import org.opendaylight.controller.sal.common.DataStoreIdentifier; -import org.opendaylight.yangtools.concepts.Registration; -import org.opendaylight.yangtools.yang.binding.DataObject; -import org.opendaylight.yangtools.yang.binding.DataRoot; -import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcResult; - - -public class DataBrokerImpl extends AbstractDataBroker, DataObject, DataChangeListener> implements - DataProviderService, AutoCloseable { - - - - private final AtomicLong nextTransaction = new AtomicLong(); - - public DataBrokerImpl() { - setDataReadRouter(new BindingAwareDataReaderRouter()); - } - - @Override - public DataTransactionImpl beginTransaction() { - String transactionId = "BA-" + nextTransaction.getAndIncrement(); - return new DataTransactionImpl(transactionId,this); - } - - @Override - @Deprecated - public T getData(DataStoreIdentifier store, Class rootType) { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - @Deprecated - public T getData(DataStoreIdentifier store, T filter) { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - @Deprecated - public T getCandidateData(DataStoreIdentifier store, Class rootType) { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - @Deprecated - public T getCandidateData(DataStoreIdentifier store, T filter) { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - @Deprecated - public RpcResult editCandidateData(DataStoreIdentifier store, DataRoot changeSet) { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - @Deprecated - public Future> commit(DataStoreIdentifier store) { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - @Deprecated - public DataObject getData(InstanceIdentifier data) { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - @Deprecated - public DataObject getConfigurationData(InstanceIdentifier data) { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - @Deprecated - public void registerChangeListener(InstanceIdentifier path, DataChangeListener changeListener) { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - @Deprecated - public void unregisterChangeListener(InstanceIdentifier path, - DataChangeListener changeListener) { - throw new UnsupportedOperationException("Deprecated"); - } - - @Override - public void close() throws Exception { - - } +package org.opendaylight.controller.sal.binding.impl; + +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; + +import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataBroker; +import org.opendaylight.controller.sal.binding.api.data.DataChangeListener; +import org.opendaylight.controller.sal.binding.api.data.DataProviderService; +import org.opendaylight.controller.sal.binding.impl.util.BindingAwareDataReaderRouter; +import org.opendaylight.controller.sal.common.DataStoreIdentifier; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.DataRoot; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; + + +public class DataBrokerImpl extends AbstractDataBroker, DataObject, DataChangeListener> implements + DataProviderService, AutoCloseable { + + private final AtomicLong nextTransaction = new AtomicLong(); + private final AtomicLong createdTransactionsCount = new AtomicLong(); + + public AtomicLong getCreatedTransactionsCount() { + return createdTransactionsCount; + } + + public DataBrokerImpl() { + setDataReadRouter(new BindingAwareDataReaderRouter()); + } + + @Override + public DataTransactionImpl beginTransaction() { + String transactionId = "BA-" + nextTransaction.getAndIncrement(); + createdTransactionsCount.getAndIncrement(); + return new DataTransactionImpl(transactionId,this); + } + + @Override + @Deprecated + public T getData(DataStoreIdentifier store, Class rootType) { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + @Deprecated + public T getData(DataStoreIdentifier store, T filter) { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + @Deprecated + public T getCandidateData(DataStoreIdentifier store, Class rootType) { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + @Deprecated + public T getCandidateData(DataStoreIdentifier store, T filter) { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + @Deprecated + public RpcResult editCandidateData(DataStoreIdentifier store, DataRoot changeSet) { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + @Deprecated + public Future> commit(DataStoreIdentifier store) { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + @Deprecated + public DataObject getData(InstanceIdentifier data) { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + @Deprecated + public DataObject getConfigurationData(InstanceIdentifier data) { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + @Deprecated + public void registerChangeListener(InstanceIdentifier path, DataChangeListener changeListener) { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + @Deprecated + public void unregisterChangeListener(InstanceIdentifier path, + DataChangeListener changeListener) { + throw new UnsupportedOperationException("Deprecated"); + } + + @Override + public void close() throws Exception { + + } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend index cf339ee4f4..e8b3850b77 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/NotificationBrokerImpl.xtend @@ -1,188 +1,186 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.sal.binding.impl - -import org.opendaylight.controller.sal.binding.api.NotificationProviderService -import org.opendaylight.yangtools.yang.binding.Notification -import com.google.common.collect.Multimap -import org.opendaylight.controller.sal.binding.api.NotificationListener -import com.google.common.collect.HashMultimap -import java.util.concurrent.ExecutorService -import java.util.Collection -import org.opendaylight.yangtools.concepts.Registration -import org.opendaylight.controller.sal.binding.codegen.RuntimeCodeGenerator -import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory -import org.opendaylight.yangtools.concepts.ListenerRegistration -import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker -import org.opendaylight.yangtools.concepts.AbstractObjectRegistration -import java.util.Collections -import org.slf4j.LoggerFactory -import java.util.concurrent.Callable - -class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable { - - val Multimap, NotificationListener> listeners; - - @Property - var ExecutorService executor; - - new(ExecutorService executor) { - listeners = HashMultimap.create() - this.executor = executor; - } - - @Deprecated - override addNotificationListener(Class notificationType, - NotificationListener listener) { - listeners.put(notificationType, listener) - } - - @Deprecated - override removeNotificationListener(Class notificationType, - NotificationListener listener) { - listeners.remove(notificationType, listener) - } - - override notify(Notification notification) { - publish(notification) - } - - def getNotificationTypes(Notification notification) { - notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)] - } - - @SuppressWarnings("unchecked") - private def notifyAll(Collection> listeners, Notification notification) { - listeners.forEach[(it as NotificationListener).onNotification(notification)] - } - - @Deprecated - override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) { - throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead."); - - } - - @Deprecated - override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) { - throw new UnsupportedOperationException( - "Deprecated method. Use RegisterNotificationListener returned value to close registration.") - } - - @Deprecated - override notify(Notification notification, ExecutorService service) { - publish(notification, service) - } - - override publish(Notification notification) { - publish(notification, executor) - } - - override publish(Notification notification, ExecutorService service) { - val allTypes = notification.notificationTypes - - var Iterable> listenerToNotify = Collections.emptySet(); - for (type : allTypes) { - listenerToNotify = listenerToNotify + listeners.get(type as Class) - } - val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet; - executor.invokeAll(tasks); - } - - override registerNotificationListener(Class notificationType, - NotificationListener listener) { - val reg = new GenericNotificationRegistration(notificationType, listener, this); - listeners.put(notificationType, listener); - return reg; - } - - override registerNotificationListener( - org.opendaylight.yangtools.yang.binding.NotificationListener listener) { - val invoker = BindingAwareBrokerImpl.generator.invokerFactory.invokerFor(listener); - for (notifyType : invoker.supportedNotifications) { - listeners.put(notifyType, invoker.invocationProxy) - } - val registration = new GeneratedListenerRegistration(listener, invoker,this); - return registration as Registration; - } - - protected def unregisterListener(GenericNotificationRegistration reg) { - listeners.remove(reg.type, reg.instance); - } - - protected def unregisterListener(GeneratedListenerRegistration reg) { - for (notifyType : reg.invoker.supportedNotifications) { - listeners.remove(notifyType, reg.invoker.invocationProxy) - } - } - - override close() { - //FIXME: implement properly. - } - -} - -class GenericNotificationRegistration extends AbstractObjectRegistration> implements ListenerRegistration> { - - @Property - val Class type; - - var NotificationBrokerImpl notificationBroker; - - public new(Class type, NotificationListener instance, NotificationBrokerImpl broker) { - super(instance); - _type = type; - notificationBroker = broker; - } - - override protected removeRegistration() { - notificationBroker.unregisterListener(this); - notificationBroker = null; - } -} - -class GeneratedListenerRegistration extends AbstractObjectRegistration implements ListenerRegistration { - - @Property - val NotificationInvoker invoker; - - var NotificationBrokerImpl notificationBroker; - - - new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) { - super(instance); - _invoker = invoker; - notificationBroker = broker; - } - - override protected removeRegistration() { - notificationBroker.unregisterListener(this); - notificationBroker = null; - invoker.close(); - } -} - -@Data -class NotifyTask implements Callable { - - private static val log = LoggerFactory.getLogger(NotifyTask); - - val NotificationListener listener; - val Notification notification; - - override call() { - try { - log.info("Delivering notification {} to {}",notification,listener); - listener.onNotification(notification); - log.info("Notification delivered {} to {}",notification,listener); - } catch (Exception e) { - log.error("Unhandled exception thrown by listener: {}", listener, e); - } - return null; - } - -} +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.binding.impl + +import com.google.common.collect.HashMultimap +import com.google.common.collect.Multimap +import java.util.Collection +import java.util.Collections +import java.util.concurrent.Callable +import java.util.concurrent.ExecutorService +import org.opendaylight.controller.sal.binding.api.NotificationListener +import org.opendaylight.controller.sal.binding.api.NotificationProviderService +import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration +import org.opendaylight.yangtools.concepts.ListenerRegistration +import org.opendaylight.yangtools.concepts.Registration +import org.opendaylight.yangtools.yang.binding.Notification +import org.slf4j.LoggerFactory + +class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable { + + val Multimap, NotificationListener> listeners; + + @Property + var ExecutorService executor; + + new(ExecutorService executor) { + listeners = HashMultimap.create() + this.executor = executor; + } + + @Deprecated + override addNotificationListener(Class notificationType, + NotificationListener listener) { + listeners.put(notificationType, listener) + } + + @Deprecated + override removeNotificationListener(Class notificationType, + NotificationListener listener) { + listeners.remove(notificationType, listener) + } + + override notify(Notification notification) { + publish(notification) + } + + def getNotificationTypes(Notification notification) { + notification.class.interfaces.filter[it != Notification && Notification.isAssignableFrom(it)] + } + + @SuppressWarnings("unchecked") + private def notifyAll(Collection> listeners, Notification notification) { + listeners.forEach[(it as NotificationListener).onNotification(notification)] + } + + @Deprecated + override addNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) { + throw new UnsupportedOperationException("Deprecated method. Use registerNotificationListener instead."); + + } + + @Deprecated + override removeNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener listener) { + throw new UnsupportedOperationException( + "Deprecated method. Use RegisterNotificationListener returned value to close registration.") + } + + @Deprecated + override notify(Notification notification, ExecutorService service) { + publish(notification, service) + } + + override publish(Notification notification) { + publish(notification, executor) + } + + override publish(Notification notification, ExecutorService service) { + val allTypes = notification.notificationTypes + + var Iterable> listenerToNotify = Collections.emptySet(); + for (type : allTypes) { + listenerToNotify = listenerToNotify + listeners.get(type as Class) + } + val tasks = listenerToNotify.map[new NotifyTask(it, notification)].toSet; + executor.invokeAll(tasks); + } + + override registerNotificationListener(Class notificationType, + NotificationListener listener) { + val reg = new GenericNotificationRegistration(notificationType, listener, this); + listeners.put(notificationType, listener); + return reg; + } + + override registerNotificationListener( + org.opendaylight.yangtools.yang.binding.NotificationListener listener) { + val invoker = BindingAwareBrokerImpl.generator.invokerFactory.invokerFor(listener); + for (notifyType : invoker.supportedNotifications) { + listeners.put(notifyType, invoker.invocationProxy) + } + val registration = new GeneratedListenerRegistration(listener, invoker,this); + return registration as Registration; + } + + protected def unregisterListener(GenericNotificationRegistration reg) { + listeners.remove(reg.type, reg.instance); + } + + protected def unregisterListener(GeneratedListenerRegistration reg) { + for (notifyType : reg.invoker.supportedNotifications) { + listeners.remove(notifyType, reg.invoker.invocationProxy) + } + } + + override close() { + //FIXME: implement properly. + } + +} + +class GenericNotificationRegistration extends AbstractObjectRegistration> implements ListenerRegistration> { + + @Property + val Class type; + + var NotificationBrokerImpl notificationBroker; + + public new(Class type, NotificationListener instance, NotificationBrokerImpl broker) { + super(instance); + _type = type; + notificationBroker = broker; + } + + override protected removeRegistration() { + notificationBroker.unregisterListener(this); + notificationBroker = null; + } +} + +class GeneratedListenerRegistration extends AbstractObjectRegistration implements ListenerRegistration { + + @Property + val NotificationInvoker invoker; + + var NotificationBrokerImpl notificationBroker; + + + new(org.opendaylight.yangtools.yang.binding.NotificationListener instance, NotificationInvoker invoker, NotificationBrokerImpl broker) { + super(instance); + _invoker = invoker; + notificationBroker = broker; + } + + override protected removeRegistration() { + notificationBroker.unregisterListener(this); + notificationBroker = null; + invoker.close(); + } +} + +@Data +class NotifyTask implements Callable { + + private static val log = LoggerFactory.getLogger(NotifyTask); + + val NotificationListener listener; + val Notification notification; + + override call() { + try { + log.info("Delivering notification {} to {}",notification,listener); + listener.onNotification(notification); + log.info("Notification delivered {} to {}",notification,listener); + } catch (Exception e) { + log.error("Unhandled exception thrown by listener: {}", listener, e); + } + return null; + } + +} diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend index 5f4f815961..da47438a6b 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend @@ -1,377 +1,391 @@ -package org.opendaylight.controller.md.sal.common.impl.service - -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler -import org.opendaylight.controller.md.sal.common.api.TransactionStatus -import org.opendaylight.controller.md.sal.common.api.data.DataReader -import org.opendaylight.yangtools.concepts.AbstractObjectRegistration -import org.opendaylight.yangtools.concepts.ListenerRegistration -import com.google.common.collect.Multimap -import static com.google.common.base.Preconditions.*; -import java.util.List -import com.google.common.collect.HashMultimap -import java.util.concurrent.ExecutorService -import java.util.concurrent.Callable -import org.opendaylight.yangtools.yang.common.RpcResult -import java.util.Collections -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction -import java.util.ArrayList -import org.opendaylight.yangtools.concepts.CompositeObjectRegistration -import java.util.Arrays -import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService -import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory -import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher -import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener -import org.opendaylight.controller.sal.common.util.Rpcs -import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification -import java.util.concurrent.Future -import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter -import org.opendaylight.yangtools.concepts.Path -import org.slf4j.LoggerFactory -import java.util.HashSet -import java.util.Collection -import com.google.common.collect.FluentIterable; -import java.util.Set -import com.google.common.collect.ImmutableList -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration -import org.opendaylight.controller.md.sal.common.api.RegistrationListener -import org.opendaylight.yangtools.concepts.util.ListenerRegistry -import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent - -abstract class AbstractDataBroker

, D, DCL extends DataChangeListener> implements DataModificationTransactionFactory, // -DataReader, // -DataChangePublisher, // -DataProvisionService { - - private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); - - @Property - var ExecutorService executor; - - @Property - var AbstractDataReadRouter dataReadRouter; - - Multimap> listeners = HashMultimap.create(); - Multimap> commitHandlers = HashMultimap.create(); - - val ListenerRegistry>> commitHandlerRegistrationListeners = new ListenerRegistry(); - public new() { - } - - protected def /*Iterator>,D>>*/ affectedCommitHandlers( - HashSet

paths) { - return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] // - .transformAndConcat[value] // - .transform[instance].toList() - } - - override final readConfigurationData(P path) { - return dataReadRouter.readConfigurationData(path); - } - - override final readOperationalData(P path) { - return dataReadRouter.readOperationalData(path); - } - - override final registerCommitHandler(P path, DataCommitHandler commitHandler) { - val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this); - commitHandlers.put(path, registration) - LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path); - for(listener : commitHandlerRegistrationListeners) { - try { - listener.instance.onRegister(registration); - } catch (Exception e) { - LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e); - } - } - return registration; - } - - override final def registerDataChangeListener(P path, DCL listener) { - val reg = new DataChangeListenerRegistration(path, listener, this); - listeners.put(path, reg); - val initialConfig = dataReadRouter.readConfigurationData(path); - val initialOperational = dataReadRouter.readOperationalData(path); - val event = createInitialListenerEvent(path,initialConfig,initialOperational); - listener.onDataChanged(event); - return reg; - } - - final def registerDataReader(P path, DataReader reader) { - - val confReg = dataReadRouter.registerConfigurationReader(path, reader); - val dataReg = dataReadRouter.registerOperationalReader(path, reader); - - return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg)); - } - - override registerCommitHandlerListener(RegistrationListener> commitHandlerListener) { - val ret = commitHandlerRegistrationListeners.register(commitHandlerListener); - - return ret; - } - - protected def DataChangeEvent createInitialListenerEvent(P path,D initialConfig,D initialOperational) { - return new InitialDataChangeEventImpl(initialConfig,initialOperational); - - } - - protected final def removeListener(DataChangeListenerRegistration registration) { - listeners.remove(registration.path, registration); - } - - protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) { - commitHandlers.remove(registration.path, registration); - - LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path); - for(listener : commitHandlerRegistrationListeners) { - try { - listener.instance.onUnregister(registration); - } catch (Exception e) { - LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e); - } - } - } - - protected final def getActiveCommitHandlers() { - return commitHandlers.entries; - } - - protected def /*Iterator>,D>>*/ affectedListenersWithInitialState( - HashSet

paths) { - return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [ - val operationalState = readOperationalData(key) - val configurationState = readConfigurationData(key) - return new ListenerStateCapture(key, value, operationalState, configurationState) - ].toList() - } - - protected def boolean isAffectedBy(P key, Set

paths) { - if (paths.contains(key)) { - return true; - } - for (path : paths) { - if (key.contains(path)) { - return true; - } - } - - return false; - } - - package final def Future> commit(AbstractDataTransaction transaction) { - checkNotNull(transaction); - transaction.changeStatus(TransactionStatus.SUBMITED); - val task = new TwoPhaseCommit(transaction, this); - return executor.submit(task); - } - -} - -@Data -package class ListenerStateCapture

, D, DCL extends DataChangeListener> { - - @Property - P path; - - @Property - Collection> listeners; - - @Property - D initialOperationalState; - - @Property - D initialConfigurationState; -} - -package class DataChangeListenerRegistration

, D, DCL extends DataChangeListener> extends AbstractObjectRegistration implements ListenerRegistration { - - AbstractDataBroker dataBroker; - - @Property - val P path; - - new(P path, DCL instance, AbstractDataBroker broker) { - super(instance) - dataBroker = broker; - _path = path; - } - - override protected removeRegistration() { - dataBroker.removeListener(this); - dataBroker = null; - } - -} - -package class DataCommitHandlerRegistrationImpl

, D> // -extends AbstractObjectRegistration> // -implements DataCommitHandlerRegistration { - - AbstractDataBroker dataBroker; - - @Property - val P path; - - new(P path, DataCommitHandler instance, AbstractDataBroker broker) { - super(instance) - dataBroker = broker; - _path = path; - } - - override protected removeRegistration() { - dataBroker.removeCommitHandler(this); - dataBroker = null; - } -} - -package class TwoPhaseCommit

, D, DCL extends DataChangeListener> implements Callable> { - - private static val log = LoggerFactory.getLogger(TwoPhaseCommit); - - val AbstractDataTransaction transaction; - val AbstractDataBroker dataBroker; - - new(AbstractDataTransaction transaction, AbstractDataBroker broker) { - this.transaction = transaction; - this.dataBroker = broker; - } - - override call() throws Exception { - - // get affected paths - val affectedPaths = new HashSet

(); - - affectedPaths.addAll(transaction.createdConfigurationData.keySet); - affectedPaths.addAll(transaction.updatedConfigurationData.keySet); - affectedPaths.addAll(transaction.removedConfigurationData); - - affectedPaths.addAll(transaction.createdOperationalData.keySet); - affectedPaths.addAll(transaction.updatedOperationalData.keySet); - affectedPaths.addAll(transaction.removedOperationalData); - - val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths); - - val transactionId = transaction.identifier; - - log.info("Transaction: {} Started.",transactionId); - // requesting commits - val Iterable> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths); - val List> handlerTransactions = new ArrayList(); - try { - for (handler : commitHandlers) { - handlerTransactions.add(handler.requestCommit(transaction)); - } - } catch (Exception e) { - log.error("Transaction: {} Request Commit failed", transactionId,e); - return rollback(handlerTransactions, e); - } - val List> results = new ArrayList(); - try { - for (subtransaction : handlerTransactions) { - results.add(subtransaction.finish()); - } - listeners.publishDataChangeEvent(); - } catch (Exception e) { - log.error("Transaction: {} Finish Commit failed",transactionId, e); - return rollback(handlerTransactions, e); - } - log.info("Transaction: {} Finished succesfully.",transactionId); - return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet()); - - } - - def void publishDataChangeEvent(ImmutableList> listeners) { - for (listenerSet : listeners) { - val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path); - val updatedOperational = dataBroker.readOperationalData(listenerSet.path); - - val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState, - listenerSet.initialOperationalState, updatedOperational, updatedConfiguration); - for (listener : listenerSet.listeners) { - try { - listener.instance.onDataChanged(changeEvent); - - } catch (Exception e) { - e.printStackTrace(); - } - } - } - } - - def rollback(List> transactions, Exception e) { - for (transaction : transactions) { - transaction.rollback() - } - - // FIXME return encountered error. - return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet()); - } -} - -public abstract class AbstractDataTransaction

, D> extends AbstractDataModification { - - @Property - private val Object identifier; - - var TransactionStatus status; - - var AbstractDataBroker broker; - - protected new(Object identifier,AbstractDataBroker dataBroker) { - super(dataBroker); - _identifier = identifier; - broker = dataBroker; - status = TransactionStatus.NEW; - - //listeners = new ListenerRegistry<>(); - } - - override commit() { - return broker.commit(this); - } - - override readConfigurationData(P path) { - return broker.readConfigurationData(path); - } - - override readOperationalData(P path) { - return broker.readOperationalData(path); - } - - override hashCode() { - return identifier.hashCode; - } - - override equals(Object obj) { - if (this === obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - val other = (obj as AbstractDataTransaction); - if (broker == null) { - if (other.broker != null) - return false; - } else if (!broker.equals(other.broker)) - return false; - if (identifier == null) { - if (other.identifier != null) - return false; - } else if (!identifier.equals(other.identifier)) - return false; - return true; - } - - override TransactionStatus getStatus() { - return status; - } - - protected abstract def void onStatusChange(TransactionStatus status); - - public def changeStatus(TransactionStatus status) { - this.status = status; - onStatusChange(status); - } - -} +package org.opendaylight.controller.md.sal.common.impl.service + +import com.google.common.collect.FluentIterable +import com.google.common.collect.HashMultimap +import com.google.common.collect.ImmutableList +import com.google.common.collect.Multimap +import java.util.ArrayList +import java.util.Arrays +import java.util.Collection +import java.util.Collections +import java.util.HashSet +import java.util.List +import java.util.Set +import java.util.concurrent.Callable +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicLong +import org.opendaylight.controller.md.sal.common.api.RegistrationListener +import org.opendaylight.controller.md.sal.common.api.TransactionStatus +import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener +import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration +import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory +import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService +import org.opendaylight.controller.md.sal.common.api.data.DataReader +import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification +import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter +import org.opendaylight.controller.sal.common.util.Rpcs +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration +import org.opendaylight.yangtools.concepts.CompositeObjectRegistration +import org.opendaylight.yangtools.concepts.ListenerRegistration +import org.opendaylight.yangtools.concepts.Path +import org.opendaylight.yangtools.concepts.util.ListenerRegistry +import org.opendaylight.yangtools.yang.common.RpcResult +import org.slf4j.LoggerFactory + +import static com.google.common.base.Preconditions.* import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent + +abstract class AbstractDataBroker

, D, DCL extends DataChangeListener> implements DataModificationTransactionFactory, // +DataReader, // +DataChangePublisher, // +DataProvisionService { + + private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); + + @Property + var ExecutorService executor; + + @Property + var AbstractDataReadRouter dataReadRouter; + + @Property + private val AtomicLong submittedTransactionsCount = new AtomicLong; + + @Property + private val AtomicLong failedTransactionsCount = new AtomicLong + + @Property + private val AtomicLong finishedTransactionsCount = new AtomicLong + + Multimap> listeners = HashMultimap.create(); + Multimap> commitHandlers = HashMultimap.create(); + + val ListenerRegistry>> commitHandlerRegistrationListeners = new ListenerRegistry(); + public new() { + } + + protected def /*Iterator>,D>>*/ affectedCommitHandlers( + HashSet

paths) { + return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] // + .transformAndConcat[value] // + .transform[instance].toList() + } + + override final readConfigurationData(P path) { + return dataReadRouter.readConfigurationData(path); + } + + override final readOperationalData(P path) { + return dataReadRouter.readOperationalData(path); + } + + override final registerCommitHandler(P path, DataCommitHandler commitHandler) { + val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this); + commitHandlers.put(path, registration) + LOG.info("Registering Commit Handler {} for path: {}",commitHandler,path); + for(listener : commitHandlerRegistrationListeners) { + try { + listener.instance.onRegister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e); + } + } + return registration; + } + + override final def registerDataChangeListener(P path, DCL listener) { + val reg = new DataChangeListenerRegistration(path, listener, this); + listeners.put(path, reg); + val initialConfig = dataReadRouter.readConfigurationData(path); + val initialOperational = dataReadRouter.readOperationalData(path); + val event = createInitialListenerEvent(path,initialConfig,initialOperational); + listener.onDataChanged(event); + return reg; + } + + final def registerDataReader(P path, DataReader reader) { + + val confReg = dataReadRouter.registerConfigurationReader(path, reader); + val dataReg = dataReadRouter.registerOperationalReader(path, reader); + + return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg)); + } + + override registerCommitHandlerListener(RegistrationListener> commitHandlerListener) { + val ret = commitHandlerRegistrationListeners.register(commitHandlerListener); + + return ret; + } + + protected def DataChangeEvent createInitialListenerEvent(P path,D initialConfig,D initialOperational) { + return new InitialDataChangeEventImpl(initialConfig,initialOperational); + + } + + protected final def removeListener(DataChangeListenerRegistration registration) { + listeners.remove(registration.path, registration); + } + + protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) { + commitHandlers.remove(registration.path, registration); + + LOG.info("Removing Commit Handler {} for path: {}",registration.instance,registration.path); + for(listener : commitHandlerRegistrationListeners) { + try { + listener.instance.onUnregister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e); + } + } + } + + protected final def getActiveCommitHandlers() { + return commitHandlers.entries; + } + + protected def /*Iterator>,D>>*/ affectedListenersWithInitialState( + HashSet

paths) { + return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [ + val operationalState = readOperationalData(key) + val configurationState = readConfigurationData(key) + return new ListenerStateCapture(key, value, operationalState, configurationState) + ].toList() + } + + protected def boolean isAffectedBy(P key, Set

paths) { + if (paths.contains(key)) { + return true; + } + for (path : paths) { + if (key.contains(path)) { + return true; + } + } + + return false; + } + + package final def Future> commit(AbstractDataTransaction transaction) { + checkNotNull(transaction); + transaction.changeStatus(TransactionStatus.SUBMITED); + val task = new TwoPhaseCommit(transaction, this); + submittedTransactionsCount.andIncrement; + return executor.submit(task); + } + +} + +@Data +package class ListenerStateCapture

, D, DCL extends DataChangeListener> { + + @Property + P path; + + @Property + Collection> listeners; + + @Property + D initialOperationalState; + + @Property + D initialConfigurationState; +} + +package class DataChangeListenerRegistration

, D, DCL extends DataChangeListener> extends AbstractObjectRegistration implements ListenerRegistration { + + AbstractDataBroker dataBroker; + + @Property + val P path; + + new(P path, DCL instance, AbstractDataBroker broker) { + super(instance) + dataBroker = broker; + _path = path; + } + + override protected removeRegistration() { + dataBroker.removeListener(this); + dataBroker = null; + } + +} + +package class DataCommitHandlerRegistrationImpl

, D> // +extends AbstractObjectRegistration> // +implements DataCommitHandlerRegistration { + + AbstractDataBroker dataBroker; + + @Property + val P path; + + new(P path, DataCommitHandler instance, AbstractDataBroker broker) { + super(instance) + dataBroker = broker; + _path = path; + } + + override protected removeRegistration() { + dataBroker.removeCommitHandler(this); + dataBroker = null; + } +} + +package class TwoPhaseCommit

, D, DCL extends DataChangeListener> implements Callable> { + + private static val log = LoggerFactory.getLogger(TwoPhaseCommit); + + val AbstractDataTransaction transaction; + val AbstractDataBroker dataBroker; + + new(AbstractDataTransaction transaction, AbstractDataBroker broker) { + this.transaction = transaction; + this.dataBroker = broker; + } + + override call() throws Exception { + + // get affected paths + val affectedPaths = new HashSet

(); + + affectedPaths.addAll(transaction.createdConfigurationData.keySet); + affectedPaths.addAll(transaction.updatedConfigurationData.keySet); + affectedPaths.addAll(transaction.removedConfigurationData); + + affectedPaths.addAll(transaction.createdOperationalData.keySet); + affectedPaths.addAll(transaction.updatedOperationalData.keySet); + affectedPaths.addAll(transaction.removedOperationalData); + + val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths); + + val transactionId = transaction.identifier; + + log.info("Transaction: {} Started.",transactionId); + // requesting commits + val Iterable> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths); + val List> handlerTransactions = new ArrayList(); + try { + for (handler : commitHandlers) { + handlerTransactions.add(handler.requestCommit(transaction)); + } + } catch (Exception e) { + log.error("Transaction: {} Request Commit failed", transactionId,e); + dataBroker.failedTransactionsCount.andIncrement + return rollback(handlerTransactions, e); + } + val List> results = new ArrayList(); + try { + for (subtransaction : handlerTransactions) { + results.add(subtransaction.finish()); + } + listeners.publishDataChangeEvent(); + } catch (Exception e) { + log.error("Transaction: {} Finish Commit failed",transactionId, e); + dataBroker.failedTransactionsCount.andIncrement + return rollback(handlerTransactions, e); + } + log.info("Transaction: {} Finished successfully.",transactionId); + dataBroker.finishedTransactionsCount.andIncrement; + return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet()); + + } + + def void publishDataChangeEvent(ImmutableList> listeners) { + for (listenerSet : listeners) { + val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path); + val updatedOperational = dataBroker.readOperationalData(listenerSet.path); + + val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState, + listenerSet.initialOperationalState, updatedOperational, updatedConfiguration); + for (listener : listenerSet.listeners) { + try { + listener.instance.onDataChanged(changeEvent); + + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + def rollback(List> transactions, Exception e) { + for (transaction : transactions) { + transaction.rollback() + } + + // FIXME return encountered error. + return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet()); + } +} + +public abstract class AbstractDataTransaction

, D> extends AbstractDataModification { + + @Property + private val Object identifier; + + var TransactionStatus status; + + var AbstractDataBroker broker; + + protected new(Object identifier,AbstractDataBroker dataBroker) { + super(dataBroker); + _identifier = identifier; + broker = dataBroker; + status = TransactionStatus.NEW; + + //listeners = new ListenerRegistry<>(); + } + + override commit() { + return broker.commit(this); + } + + override readConfigurationData(P path) { + return broker.readConfigurationData(path); + } + + override readOperationalData(P path) { + return broker.readOperationalData(path); + } + + override hashCode() { + return identifier.hashCode; + } + + override equals(Object obj) { + if (this === obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + val other = (obj as AbstractDataTransaction); + if (broker == null) { + if (other.broker != null) + return false; + } else if (!broker.equals(other.broker)) + return false; + if (identifier == null) { + if (other.identifier != null) + return false; + } else if (!identifier.equals(other.identifier)) + return false; + return true; + } + + override TransactionStatus getStatus() { + return status; + } + + protected abstract def void onStatusChange(TransactionStatus status); + + public def changeStatus(TransactionStatus status) { + this.status = status; + onStatusChange(status); + } + +} diff --git a/opendaylight/md-sal/sal-dom-api/src/main/yang/opendaylight-md-sal-common.yang b/opendaylight/md-sal/sal-dom-api/src/main/yang/opendaylight-md-sal-common.yang index 004574defa..d963da1fc7 100644 --- a/opendaylight/md-sal/sal-dom-api/src/main/yang/opendaylight-md-sal-common.yang +++ b/opendaylight/md-sal/sal-dom-api/src/main/yang/opendaylight-md-sal-common.yang @@ -1,68 +1,71 @@ -module opendaylight-md-sal-common { - yang-version 1; - namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:common"; - prefix "md-sal-common"; - - description - "Common definition for MD-SAL."; - - revision "2013-10-28" { - description - "Initial revision"; - } - - grouping rpc-routing-table { - - leaf routing-context { - type string; - } - list routes { - leaf path { - type string; - } - leaf destination { - type string; - } - } - - } - - grouping rpc-router { - leaf module { - type string; - } - container routing-tables { - list routing-table { - uses rpc-routing-table; - } - } - } - - grouping rpc-state { - list rpc-router { - uses rpc-router; - } - } - - grouping notification-state { - container notifications { - leaf published { - type uint32; - } - } - } - - grouping data-state { - container transactions { - leaf created { - type uint32; - } - leaf successful { - type uint32; - } - leaf failed { - type uint32; - } - } - } +module opendaylight-md-sal-common { + yang-version 1; + namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:common"; + prefix "md-sal-common"; + + description + "Common definition for MD-SAL."; + + revision "2013-10-28" { + description + "Initial revision"; + } + + grouping rpc-routing-table { + + leaf routing-context { + type string; + } + list routes { + leaf path { + type string; + } + leaf destination { + type string; + } + } + + } + + grouping rpc-router { + leaf module { + type string; + } + container routing-tables { + list routing-table { + uses rpc-routing-table; + } + } + } + + grouping rpc-state { + list rpc-router { + uses rpc-router; + } + } + + grouping notification-state { + container notifications { + leaf published { + type uint32; + } + } + } + + grouping data-state { + container transactions { + leaf created { + type uint32; + } + leaf submitted { + type uint32; + } + leaf successful { + type uint32; + } + leaf failed { + type uint32; + } + } + } } \ No newline at end of file -- 2.36.6