/* * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.md.sal.common.impl.service import com.google.common.collect.FluentIterable import com.google.common.collect.HashMultimap import com.google.common.collect.ImmutableList import com.google.common.collect.Multimap import java.util.ArrayList import java.util.Arrays import java.util.Collection import java.util.Collections import java.util.HashSet import java.util.List import java.util.Set import java.util.concurrent.Callable import java.util.concurrent.ExecutorService import java.util.concurrent.Future import java.util.concurrent.atomic.AtomicLong import org.opendaylight.controller.md.sal.common.api.RegistrationListener import org.opendaylight.controller.md.sal.common.api.TransactionStatus import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService import org.opendaylight.controller.md.sal.common.api.data.DataReader import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter import org.opendaylight.controller.sal.common.util.Rpcs import org.opendaylight.yangtools.concepts.AbstractObjectRegistration import org.opendaylight.yangtools.concepts.CompositeObjectRegistration import org.opendaylight.yangtools.concepts.ListenerRegistration import org.opendaylight.yangtools.concepts.Path import org.opendaylight.yangtools.concepts.util.ListenerRegistry import org.opendaylight.yangtools.yang.common.RpcResult import org.slf4j.LoggerFactory import static com.google.common.base.Preconditions.* import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent abstract class AbstractDataBroker
, D, DCL extends DataChangeListener
> implements DataModificationTransactionFactory
, // DataReader
, // DataChangePublisher
, // DataProvisionService
{ private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); @Property var ExecutorService executor; @Property var AbstractDataReadRouter
dataReadRouter; @Property private val AtomicLong submittedTransactionsCount = new AtomicLong; @Property private val AtomicLong failedTransactionsCount = new AtomicLong @Property private val AtomicLong finishedTransactionsCount = new AtomicLong Multimap
> listeners = HashMultimap.create(); Multimap
> commitHandlers = HashMultimap.create();
val ListenerRegistry paths) {
return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //
.transformAndConcat[value] //
.transform[instance].toList()
}
override final readConfigurationData(P path) {
return dataReadRouter.readConfigurationData(path);
}
override final readOperationalData(P path) {
return dataReadRouter.readOperationalData(path);
}
override final registerCommitHandler(P path, DataCommitHandler commitHandler) {
val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);
commitHandlers.put(path, registration)
LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);
for(listener : commitHandlerRegistrationListeners) {
try {
listener.instance.onRegister(registration);
} catch (Exception e) {
LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);
}
}
return registration;
}
override final def registerDataChangeListener(P path, DCL listener) {
val reg = new DataChangeListenerRegistration(path, listener, this);
listeners.put(path, reg);
val initialConfig = dataReadRouter.readConfigurationData(path);
val initialOperational = dataReadRouter.readOperationalData(path);
val event = createInitialListenerEvent(path,initialConfig,initialOperational);
listener.onDataChanged(event);
return reg;
}
final def registerDataReader(P path, DataReader reader) {
val confReg = dataReadRouter.registerConfigurationReader(path, reader);
val dataReg = dataReadRouter.registerOperationalReader(path, reader);
return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
}
override registerCommitHandlerListener(RegistrationListener createInitialListenerEvent(P path,D initialConfig,D initialOperational) {
return new InitialDataChangeEventImpl (initialConfig,initialOperational);
}
protected final def removeListener(DataChangeListenerRegistration registration) {
listeners.remove(registration.path, registration);
}
protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) {
commitHandlers.remove(registration.path, registration);
LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);
for(listener : commitHandlerRegistrationListeners) {
try {
listener.instance.onUnregister(registration);
} catch (Exception e) {
LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);
}
}
}
protected final def getActiveCommitHandlers() {
return commitHandlers.entries;
}
protected def /*Iterator paths) {
return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [
val operationalState = readOperationalData(key)
val configurationState = readConfigurationData(key)
return new ListenerStateCapture(key, value, operationalState, configurationState)
].toList()
}
protected def boolean isAffectedBy(P key, Set paths) {
if (paths.contains(key)) {
return true;
}
for (path : paths) {
if (key.contains(path)) {
return true;
}
}
return false;
}
package final def Future transaction) {
checkNotNull(transaction);
transaction.changeStatus(TransactionStatus.SUBMITED);
val task = new TwoPhaseCommit(transaction, this);
submittedTransactionsCount.andIncrement;
return executor.submit(task);
}
}
@Data
package class ListenerStateCapture , D, DCL extends DataChangeListener > {
@Property
P path;
@Property
Collection , D, DCL extends DataChangeListener > extends AbstractObjectRegistration dataBroker;
@Property
val P path;
new(P path, DCL instance, AbstractDataBroker broker) {
super(instance)
dataBroker = broker;
_path = path;
}
override protected removeRegistration() {
dataBroker.removeListener(this);
dataBroker = null;
}
}
package class DataCommitHandlerRegistrationImpl , D> //
extends AbstractObjectRegistration {
AbstractDataBroker dataBroker;
@Property
val P path;
new(P path, DataCommitHandler instance, AbstractDataBroker broker) {
super(instance)
dataBroker = broker;
_path = path;
}
override protected removeRegistration() {
dataBroker.removeCommitHandler(this);
dataBroker = null;
}
}
package class TwoPhaseCommit , D, DCL extends DataChangeListener > implements Callable transaction;
val AbstractDataBroker dataBroker;
new(AbstractDataTransaction transaction, AbstractDataBroker broker) {
this.transaction = transaction;
this.dataBroker = broker;
}
override call() throws Exception {
// get affected paths
val affectedPaths = new HashSet ();
affectedPaths.addAll(transaction.createdConfigurationData.keySet);
affectedPaths.addAll(transaction.updatedConfigurationData.keySet);
affectedPaths.addAll(transaction.removedConfigurationData);
affectedPaths.addAll(transaction.createdOperationalData.keySet);
affectedPaths.addAll(transaction.updatedOperationalData.keySet);
affectedPaths.addAll(transaction.removedOperationalData);
val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);
val transactionId = transaction.identifier;
log.trace("Transaction: {} Started.",transactionId);
// requesting commits
val Iterable , D> extends AbstractDataModification {
private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
@Property
private val Object identifier;
var TransactionStatus status;
var AbstractDataBroker broker;
protected new(Object identifier,AbstractDataBroker dataBroker) {
super(dataBroker);
_identifier = identifier;
broker = dataBroker;
status = TransactionStatus.NEW;
LOG.debug("Transaction {} Allocated.", identifier);
//listeners = new ListenerRegistry<>();
}
override commit() {
return broker.commit(this);
}
override readConfigurationData(P path) {
val local = this.updatedConfigurationData.get(path);
if(local != null) {
return local;
}
return broker.readConfigurationData(path);
}
override readOperationalData(P path) {
val local = this.updatedOperationalData.get(path);
if(local != null) {
return local;
}
return broker.readOperationalData(path);
}
override hashCode() {
return identifier.hashCode;
}
override equals(Object obj) {
if (this === obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
val other = (obj as AbstractDataTransaction );
if (broker == null) {
if (other.broker != null)
return false;
} else if (!broker.equals(other.broker))
return false;
if (identifier == null) {
if (other.identifier != null)
return false;
} else if (!identifier.equals(other.identifier))
return false;
return true;
}
override TransactionStatus getStatus() {
return status;
}
protected abstract def void onStatusChange(TransactionStatus status);
public def changeStatus(TransactionStatus status) {
LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
this.status = status;
onStatusChange(status);
}
}