/** * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.md.sal.common.impl.service; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.opendaylight.controller.md.sal.common.api.RegistrationListener; import org.opendaylight.controller.md.sal.common.api.TransactionStatus; import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener; import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration; import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory; import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService; import org.opendaylight.controller.md.sal.common.api.data.DataReader; import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter; import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; import org.opendaylight.yangtools.concepts.CompositeObjectRegistration; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.Path; import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.util.ListenerRegistry; import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Supplier; import com.google.common.collect.FluentIterable; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import com.google.common.util.concurrent.MoreExecutors; public abstract class AbstractDataBroker
, D extends Object, DCL extends DataChangeListener
> implements DataModificationTransactionFactory
, DataReader
, DataChangePublisher
, DataProvisionService
{ private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class); private ExecutorService executor; public ExecutorService getExecutor() { return this.executor; } public void setExecutor(final ExecutorService executor) { this.executor = executor; } private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor(); public ExecutorService getNotificationExecutor() { return this.notificationExecutor; } public void setNotificationExecutor(final ExecutorService notificationExecutor) { this.notificationExecutor = notificationExecutor; } private AbstractDataReadRouter
dataReadRouter; private final AtomicLong submittedTransactionsCount = new AtomicLong(); private final AtomicLong failedTransactionsCount = new AtomicLong(); private final AtomicLong finishedTransactionsCount = new AtomicLong(); public AbstractDataReadRouter
getDataReadRouter() { return this.dataReadRouter; } public void setDataReadRouter(final AbstractDataReadRouter
dataReadRouter) { this.dataReadRouter = dataReadRouter; } public AtomicLong getSubmittedTransactionsCount() { return this.submittedTransactionsCount; } public AtomicLong getFailedTransactionsCount() { return this.failedTransactionsCount; } public AtomicLong getFinishedTransactionsCount() { return this.finishedTransactionsCount; } private final Multimap
> listeners = Multimaps .synchronizedSetMultimap(HashMultimap.
> create()); private final Multimap
> commitHandlers = Multimaps .synchronizedSetMultimap(HashMultimap.
> create());
private final Lock registrationLock = new ReentrantLock();
private final ListenerRegistry paths) {
final Supplier >> _asMap = commitHandlers.asMap();
Set >> it) {
P _key = it.getKey();
boolean _isAffectedBy = isAffectedBy(_key, paths);
return _isAffectedBy;
}
};
FluentIterable >> it) {
Collection > _function_2 = new Function >() {
@Override
public DataCommitHandler apply(final DataCommitHandlerRegistrationImpl it) {
DataCommitHandler _instance = it.getInstance();
return _instance;
}
};
FluentIterable paths) {
final Supplier >> _asMap = commitHandlers.asMap();
Set >> it) {
P _key = it.getKey();
boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
return _isProbablyAffectedBy;
}
};
FluentIterable >> it) {
Collection > _function_2 = new Function >() {
@Override
public DataCommitHandler apply(final DataCommitHandlerRegistrationImpl it) {
DataCommitHandler _instance = it.getInstance();
return _instance;
}
};
FluentIterable deepGetBySubpath(final Map dataSet, final P path) {
return Collections. emptyMap();
}
@Override
public final D readConfigurationData(final P path) {
AbstractDataReadRouter _dataReadRouter = this.getDataReadRouter();
return _dataReadRouter.readConfigurationData(path);
}
@Override
public final D readOperationalData(final P path) {
AbstractDataReadRouter _dataReadRouter = this.getDataReadRouter();
return _dataReadRouter.readOperationalData(path);
}
private static commitHandler) {
synchronized (commitHandler) {
final DataCommitHandlerRegistrationImpl registration = new DataCommitHandlerRegistrationImpl (
path, commitHandler, this);
commitHandlers.put(path, registration);
LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
for (final ListenerRegistration reg = new DataChangeListenerRegistration (path,
listener, AbstractDataBroker.this);
listeners.put(path, reg);
final D initialConfig = getDataReadRouter().readConfigurationData(path);
final D initialOperational = getDataReadRouter().readOperationalData(path);
final DataChangeEvent event = createInitialListenerEvent(path, initialConfig, initialOperational);
listener.onDataChanged(event);
return reg;
}
}
public final CompositeObjectRegistration reader) {
final Registration confReg = getDataReadRouter().registerConfigurationReader(path, reader);
final Registration dataReg = getDataReadRouter().registerOperationalReader(path, reader);
return new CompositeObjectRegistration createInitialListenerEvent(final P path, final D initialConfig,
final D initialOperational) {
InitialDataChangeEventImpl _initialDataChangeEventImpl = new InitialDataChangeEventImpl (
initialConfig, initialOperational);
return _initialDataChangeEventImpl;
}
protected final void removeListener(final DataChangeListenerRegistration registration) {
synchronized (listeners) {
listeners.remove(registration.getPath(), registration);
}
}
protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl registration) {
synchronized (commitHandlers) {
commitHandlers.remove(registration.getPath(), registration);
LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
for (final ListenerRegistration paths) {
synchronized (listeners) {
return FluentIterable //
.from(listeners.asMap().entrySet()) //
.filter(new Predicate >> it) {
return isAffectedBy(it.getKey(), paths);
}
}) //
.transform(
new Function >() {
@Override
public ListenerStateCapture apply(
final Entry >> it) {
return new ListenerStateCapture (it.getKey(), it.getValue(),
createContainsPredicate(it.getKey()));
}
}) //
.toList();
}
}
protected ImmutableList paths) {
synchronized (listeners) {
return FluentIterable //
.from(listeners.asMap().entrySet()) //
.filter(new Predicate >> it) {
return isProbablyAffectedBy(it.getKey(), paths);
}
}) //
.transform(
new Function >() {
@Override
public ListenerStateCapture apply(
final Entry >> it) {
return new ListenerStateCapture (it.getKey(), it.getValue(),
createIsContainedPredicate(it.getKey()));
}
}) //
.toList();
}
}
protected Predicate createContainsPredicate(final P key) {
return new Predicate () {
@Override
public boolean apply(final P other) {
return key.contains(other);
}
};
}
protected Predicate createIsContainedPredicate(final P key) {
return new Predicate () {
@Override
public boolean apply(final P other) {
return other.contains(key);
}
};
}
protected boolean isAffectedBy(final P key, final Set paths) {
final Predicate contains = this.createContainsPredicate(key);
if (paths.contains(key)) {
return true;
}
for (final P path : paths) {
if (contains.apply(path)) {
return true;
}
}
return false;
}
protected boolean isProbablyAffectedBy(final P key, final Set paths) {
final Predicate isContained = this.createIsContainedPredicate(key);
for (final P path : paths) {
if (isContained.apply(path)) {
return true;
}
}
return false;
}
final Future transaction) {
Preconditions.checkNotNull(transaction);
final TwoPhaseCommit task = new TwoPhaseCommit (transaction, this);
this.getSubmittedTransactionsCount().getAndIncrement();
return this.getExecutor().submit(task);
}
private static class DataCommitHandlerRegistrationImpl , D extends Object> //
extends AbstractObjectRegistration {
private AbstractDataBroker dataBroker;
private final P path;
@Override
public P getPath() {
return this.path;
}
public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler instance,
final AbstractDataBroker broker) {
super(instance);
this.dataBroker = broker;
this.path = path;
}
@Override
protected void removeRegistration() {
this.dataBroker.removeCommitHandler(this);
this.dataBroker = null;
}
}
}