/** * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.md.sal.common.impl.service; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.opendaylight.controller.md.sal.common.api.RegistrationListener; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration; import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory; import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService; import org.opendaylight.controller.md.sal.common.api.data.DataReader; import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter; import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; import org.opendaylight.yangtools.concepts.CompositeObjectRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.Path; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.concepts.util.ListenerRegistry; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.FluentIterable; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.util.concurrent.MoreExecutors; public abstract class AbstractDataBroker

, D extends Object, DCL extends DataChangeListener> implements DataModificationTransactionFactory, DataReader, DataChangePublisher, DataProvisionService { private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class); private ExecutorService executor; public ExecutorService getExecutor() { return this.executor; } public void setExecutor(final ExecutorService executor) { this.executor = executor; } private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor(); public ExecutorService getNotificationExecutor() { return this.notificationExecutor; } public void setNotificationExecutor(final ExecutorService notificationExecutor) { this.notificationExecutor = notificationExecutor; } private AbstractDataReadRouter dataReadRouter; private final AtomicLong submittedTransactionsCount = new AtomicLong(); private final AtomicLong failedTransactionsCount = new AtomicLong(); private final AtomicLong finishedTransactionsCount = new AtomicLong(); public AbstractDataReadRouter getDataReadRouter() { return this.dataReadRouter; } public void setDataReadRouter(final AbstractDataReadRouter dataReadRouter) { this.dataReadRouter = dataReadRouter; } public AtomicLong getSubmittedTransactionsCount() { return this.submittedTransactionsCount; } public AtomicLong getFailedTransactionsCount() { return this.failedTransactionsCount; } public AtomicLong getFinishedTransactionsCount() { return this.finishedTransactionsCount; } private final Multimap> listeners = Multimaps .synchronizedSetMultimap(HashMultimap.> create()); private final Multimap> commitHandlers = Multimaps .synchronizedSetMultimap(HashMultimap.> create()); private final Lock registrationLock = new ReentrantLock(); private final ListenerRegistry>> commitHandlerRegistrationListeners = new ListenerRegistry>>(); public AbstractDataBroker() { } protected ImmutableList> affectedCommitHandlers(final Set

paths) { final Supplier>> _function = new Supplier>>() { @Override public ImmutableList> get() { Map>> _asMap = commitHandlers.asMap(); Set>>> _entrySet = _asMap.entrySet(); FluentIterable>>> _from = FluentIterable .>>> from(_entrySet); final Predicate>>> _function = new Predicate>>>() { @Override public boolean apply(final Entry>> it) { P _key = it.getKey(); boolean _isAffectedBy = isAffectedBy(_key, paths); return _isAffectedBy; } }; FluentIterable>>> _filter = _from .filter(_function); final Function>>, Collection>> _function_1 = new Function>>, Collection>>() { @Override public Collection> apply( final Entry>> it) { Collection> _value = it.getValue(); return _value; } }; FluentIterable> _transformAndConcat = _filter .> transformAndConcat(_function_1); final Function, DataCommitHandler> _function_2 = new Function, DataCommitHandler>() { @Override public DataCommitHandler apply(final DataCommitHandlerRegistrationImpl it) { DataCommitHandler _instance = it.getInstance(); return _instance; } }; FluentIterable> _transform = _transformAndConcat .> transform(_function_2); return _transform.toList(); } }; return AbstractDataBroker.>> withLock(this.registrationLock, _function); } protected ImmutableList> probablyAffectedCommitHandlers(final HashSet

paths) { final Supplier>> _function = new Supplier>>() { @Override public ImmutableList> get() { Map>> _asMap = commitHandlers.asMap(); Set>>> _entrySet = _asMap.entrySet(); FluentIterable>>> _from = FluentIterable .>>> from(_entrySet); final Predicate>>> _function = new Predicate>>>() { @Override public boolean apply(final Entry>> it) { P _key = it.getKey(); boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths); return _isProbablyAffectedBy; } }; FluentIterable>>> _filter = _from .filter(_function); final Function>>, Collection>> _function_1 = new Function>>, Collection>>() { @Override public Collection> apply( final Entry>> it) { Collection> _value = it.getValue(); return _value; } }; FluentIterable> _transformAndConcat = _filter .> transformAndConcat(_function_1); final Function, DataCommitHandler> _function_2 = new Function, DataCommitHandler>() { @Override public DataCommitHandler apply(final DataCommitHandlerRegistrationImpl it) { DataCommitHandler _instance = it.getInstance(); return _instance; } }; FluentIterable> _transform = _transformAndConcat .> transform(_function_2); return _transform.toList(); } }; return AbstractDataBroker.>> withLock(this.registrationLock, _function); } protected Map deepGetBySubpath(final Map dataSet, final P path) { return Collections. emptyMap(); } @Override public final D readConfigurationData(final P path) { AbstractDataReadRouter _dataReadRouter = this.getDataReadRouter(); return _dataReadRouter.readConfigurationData(path); } @Override public final D readOperationalData(final P path) { AbstractDataReadRouter _dataReadRouter = this.getDataReadRouter(); return _dataReadRouter.readOperationalData(path); } private static T withLock(final Lock lock, final Supplier method) { lock.lock(); try { return method.get(); } finally { lock.unlock(); } } @Override public final Registration> registerCommitHandler(final P path, final DataCommitHandler commitHandler) { synchronized (commitHandler) { final DataCommitHandlerRegistrationImpl registration = new DataCommitHandlerRegistrationImpl( path, commitHandler, this); commitHandlers.put(path, registration); LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path); for (final ListenerRegistration>> listener : commitHandlerRegistrationListeners) { try { listener.getInstance().onRegister(registration); } catch (Exception e) { LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(), e); } } return registration; } } @Override public final ListenerRegistration registerDataChangeListener(final P path, final DCL listener) { synchronized (listeners) { final DataChangeListenerRegistration reg = new DataChangeListenerRegistration(path, listener, AbstractDataBroker.this); listeners.put(path, reg); final D initialConfig = getDataReadRouter().readConfigurationData(path); final D initialOperational = getDataReadRouter().readOperationalData(path); final DataChangeEvent event = createInitialListenerEvent(path, initialConfig, initialOperational); listener.onDataChanged(event); return reg; } } public final CompositeObjectRegistration> registerDataReader(final P path, final DataReader reader) { final Registration> confReg = getDataReadRouter().registerConfigurationReader(path, reader); final Registration> dataReg = getDataReadRouter().registerOperationalReader(path, reader); return new CompositeObjectRegistration>(reader, Arrays.asList(confReg, dataReg)); } @Override public ListenerRegistration>> registerCommitHandlerListener( final RegistrationListener> commitHandlerListener) { final ListenerRegistration>> ret = this.commitHandlerRegistrationListeners .register(commitHandlerListener); return ret; } protected DataChangeEvent createInitialListenerEvent(final P path, final D initialConfig, final D initialOperational) { InitialDataChangeEventImpl _initialDataChangeEventImpl = new InitialDataChangeEventImpl( initialConfig, initialOperational); return _initialDataChangeEventImpl; } protected final void removeListener(final DataChangeListenerRegistration registration) { synchronized (listeners) { listeners.remove(registration.getPath(), registration); } } protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl registration) { synchronized (commitHandlers) { commitHandlers.remove(registration.getPath(), registration); LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath()); for (final ListenerRegistration>> listener : commitHandlerRegistrationListeners) { try { listener.getInstance().onUnregister(registration); } catch (Exception e) { LOG.error("Unexpected exception in listener {} during invoking onUnregister", listener.getInstance(), e); } } } } protected final Collection>> getActiveCommitHandlers() { return commitHandlers.entries(); } protected ImmutableList> affectedListeners(final Set

paths) { synchronized (listeners) { return FluentIterable // .from(listeners.asMap().entrySet()) // .filter(new Predicate>>>() { @Override public boolean apply(final Entry>> it) { return isAffectedBy(it.getKey(), paths); } }) // .transform( new Function>>, ListenerStateCapture>() { @Override public ListenerStateCapture apply( final Entry>> it) { return new ListenerStateCapture(it.getKey(), it.getValue(), createContainsPredicate(it.getKey())); } }) // .toList(); } } protected ImmutableList> probablyAffectedListeners(final Set

paths) { synchronized (listeners) { return FluentIterable // .from(listeners.asMap().entrySet()) // .filter(new Predicate>>>() { @Override public boolean apply(final Entry>> it) { return isProbablyAffectedBy(it.getKey(), paths); } }) // .transform( new Function>>, ListenerStateCapture>() { @Override public ListenerStateCapture apply( final Entry>> it) { return new ListenerStateCapture(it.getKey(), it.getValue(), createIsContainedPredicate(it.getKey())); } }) // .toList(); } } protected Predicate

createContainsPredicate(final P key) { return new Predicate

() { @Override public boolean apply(final P other) { return key.contains(other); } }; } protected Predicate

createIsContainedPredicate(final P key) { return new Predicate

() { @Override public boolean apply(final P other) { return other.contains(key); } }; } protected boolean isAffectedBy(final P key, final Set

paths) { final Predicate

contains = this.createContainsPredicate(key); if (paths.contains(key)) { return true; } for (final P path : paths) { if (contains.apply(path)) { return true; } } return false; } protected boolean isProbablyAffectedBy(final P key, final Set

paths) { final Predicate

isContained = this.createIsContainedPredicate(key); for (final P path : paths) { if (isContained.apply(path)) { return true; } } return false; } final Future> commit(final AbstractDataTransaction transaction) { Preconditions.checkNotNull(transaction); final TwoPhaseCommit task = new TwoPhaseCommit(transaction, this); this.getSubmittedTransactionsCount().getAndIncrement(); return this.getExecutor().submit(task); } private static class DataCommitHandlerRegistrationImpl

, D extends Object> // extends AbstractObjectRegistration> // implements DataCommitHandlerRegistration { private AbstractDataBroker dataBroker; private final P path; @Override public P getPath() { return this.path; } public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler instance, final AbstractDataBroker broker) { super(instance); this.dataBroker = broker; this.path = path; } @Override protected void removeRegistration() { this.dataBroker.removeCommitHandler(this); this.dataBroker = null; } } }