-/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.common.impl.service;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher;
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
-import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory;
-import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
-import org.opendaylight.controller.md.sal.common.api.data.DataReader;
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.Path;
-import org.opendaylight.yangtools.concepts.Registration;
-import org.opendaylight.yangtools.util.ListenerRegistry;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Supplier;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
-import com.google.common.util.concurrent.MoreExecutors;
-
-@Deprecated
-public abstract class AbstractDataBroker<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>>
- implements DataModificationTransactionFactory<P, D>, DataReader<P, D>, DataChangePublisher<P, D, DCL>,
- DataProvisionService<P, D> {
- private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class);
-
- private ExecutorService executor;
-
- public ExecutorService getExecutor() {
- return this.executor;
- }
-
- public void setExecutor(final ExecutorService executor) {
- this.executor = executor;
- }
-
- private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor();
-
- public ExecutorService getNotificationExecutor() {
- return this.notificationExecutor;
- }
-
- public void setNotificationExecutor(final ExecutorService notificationExecutor) {
- this.notificationExecutor = notificationExecutor;
- }
-
- private AbstractDataReadRouter<P, D> dataReadRouter;
-
- private final AtomicLong submittedTransactionsCount = new AtomicLong();
-
- private final AtomicLong failedTransactionsCount = new AtomicLong();
-
- private final AtomicLong finishedTransactionsCount = new AtomicLong();
-
- public AbstractDataReadRouter<P, D> getDataReadRouter() {
- return this.dataReadRouter;
- }
-
- public void setDataReadRouter(final AbstractDataReadRouter<P, D> dataReadRouter) {
- this.dataReadRouter = dataReadRouter;
- }
-
- public AtomicLong getSubmittedTransactionsCount() {
- return this.submittedTransactionsCount;
- }
-
- public AtomicLong getFailedTransactionsCount() {
- return this.failedTransactionsCount;
- }
-
- public AtomicLong getFinishedTransactionsCount() {
- return this.finishedTransactionsCount;
- }
-
- private final Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps
- .synchronizedSetMultimap(HashMultimap.<P, DataChangeListenerRegistration<P, D, DCL>> create());
-
- private final Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps
- .synchronizedSetMultimap(HashMultimap.<P, DataCommitHandlerRegistrationImpl<P, D>> create());
-
- private final Lock registrationLock = new ReentrantLock();
-
- private final ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>> commitHandlerRegistrationListeners = new ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>>();
-
- public AbstractDataBroker() {
- }
-
- protected ImmutableList<DataCommitHandler<P, D>> affectedCommitHandlers(final Set<P> paths) {
- final Supplier<ImmutableList<DataCommitHandler<P, D>>> _function = new Supplier<ImmutableList<DataCommitHandler<P, D>>>() {
- @Override
- public ImmutableList<DataCommitHandler<P, D>> get() {
- Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
- Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
- FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
- .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
- final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
- @Override
- public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
- P _key = it.getKey();
- boolean _isAffectedBy = isAffectedBy(_key, paths);
- return _isAffectedBy;
- }
- };
- FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
- .filter(_function);
- final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
- @Override
- public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
- final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
- Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
- return _value;
- }
- };
- FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
- .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
- final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
- @Override
- public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
- DataCommitHandler<P, D> _instance = it.getInstance();
- return _instance;
- }
- };
- FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
- .<DataCommitHandler<P, D>> transform(_function_2);
- return _transform.toList();
- }
- };
- return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
- }
-
- protected ImmutableList<DataCommitHandler<P, D>> probablyAffectedCommitHandlers(final HashSet<P> paths) {
- final Supplier<ImmutableList<DataCommitHandler<P, D>>> _function = new Supplier<ImmutableList<DataCommitHandler<P, D>>>() {
- @Override
- public ImmutableList<DataCommitHandler<P, D>> get() {
- Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
- Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
- FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
- .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
- final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
- @Override
- public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
- P _key = it.getKey();
- boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
- return _isProbablyAffectedBy;
- }
- };
- FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
- .filter(_function);
- final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
- @Override
- public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
- final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
- Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
- return _value;
- }
- };
- FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
- .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
- final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
- @Override
- public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
- DataCommitHandler<P, D> _instance = it.getInstance();
- return _instance;
- }
- };
- FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
- .<DataCommitHandler<P, D>> transform(_function_2);
- return _transform.toList();
- }
- };
- return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
- }
-
- protected Map<P, D> deepGetBySubpath(final Map<P, D> dataSet, final P path) {
- return Collections.<P, D> emptyMap();
- }
-
- @Override
- public final D readConfigurationData(final P path) {
- AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
- return _dataReadRouter.readConfigurationData(path);
- }
-
- @Override
- public final D readOperationalData(final P path) {
- AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
- return _dataReadRouter.readOperationalData(path);
- }
-
- private static <T extends Object> T withLock(final Lock lock, final Supplier<T> method) {
- lock.lock();
- try {
- return method.get();
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public final Registration registerCommitHandler(final P path,
- final DataCommitHandler<P, D> commitHandler) {
- synchronized (commitHandler) {
- final DataCommitHandlerRegistrationImpl<P, D> registration = new DataCommitHandlerRegistrationImpl<P, D>(
- path, commitHandler, this);
- commitHandlers.put(path, registration);
- LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
- for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
- try {
- listener.getInstance().onRegister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(),
- e);
- }
- }
- return registration;
- }
- }
-
- @Override
- public final ListenerRegistration<DCL> registerDataChangeListener(final P path, final DCL listener) {
- synchronized (listeners) {
- final DataChangeListenerRegistration<P, D, DCL> reg = new DataChangeListenerRegistration<P, D, DCL>(path,
- listener, AbstractDataBroker.this);
- listeners.put(path, reg);
- final D initialConfig = getDataReadRouter().readConfigurationData(path);
- final D initialOperational = getDataReadRouter().readOperationalData(path);
- final DataChangeEvent<P, D> event = createInitialListenerEvent(path, initialConfig, initialOperational);
- listener.onDataChanged(event);
- return reg;
- }
- }
-
- public final CompositeObjectRegistration<DataReader<P, D>> registerDataReader(final P path,
- final DataReader<P, D> reader) {
-
- final Registration confReg = getDataReadRouter().registerConfigurationReader(path, reader);
- final Registration dataReg = getDataReadRouter().registerOperationalReader(path, reader);
- return new CompositeObjectRegistration<DataReader<P, D>>(reader, Arrays.asList(confReg, dataReg));
- }
-
- @Override
- public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> registerCommitHandlerListener(
- final RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
- final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> ret = this.commitHandlerRegistrationListeners
- .register(commitHandlerListener);
- return ret;
- }
-
- protected DataChangeEvent<P, D> createInitialListenerEvent(final P path, final D initialConfig,
- final D initialOperational) {
- InitialDataChangeEventImpl<P, D> _initialDataChangeEventImpl = new InitialDataChangeEventImpl<P, D>(
- initialConfig, initialOperational);
- return _initialDataChangeEventImpl;
- }
-
- protected final void removeListener(final DataChangeListenerRegistration<P, D, DCL> registration) {
- synchronized (listeners) {
- listeners.remove(registration.getPath(), registration);
- }
- }
-
- protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl<P, D> registration) {
- synchronized (commitHandlers) {
-
- commitHandlers.remove(registration.getPath(), registration);
- LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
- for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
- try {
- listener.getInstance().onUnregister(registration);
- } catch (Exception e) {
- LOG.error("Unexpected exception in listener {} during invoking onUnregister",
- listener.getInstance(), e);
- }
- }
- }
-
- }
-
- protected final Collection<Entry<P, DataCommitHandlerRegistrationImpl<P, D>>> getActiveCommitHandlers() {
- return commitHandlers.entries();
- }
-
- protected ImmutableList<ListenerStateCapture<P, D, DCL>> affectedListeners(final Set<P> paths) {
-
- synchronized (listeners) {
- return FluentIterable //
- .from(listeners.asMap().entrySet()) //
- .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
- @Override
- public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
- return isAffectedBy(it.getKey(), paths);
- }
- }) //
- .transform(
- new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
- @Override
- public ListenerStateCapture<P, D, DCL> apply(
- final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
- return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
- createContainsPredicate(it.getKey()));
- }
- }) //
- .toList();
- }
- }
-
- protected ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners(final Set<P> paths) {
- synchronized (listeners) {
- return FluentIterable //
- .from(listeners.asMap().entrySet()) //
- .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
- @Override
- public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
- return isProbablyAffectedBy(it.getKey(), paths);
- }
- }) //
- .transform(
- new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
- @Override
- public ListenerStateCapture<P, D, DCL> apply(
- final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
- return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
- createIsContainedPredicate(it.getKey()));
- }
- }) //
- .toList();
- }
- }
-
- protected Predicate<P> createContainsPredicate(final P key) {
- return new Predicate<P>() {
- @Override
- public boolean apply(final P other) {
- return key.contains(other);
- }
- };
- }
-
- protected Predicate<P> createIsContainedPredicate(final P key) {
- return new Predicate<P>() {
- @Override
- public boolean apply(final P other) {
- return other.contains(key);
- }
- };
- }
-
- protected boolean isAffectedBy(final P key, final Set<P> paths) {
- final Predicate<P> contains = this.createContainsPredicate(key);
- if (paths.contains(key)) {
- return true;
- }
- for (final P path : paths) {
- if (contains.apply(path)) {
- return true;
- }
- }
- return false;
- }
-
- protected boolean isProbablyAffectedBy(final P key, final Set<P> paths) {
- final Predicate<P> isContained = this.createIsContainedPredicate(key);
- for (final P path : paths) {
- if (isContained.apply(path)) {
- return true;
- }
- }
- return false;
- }
-
- final Future<RpcResult<TransactionStatus>> commit(final AbstractDataTransaction<P, D> transaction) {
- Preconditions.checkNotNull(transaction);
- final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
-
- this.getSubmittedTransactionsCount().getAndIncrement();
- return this.getExecutor().submit(task);
- }
-
- private static class DataCommitHandlerRegistrationImpl<P extends Path<P>, D extends Object> //
- extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
- implements DataCommitHandlerRegistration<P, D> {
-
- private AbstractDataBroker<P, D, ? extends Object> dataBroker;
- private final P path;
-
- @Override
- public P getPath() {
- return this.path;
- }
-
- public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler<P, D> instance,
- final AbstractDataBroker<P, D, ? extends Object> broker) {
- super(instance);
- this.dataBroker = broker;
- this.path = path;
- }
-
- @Override
- protected void removeRegistration() {
- this.dataBroker.removeCommitHandler(this);
- this.dataBroker = null;
- }
- }
-}