*/
package org.opendaylight.controller.config.yang.md.sal.binding.impl;\r
\r
-import java.util.concurrent.ExecutorService;\r
-\r
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;\r
-import org.opendaylight.controller.sal.binding.impl.RootDataBrokerImpl;\r
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingDomConnectorDeployer;\r
-import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector;\r
-import org.opendaylight.controller.sal.binding.impl.forward.DomForwardedDataBrokerImpl;\r
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;\r
-import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;\r
-import org.osgi.framework.BundleContext;\r
-import org.osgi.framework.ServiceReference;\r
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
+import org.opendaylight.controller.sal.binding.impl.RootDataBrokerImpl;
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingDomConnectorDeployer;
+import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector;
+import org.opendaylight.controller.sal.binding.impl.forward.DomForwardedDataBrokerImpl;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
\r
/**\r
*\r
dataBindingBroker = createStandAloneBroker(listeningExecutor);\r
}\r
dataBindingBroker.registerRuntimeBean(getRootRuntimeBeanRegistratorWrapper());\r
-\r
+ dataBindingBroker.setNotificationExecutor(SingletonHolder.getDefaultChangeEventExecutor());\r
return dataBindingBroker;\r
}\r
private BindingIndependentMappingService resolveMappingServiceDependency() {\r
if(getMappingService() != null) {\r
return getMappingServiceDependency();\r
}\r
- \r
+\r
ServiceReference<BindingIndependentMappingService> potentialMappingService = bundleContext.getServiceReference(BindingIndependentMappingService.class);\r
if(potentialMappingService != null) {\r
return bundleContext.getService(potentialMappingService);\r
public static final NotificationInvokerFactory INVOKER_FACTORY = RPC_GENERATOR_IMPL.getInvokerFactory();
private static ListeningExecutorService NOTIFICATION_EXECUTOR = null;
private static ListeningExecutorService COMMIT_EXECUTOR = null;
+ private static ListeningExecutorService CHANGE_EVENT_EXECUTOR = null;
public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() {
if (NOTIFICATION_EXECUTOR == null) {
ExecutorService executor = Executors.newCachedThreadPool(factory);
return MoreExecutors.listeningDecorator(executor);
}
+
+ public static ExecutorService getDefaultChangeEventExecutor() {
+ if (CHANGE_EVENT_EXECUTOR == null) {
+ ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-change-%d").build();
+ /*
+ * FIXME: this used to be newCacheThreadPool(), but MD-SAL does not have transaction
+ * ordering guarantees, which means that using a concurrent threadpool results
+ * in application data being committed in random order, potentially resulting
+ * in inconsistent data being present. Once proper primitives are introduced,
+ * concurrency can be reintroduced.
+ */
+ ExecutorService executor = Executors.newSingleThreadExecutor(factory);
+ CHANGE_EVENT_EXECUTOR = MoreExecutors.listeningDecorator(executor);
+ }
+
+ return CHANGE_EVENT_EXECUTOR;
+ }
}
*/
package org.opendaylight.controller.sal.binding.impl;\r
\r
-import java.util.Set;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.DataRoot;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.util.DataObjectReadingUtil;
import org.opendaylight.yangtools.yang.common.RpcResult;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+import com.google.common.collect.Maps;
\r
\r
public class DataBrokerImpl extends AbstractDataBroker<InstanceIdentifier<? extends DataObject>, DataObject, DataChangeListener> //\r
implements DataProviderService, AutoCloseable {\r
\r
+ private final static class ContainsWildcarded implements Predicate<InstanceIdentifier<? extends DataObject>> {
+
+ private final InstanceIdentifier<? extends DataObject> key;
+
+ public ContainsWildcarded(InstanceIdentifier<? extends DataObject> key) {
+ this.key = key;
+ }
+
+ @Override
+ public boolean apply(InstanceIdentifier<? extends DataObject> input) {
+ return key.containsWildcarded(input);
+ }
+ }
+
+ private final static class IsContainedWildcarded implements Predicate<InstanceIdentifier<? extends DataObject>> {
+
+ private final InstanceIdentifier<? extends DataObject> key;
+
+ public IsContainedWildcarded(InstanceIdentifier<? extends DataObject> key) {
+ this.key = key;
+ }
+
+ @Override
+ public boolean apply(InstanceIdentifier<? extends DataObject> input) {
+ return input.containsWildcarded(key);
+ }
+ }
+
private final AtomicLong nextTransaction = new AtomicLong();\r
private final AtomicLong createdTransactionsCount = new AtomicLong();\r
\r
}
@Override
- protected boolean isAffectedBy(InstanceIdentifier<? extends DataObject> key,
- Set<InstanceIdentifier<? extends DataObject>> paths) {
- if (paths.contains(key)) {
- return true;
- }
- for (InstanceIdentifier<?> path : paths) {
- if (key.containsWildcarded(path)) {
- return true;
+ protected Predicate<InstanceIdentifier<? extends DataObject>> createContainsPredicate(final
+ InstanceIdentifier<? extends DataObject> key) {
+ return new ContainsWildcarded(key);
+ }
+
+ @Override
+ protected Predicate<InstanceIdentifier<? extends DataObject>> createIsContainedPredicate(final
+ InstanceIdentifier<? extends DataObject> key) {
+ return new IsContainedWildcarded(key);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ protected Map<InstanceIdentifier<? extends DataObject>, DataObject> deepGetBySubpath(
+ Map<InstanceIdentifier<? extends DataObject>, DataObject> dataSet,
+ InstanceIdentifier<? extends DataObject> path) {
+ Builder<InstanceIdentifier<? extends DataObject>, DataObject> builder = ImmutableMap.builder();
+ Map<InstanceIdentifier<? extends DataObject>, DataObject> potential = Maps.filterKeys(dataSet, createIsContainedPredicate(path));
+ for(Entry<InstanceIdentifier<? extends DataObject>, DataObject> entry : potential.entrySet()) {
+ try {
+ builder.putAll(DataObjectReadingUtil.readData(entry.getValue(),(InstanceIdentifier)entry.getKey(),path));
+ } catch (Exception e) {
+ // FIXME : Log exception;
}
}
- return false;
- }\r
+ return builder.build();
+
+ }
+\r
}
import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider;
import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter;
-import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
-import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl;
import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener;
import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.opendaylight.yangtools.yang.data.api.Node;
import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode;
+import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService;
+import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private DataProviderService baDataService;
- private ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
- private ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Object, BindingToDomTransaction> domOpenedTransactions = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Object, DomToBindingTransaction> bindingOpenedTransactions = new ConcurrentHashMap<>();
- private BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
- private DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
+ private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler();
+ private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler();
private Registration<DataCommitHandler<InstanceIdentifier<? extends DataObject>, DataObject>> baCommitHandlerRegistration;
// private ListenerRegistration<BindingToDomRpcForwardingManager>
// bindingToDomRpcManager;
- private Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
+ private final Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function<InstanceIdentifier<?>, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() {
@Override
public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier<?> input) {
private class BindingToDomTransaction implements
DataCommitTransaction<InstanceIdentifier<? extends DataObject>, DataObject> {
- private DataModificationTransaction backing;
- private DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
+ private final DataModificationTransaction backing;
+ private final DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification;
public BindingToDomTransaction(DataModificationTransaction backing,
DataModification<InstanceIdentifier<? extends DataObject>, DataObject> modification) {
// FIXME: do registration based on only active commit handlers.
}
+ @Override
public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> requestCommit(
DataModification<org.opendaylight.yangtools.yang.data.api.InstanceIdentifier, CompositeNode> domTransaction) {
Object identifier = domTransaction.getIdentifier();
private final Set<QName> supportedRpcs;
private final WeakReference<Class<? extends RpcService>> rpcServiceType;
- private Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
- private Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
- private WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
+ private final Set<org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration> registrations;
+ private final Map<QName, RpcInvocationStrategy> strategiesByQName = new HashMap<>();
+ private final WeakHashMap<Method, RpcInvocationStrategy> strategiesByMethod = new WeakHashMap<>();
public DomToBindingRpcForwarder(Class<? extends RpcService> service) {
this.rpcServiceType = new WeakReference<Class<? extends RpcService>>(service);
private class DefaultInvocationStrategy extends RpcInvocationStrategy {
@SuppressWarnings("rawtypes")
- private WeakReference<Class> inputClass;
+ private final WeakReference<Class> inputClass;
@SuppressWarnings("rawtypes")
- private WeakReference<Class> outputClass;
+ private final WeakReference<Class> outputClass;
@SuppressWarnings({ "rawtypes", "unchecked" })
public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class<?> outputClass,
super(rpc, targetMethod);
}
+ @Override
public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
@SuppressWarnings("unchecked")
Future<RpcResult<Void>> result = (Future<RpcResult<Void>>) targetMethod.invoke(rpcService);
return Futures.immediateFuture(null);
}
}
-
+
private class NoOutputInvocationStrategy extends RpcInvocationStrategy {
-
+
@SuppressWarnings("rawtypes")
- private WeakReference<Class> inputClass;
+ private final WeakReference<Class> inputClass;
@SuppressWarnings({ "rawtypes", "unchecked" })
- public NoOutputInvocationStrategy(QName rpc, Method targetMethod,
+ public NoOutputInvocationStrategy(QName rpc, Method targetMethod,
Class<? extends DataContainer> inputClass) {
super(rpc,targetMethod);
this.inputClass = new WeakReference(inputClass);
}
-
-
+
+
@Override
public RpcResult<CompositeNode> uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception {
DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput);
public void setDomNotificationService(NotificationPublishService domService) {
this.domNotificationService = domService;
}
-
+
private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener {
- private ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
- private Set<QName> supportedNotifications = new HashSet<>();
-
+ private final ConcurrentMap<QName, WeakReference<Class<? extends Notification>>> notifications = new ConcurrentHashMap<>();
+ private final Set<QName> supportedNotifications = new HashSet<>();
+
@Override
public Set<QName> getSupportedNotifications() {
return Collections.unmodifiableSet(supportedNotifications);
if (potentialClass != null) {
final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass,
notification);
-
+
if (baNotification instanceof Notification) {
baNotifyService.publish((Notification) baNotification);
}
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.sal.binding.test.bugfix;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.Map;
private static final InstanceIdentifier<Nodes> NODES_INSTANCE_ID_BA = InstanceIdentifier.builder(Nodes.class) //
.toInstance();
-
private static final InstanceIdentifier<Node> NODE_INSTANCE_ID_BA = InstanceIdentifier//
.builder(NODES_INSTANCE_ID_BA) //
.child(Node.class, NODE_KEY).toInstance();
-
private static final InstanceIdentifier<SupportedActions> SUPPORTED_ACTIONS_INSTANCE_ID_BA = InstanceIdentifier//
.builder(NODES_INSTANCE_ID_BA) //
.child(Node.class, NODE_KEY) //
.augmentation(FlowCapableNode.class) //
- .child(SupportedActions.class)
- .toInstance();
+ .child(SupportedActions.class).toInstance();
+ private static final InstanceIdentifier<FlowCapableNode> ALL_FLOW_CAPABLE_NODES = InstanceIdentifier //
+ .builder(NODES_INSTANCE_ID_BA) //
+ .child(Node.class) //
+ .augmentation(FlowCapableNode.class) //
+ .build();
private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier NODE_INSTANCE_ID_BI = //
org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() //
.node(Nodes.QNAME) //
.nodeWithKey(Node.QNAME, NODE_KEY_BI) //
.toInstance();
- private static final QName SUPPORTED_ACTIONS_QNAME = QName.create(FlowCapableNode.QNAME, SupportedActions.QNAME.getLocalName());
-
+ private static final QName SUPPORTED_ACTIONS_QNAME = QName.create(FlowCapableNode.QNAME,
+ SupportedActions.QNAME.getLocalName());
private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier SUPPORTED_ACTIONS_INSTANCE_ID_BI = //
- org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() //
- .node(Nodes.QNAME) //
- .nodeWithKey(Node.QNAME, NODE_KEY_BI) //
- .node(SUPPORTED_ACTIONS_QNAME) //
- .toInstance();
-
- private DataChangeEvent<InstanceIdentifier<?>, DataObject> receivedChangeEvent;
-
+ org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() //
+ .node(Nodes.QNAME) //
+ .nodeWithKey(Node.QNAME, NODE_KEY_BI) //
+ .node(SUPPORTED_ACTIONS_QNAME) //
+ .toInstance();
+ private static final InstanceIdentifier<FlowCapableNode> FLOW_AUGMENTATION_PATH = InstanceIdentifier //
+ .builder(NODE_INSTANCE_ID_BA) //
+ .augmentation(FlowCapableNode.class) //
+ .build();
+ private DataChangeEvent<InstanceIdentifier<?>, DataObject> lastReceivedChangeEvent;
/**
* Test for Bug 148
@Test
public void putNodeAndAugmentation() throws Exception {
- baDataService.registerDataChangeListener(NODES_INSTANCE_ID_BA, this);
+ baDataService.registerDataChangeListener(ALL_FLOW_CAPABLE_NODES, this);
+
NodeBuilder nodeBuilder = new NodeBuilder();
nodeBuilder.setId(new NodeId(NODE_ID));
baseTransaction.putOperationalData(NODE_INSTANCE_ID_BA, nodeBuilder.build());
RpcResult<TransactionStatus> result = baseTransaction.commit().get();
assertEquals(TransactionStatus.COMMITED, result.getResult());
- assertNotNull(receivedChangeEvent);
+
Node node = (Node) baDataService.readOperationalData(NODE_INSTANCE_ID_BA);
assertNotNull(node);
assertEquals(NODE_KEY, node.getKey());
fnub.setDescription("Description Foo");
fnub.setSoftware("JUnit emulated");
FlowCapableNode fnu = fnub.build();
- InstanceIdentifier<FlowCapableNode> augmentIdentifier = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance();
+ InstanceIdentifier<FlowCapableNode> augmentIdentifier = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA)
+ .augmentation(FlowCapableNode.class).toInstance();
DataModificationTransaction augmentedTransaction = baDataService.beginTransaction();
augmentedTransaction.putOperationalData(augmentIdentifier, fnu);
result = augmentedTransaction.commit().get();
assertEquals(TransactionStatus.COMMITED, result.getResult());
+ assertNotNull(lastReceivedChangeEvent);
+ assertTrue(lastReceivedChangeEvent.getCreatedOperationalData().containsKey(FLOW_AUGMENTATION_PATH));
Node augmentedNode = (Node) baDataService.readOperationalData(NODE_INSTANCE_ID_BA);
assertNotNull(node);
assertEquals(fnu.getDescription(), readedAugmentation.getDescription());
assertBindingIndependentVersion(NODE_INSTANCE_ID_BI);
testNodeRemove();
+ assertTrue(lastReceivedChangeEvent.getRemovedOperationalData().contains(FLOW_AUGMENTATION_PATH));
}
@Test
public void putNodeWithAugmentation() throws Exception {
+ baDataService.registerDataChangeListener(ALL_FLOW_CAPABLE_NODES, this);
+
NodeBuilder nodeBuilder = new NodeBuilder();
nodeBuilder.setId(new NodeId(NODE_ID));
nodeBuilder.setKey(NODE_KEY);
DataModificationTransaction baseTransaction = baDataService.beginTransaction();
baseTransaction.putOperationalData(NODE_INSTANCE_ID_BA, nodeBuilder.build());
RpcResult<TransactionStatus> result = baseTransaction.commit().get();
+
+ assertNotNull(lastReceivedChangeEvent);
+ assertTrue(lastReceivedChangeEvent.getCreatedOperationalData().containsKey(FLOW_AUGMENTATION_PATH));
+ lastReceivedChangeEvent = null;
assertEquals(TransactionStatus.COMMITED, result.getResult());
- FlowCapableNode readedAugmentation = (FlowCapableNode) baDataService.readOperationalData(InstanceIdentifier.builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance());
+ FlowCapableNode readedAugmentation = (FlowCapableNode) baDataService.readOperationalData(InstanceIdentifier
+ .builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance());
assertNotNull(readedAugmentation);
+
assertEquals(fnu.getHardware(), readedAugmentation.getHardware());
testPutNodeConnectorWithAugmentation();
+ lastReceivedChangeEvent = null;
testNodeRemove();
- }
+ assertTrue(lastReceivedChangeEvent.getRemovedOperationalData().contains(FLOW_AUGMENTATION_PATH));
+ }
private void testPutNodeConnectorWithAugmentation() throws Exception {
NodeConnectorKey ncKey = new NodeConnectorKey(new NodeConnectorId("test:0:0"));
InstanceIdentifier<NodeConnector> ncPath = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA)
- .child(NodeConnector.class, ncKey).toInstance();
+ .child(NodeConnector.class, ncKey).toInstance();
InstanceIdentifier<FlowCapableNodeConnector> ncAugmentPath = InstanceIdentifier.builder(ncPath)
- .augmentation(FlowCapableNodeConnector.class).toInstance();
+ .augmentation(FlowCapableNodeConnector.class).toInstance();
NodeConnectorBuilder nc = new NodeConnectorBuilder();
nc.setKey(ncKey);
RpcResult<TransactionStatus> result = baseTransaction.commit().get();
assertEquals(TransactionStatus.COMMITED, result.getResult());
- FlowCapableNodeConnector readedAugmentation = (FlowCapableNodeConnector) baDataService.readOperationalData(ncAugmentPath);
+ FlowCapableNodeConnector readedAugmentation = (FlowCapableNodeConnector) baDataService
+ .readOperationalData(ncAugmentPath);
assertNotNull(readedAugmentation);
assertEquals(fncb.getName(), readedAugmentation.getName());
}
assertNull(node);
}
- private void verifyNodes(Nodes nodes,Node original) {
+ private void verifyNodes(Nodes nodes, Node original) {
assertNotNull(nodes);
assertNotNull(nodes.getNode());
assertEquals(1, nodes.getNode().size());
}
- private void assertBindingIndependentVersion(
- org.opendaylight.yangtools.yang.data.api.InstanceIdentifier nodeId) {
+ private void assertBindingIndependentVersion(org.opendaylight.yangtools.yang.data.api.InstanceIdentifier nodeId) {
CompositeNode node = biDataService.readOperationalData(nodeId);
assertNotNull(node);
}
@Override
public void onDataChanged(DataChangeEvent<InstanceIdentifier<?>, DataObject> change) {
- receivedChangeEvent = change;
+ lastReceivedChangeEvent = change;
}
}
--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.eclipse.xtext.xbase.lib.Exceptions;
+import org.opendaylight.controller.md.sal.common.api.RegistrationListener;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration;
+import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory;
+import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService;
+import org.opendaylight.controller.md.sal.common.api.data.DataReader;
+import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.MoreExecutors;
+
+public abstract class AbstractDataBroker<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>>
+ implements DataModificationTransactionFactory<P, D>, DataReader<P, D>, DataChangePublisher<P, D, DCL>,
+ DataProvisionService<P, D> {
+ private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class);
+
+ private ExecutorService executor;
+
+ public ExecutorService getExecutor() {
+ return this.executor;
+ }
+
+ public void setExecutor(final ExecutorService executor) {
+ this.executor = executor;
+ }
+
+ private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor();
+
+ public ExecutorService getNotificationExecutor() {
+ return this.notificationExecutor;
+ }
+
+ public void setNotificationExecutor(final ExecutorService notificationExecutor) {
+ this.notificationExecutor = notificationExecutor;
+ }
+
+ private AbstractDataReadRouter<P, D> dataReadRouter;
+
+ private final AtomicLong submittedTransactionsCount = new AtomicLong();
+
+ private final AtomicLong failedTransactionsCount = new AtomicLong();
+
+ private final AtomicLong finishedTransactionsCount = new AtomicLong();
+
+ public AbstractDataReadRouter<P, D> getDataReadRouter() {
+ return this.dataReadRouter;
+ }
+
+ public void setDataReadRouter(final AbstractDataReadRouter<P, D> dataReadRouter) {
+ this.dataReadRouter = dataReadRouter;
+ }
+
+ public AtomicLong getSubmittedTransactionsCount() {
+ return this.submittedTransactionsCount;
+ }
+
+ public AtomicLong getFailedTransactionsCount() {
+ return this.failedTransactionsCount;
+ }
+
+ public AtomicLong getFinishedTransactionsCount() {
+ return this.finishedTransactionsCount;
+ }
+
+ private final Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps
+ .synchronizedSetMultimap(HashMultimap.<P, DataChangeListenerRegistration<P, D, DCL>> create());
+
+ private final Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps
+ .synchronizedSetMultimap(HashMultimap.<P, DataCommitHandlerRegistrationImpl<P, D>> create());
+
+ private final Lock registrationLock = new ReentrantLock();
+
+ private final ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>> commitHandlerRegistrationListeners = new ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P, D>>>();
+
+ public AbstractDataBroker() {
+ }
+
+ protected ImmutableList<DataCommitHandler<P, D>> affectedCommitHandlers(final Set<P> paths) {
+ final Callable<ImmutableList<DataCommitHandler<P, D>>> _function = new Callable<ImmutableList<DataCommitHandler<P, D>>>() {
+ @Override
+ public ImmutableList<DataCommitHandler<P, D>> call() throws Exception {
+ Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
+ Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
+ FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
+ .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
+ final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
+ @Override
+ public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+ P _key = it.getKey();
+ boolean _isAffectedBy = isAffectedBy(_key, paths);
+ return _isAffectedBy;
+ }
+ };
+ FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
+ .filter(_function);
+ final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
+ @Override
+ public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
+ final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+ Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
+ return _value;
+ }
+ };
+ FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
+ .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
+ final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
+ @Override
+ public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
+ DataCommitHandler<P, D> _instance = it.getInstance();
+ return _instance;
+ }
+ };
+ FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
+ .<DataCommitHandler<P, D>> transform(_function_2);
+ return _transform.toList();
+ }
+ };
+ return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
+ }
+
+ protected ImmutableList<DataCommitHandler<P, D>> probablyAffectedCommitHandlers(final HashSet<P> paths) {
+ final Callable<ImmutableList<DataCommitHandler<P, D>>> _function = new Callable<ImmutableList<DataCommitHandler<P, D>>>() {
+ @Override
+ public ImmutableList<DataCommitHandler<P, D>> call() throws Exception {
+ Map<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _asMap = commitHandlers.asMap();
+ Set<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _entrySet = _asMap.entrySet();
+ FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _from = FluentIterable
+ .<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> from(_entrySet);
+ final Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _function = new Predicate<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>>() {
+ @Override
+ public boolean apply(final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+ P _key = it.getKey();
+ boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths);
+ return _isProbablyAffectedBy;
+ }
+ };
+ FluentIterable<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>> _filter = _from
+ .filter(_function);
+ final Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>> _function_1 = new Function<Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>>, Collection<DataCommitHandlerRegistrationImpl<P, D>>>() {
+ @Override
+ public Collection<DataCommitHandlerRegistrationImpl<P, D>> apply(
+ final Entry<P, Collection<DataCommitHandlerRegistrationImpl<P, D>>> it) {
+ Collection<DataCommitHandlerRegistrationImpl<P, D>> _value = it.getValue();
+ return _value;
+ }
+ };
+ FluentIterable<DataCommitHandlerRegistrationImpl<P, D>> _transformAndConcat = _filter
+ .<DataCommitHandlerRegistrationImpl<P, D>> transformAndConcat(_function_1);
+ final Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>> _function_2 = new Function<DataCommitHandlerRegistrationImpl<P, D>, DataCommitHandler<P, D>>() {
+ @Override
+ public DataCommitHandler<P, D> apply(final DataCommitHandlerRegistrationImpl<P, D> it) {
+ DataCommitHandler<P, D> _instance = it.getInstance();
+ return _instance;
+ }
+ };
+ FluentIterable<DataCommitHandler<P, D>> _transform = _transformAndConcat
+ .<DataCommitHandler<P, D>> transform(_function_2);
+ return _transform.toList();
+ }
+ };
+ return AbstractDataBroker.<ImmutableList<DataCommitHandler<P, D>>> withLock(this.registrationLock, _function);
+ }
+
+ protected Map<P, D> deepGetBySubpath(final Map<P, D> dataSet, final P path) {
+ return Collections.<P, D> emptyMap();
+ }
+
+ @Override
+ public final D readConfigurationData(final P path) {
+ AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
+ return _dataReadRouter.readConfigurationData(path);
+ }
+
+ @Override
+ public final D readOperationalData(final P path) {
+ AbstractDataReadRouter<P, D> _dataReadRouter = this.getDataReadRouter();
+ return _dataReadRouter.readOperationalData(path);
+ }
+
+ private static <T extends Object> T withLock(final Lock lock, final Callable<T> method) {
+ lock.lock();
+ try {
+ return method.call();
+ } catch (Exception e) {
+ throw Exceptions.sneakyThrow(e);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public final Registration<DataCommitHandler<P, D>> registerCommitHandler(final P path,
+ final DataCommitHandler<P, D> commitHandler) {
+ synchronized (commitHandler) {
+ final DataCommitHandlerRegistrationImpl<P, D> registration = new DataCommitHandlerRegistrationImpl<P, D>(
+ path, commitHandler, this);
+ commitHandlers.put(path, registration);
+ LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path);
+ for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.getInstance().onRegister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(),
+ e);
+ }
+ }
+ return registration;
+ }
+ }
+
+ @Override
+ public final ListenerRegistration<DCL> registerDataChangeListener(final P path, final DCL listener) {
+ synchronized (listeners) {
+ final DataChangeListenerRegistration<P, D, DCL> reg = new DataChangeListenerRegistration<P, D, DCL>(path,
+ listener, AbstractDataBroker.this);
+ listeners.put(path, reg);
+ final D initialConfig = getDataReadRouter().readConfigurationData(path);
+ final D initialOperational = getDataReadRouter().readOperationalData(path);
+ final DataChangeEvent<P, D> event = createInitialListenerEvent(path, initialConfig, initialOperational);
+ listener.onDataChanged(event);
+ return reg;
+ }
+ }
+
+ public final CompositeObjectRegistration<DataReader<P, D>> registerDataReader(final P path,
+ final DataReader<P, D> reader) {
+
+ final Registration<DataReader<P, D>> confReg = getDataReadRouter().registerConfigurationReader(path, reader);
+ final Registration<DataReader<P, D>> dataReg = getDataReadRouter().registerOperationalReader(path, reader);
+ return new CompositeObjectRegistration<DataReader<P, D>>(reader, Arrays.asList(confReg, dataReg));
+ }
+
+ @Override
+ public ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> registerCommitHandlerListener(
+ final RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {
+ final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> ret = this.commitHandlerRegistrationListeners
+ .register(commitHandlerListener);
+ return ret;
+ }
+
+ protected DataChangeEvent<P, D> createInitialListenerEvent(final P path, final D initialConfig,
+ final D initialOperational) {
+ InitialDataChangeEventImpl<P, D> _initialDataChangeEventImpl = new InitialDataChangeEventImpl<P, D>(
+ initialConfig, initialOperational);
+ return _initialDataChangeEventImpl;
+ }
+
+ protected final void removeListener(final DataChangeListenerRegistration<P, D, DCL> registration) {
+ synchronized (listeners) {
+ listeners.remove(registration.getPath(), registration);
+ }
+ }
+
+ protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl<P, D> registration) {
+ synchronized (commitHandlers) {
+
+ commitHandlers.remove(registration.getPath(), registration);
+ LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath());
+ for (final ListenerRegistration<RegistrationListener<DataCommitHandlerRegistration<P, D>>> listener : commitHandlerRegistrationListeners) {
+ try {
+ listener.getInstance().onUnregister(registration);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in listener {} during invoking onUnregister",
+ listener.getInstance(), e);
+ }
+ }
+ }
+
+ }
+
+ protected final Collection<Entry<P, DataCommitHandlerRegistrationImpl<P, D>>> getActiveCommitHandlers() {
+ return commitHandlers.entries();
+ }
+
+ protected ImmutableList<ListenerStateCapture<P, D, DCL>> affectedListeners(final Set<P> paths) {
+
+ synchronized (listeners) {
+ return FluentIterable //
+ .from(listeners.asMap().entrySet()) //
+ .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
+ @Override
+ public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+ return isAffectedBy(it.getKey(), paths);
+ }
+ }) //
+ .transform(
+ new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
+ @Override
+ public ListenerStateCapture<P, D, DCL> apply(
+ final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+ return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
+ createContainsPredicate(it.getKey()));
+ }
+ }) //
+ .toList();
+ }
+ }
+
+ protected ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners(final Set<P> paths) {
+ synchronized (listeners) {
+ return FluentIterable //
+ .from(listeners.asMap().entrySet()) //
+ .filter(new Predicate<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>>() {
+ @Override
+ public boolean apply(final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+ return isProbablyAffectedBy(it.getKey(), paths);
+ }
+ }) //
+ .transform(
+ new Function<Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>>, ListenerStateCapture<P, D, DCL>>() {
+ @Override
+ public ListenerStateCapture<P, D, DCL> apply(
+ final Entry<P, Collection<DataChangeListenerRegistration<P, D, DCL>>> it) {
+ return new ListenerStateCapture<P, D, DCL>(it.getKey(), it.getValue(),
+ createIsContainedPredicate(it.getKey()));
+ }
+ }) //
+ .toList();
+ }
+ }
+
+ protected Predicate<P> createContainsPredicate(final P key) {
+ return new Predicate<P>() {
+ @Override
+ public boolean apply(final P other) {
+ return key.contains(other);
+ }
+ };
+ }
+
+ protected Predicate<P> createIsContainedPredicate(final P key) {
+ return new Predicate<P>() {
+ @Override
+ public boolean apply(final P other) {
+ return other.contains(key);
+ }
+ };
+ }
+
+ protected boolean isAffectedBy(final P key, final Set<P> paths) {
+ final Predicate<P> contains = this.createContainsPredicate(key);
+ if (paths.contains(key)) {
+ return true;
+ }
+ for (final P path : paths) {
+ if (contains.apply(path)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ protected boolean isProbablyAffectedBy(final P key, final Set<P> paths) {
+ final Predicate<P> isContained = this.createIsContainedPredicate(key);
+ for (final P path : paths) {
+ if (isContained.apply(path)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ final Future<RpcResult<TransactionStatus>> commit(final AbstractDataTransaction<P, D> transaction) {
+ Preconditions.checkNotNull(transaction);
+ transaction.changeStatus(TransactionStatus.SUBMITED);
+ final TwoPhaseCommit<P, D, DCL> task = new TwoPhaseCommit<P, D, DCL>(transaction, this);
+ ;
+ this.getSubmittedTransactionsCount().getAndIncrement();
+ return this.getExecutor().submit(task);
+ }
+
+ private static class DataCommitHandlerRegistrationImpl<P extends Path<P>, D extends Object> //
+ extends AbstractObjectRegistration<DataCommitHandler<P, D>> //
+ implements DataCommitHandlerRegistration<P, D> {
+
+ private AbstractDataBroker<P, D, ? extends Object> dataBroker;
+ private final P path;
+
+ @Override
+ public P getPath() {
+ return this.path;
+ }
+
+ public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler<P, D> instance,
+ final AbstractDataBroker<P, D, ? extends Object> broker) {
+ super(instance);
+ this.dataBroker = broker;
+ this.path = path;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ this.dataBroker.removeCommitHandler(this);
+ this.dataBroker = null;
+ }
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.md.sal.common.impl.service\r
-\r
-import com.google.common.collect.FluentIterable\r
-import com.google.common.collect.HashMultimap\r
-import com.google.common.collect.ImmutableList\r
-import com.google.common.collect.Multimap\r
-import java.util.ArrayList\r
-import java.util.Arrays\r
-import java.util.Collection\r
-import java.util.Collections\r
-import java.util.HashSet\r
-import java.util.List\r
-import java.util.Set\r
-import java.util.concurrent.Callable\r
-import java.util.concurrent.ExecutorService\r
-import java.util.concurrent.Future\r
-import java.util.concurrent.atomic.AtomicLong\r
-import org.opendaylight.controller.md.sal.common.api.RegistrationListener\r
-import org.opendaylight.controller.md.sal.common.api.TransactionStatus\r
-import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener\r
-import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction\r
-import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration\r
-import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory\r
-import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService\r
-import org.opendaylight.controller.md.sal.common.api.data.DataReader\r
-import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification\r
-import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter\r
-import org.opendaylight.controller.sal.common.util.Rpcs\r
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration\r
-import org.opendaylight.yangtools.concepts.CompositeObjectRegistration\r
-import org.opendaylight.yangtools.concepts.ListenerRegistration\r
-import org.opendaylight.yangtools.concepts.Path\r
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry\r
-import org.opendaylight.yangtools.yang.common.RpcResult\r
-import org.slf4j.LoggerFactory\r
-\r
-import static com.google.common.base.Preconditions.*\rimport org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent
-import com.google.common.collect.Multimaps
-import java.util.concurrent.locks.Lock
-import java.util.concurrent.locks.ReentrantLock
-
-abstract class AbstractDataBroker<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements DataModificationTransactionFactory<P, D>, //\r
-DataReader<P, D>, //\r
-DataChangePublisher<P, D, DCL>, //\r
-DataProvisionService<P, D> {\r
-\r
- private static val LOG = LoggerFactory.getLogger(AbstractDataBroker);\r
-\r
- @Property\r
- var ExecutorService executor;\r
-\r
- @Property\r
- var AbstractDataReadRouter<P, D> dataReadRouter;\r
- \r
- @Property\r
- private val AtomicLong submittedTransactionsCount = new AtomicLong;\r
- \r
- @Property\r
- private val AtomicLong failedTransactionsCount = new AtomicLong\r
- \r
- @Property\r
- private val AtomicLong finishedTransactionsCount = new AtomicLong\r
-\r
- Multimap<P, DataChangeListenerRegistration<P, D, DCL>> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
- Multimap<P, DataCommitHandlerRegistrationImpl<P, D>> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create());\r
-
- private val Lock registrationLock = new ReentrantLock;
- \r
- val ListenerRegistry<RegistrationListener<DataCommitHandlerRegistration<P,D>>> commitHandlerRegistrationListeners = new ListenerRegistry();\r
- public new() {\r
- }\r
-\r
- protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedCommitHandlers(\r
- HashSet<P> paths) {
- return withLock(registrationLock) [|\r
- return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] //\r
- .transformAndConcat[value] //\r
- .transform[instance].toList()
- ]\r
- }\r
-\r
- override final readConfigurationData(P path) {\r
- return dataReadRouter.readConfigurationData(path);\r
- }\r
-\r
- override final readOperationalData(P path) {\r
- return dataReadRouter.readOperationalData(path);\r
- }
-
- private static def <T> withLock(Lock lock,Callable<T> method) {
- lock.lock
- try {
- return method.call
- } finally {
- lock.unlock
- }
- } \r
-\r
- override final registerCommitHandler(P path, DataCommitHandler<P, D> commitHandler) {
- return withLock(registrationLock) [|\r
- val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this);\r
- commitHandlers.put(path, registration)\r
- LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path);\r
- for(listener : commitHandlerRegistrationListeners) {\r
- try {\r
- listener.instance.onRegister(registration);\r
- } catch (Exception e) {\r
- LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e);\r
- }\r
- }
- return registration;
- ]\r
- }\r
-\r
- override final def registerDataChangeListener(P path, DCL listener) {\r
- return withLock(registrationLock) [|
- val reg = new DataChangeListenerRegistration(path, listener, this);\r
- listeners.put(path, reg);\r
- val initialConfig = dataReadRouter.readConfigurationData(path);\r
- val initialOperational = dataReadRouter.readOperationalData(path);\r
- val event = createInitialListenerEvent(path,initialConfig,initialOperational);\r
- listener.onDataChanged(event);\r
- return reg;
- ]\r
- }\r
-\r
- final def registerDataReader(P path, DataReader<P, D> reader) {\r
- return withLock(registrationLock) [|\r
- val confReg = dataReadRouter.registerConfigurationReader(path, reader);\r
- val dataReg = dataReadRouter.registerOperationalReader(path, reader);\r
- \r
- return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg));
- ]\r
- }\r
- \r
- override registerCommitHandlerListener(RegistrationListener<DataCommitHandlerRegistration<P, D>> commitHandlerListener) {\r
- val ret = commitHandlerRegistrationListeners.register(commitHandlerListener);\r
- return ret;\r
- }\r
- \r
- protected def DataChangeEvent<P,D> createInitialListenerEvent(P path,D initialConfig,D initialOperational) {\r
- return new InitialDataChangeEventImpl<P, D>(initialConfig,initialOperational);\r
- \r
- }\r
-\r
- protected final def removeListener(DataChangeListenerRegistration<P, D, DCL> registration) {
- return withLock(registrationLock) [|\r
- listeners.remove(registration.path, registration);
- ]\r
- }\r
-\r
- protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl<P, D> registration) {\r
- return withLock(registrationLock) [|
- commitHandlers.remove(registration.path, registration);\r
- LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path);\r
- for(listener : commitHandlerRegistrationListeners) {\r
- try {\r
- listener.instance.onUnregister(registration);\r
- } catch (Exception e) {\r
- LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e);\r
- }\r
- }
- return null;
- ]\r
- }\r
-\r
- protected final def getActiveCommitHandlers() {\r
- return commitHandlers.entries;\r
- }\r
-\r
- protected def /*Iterator<Entry<Collection<DataChangeListenerRegistration<P,D,DCL>>,D>>*/ affectedListenersWithInitialState(\r
- HashSet<P> paths) {
- return withLock(registrationLock) [|\r
- return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [\r
- val operationalState = readOperationalData(key)\r
- val configurationState = readConfigurationData(key)\r
- return new ListenerStateCapture(key, value, operationalState, configurationState)\r
- ].toList()
- ]\r
- }\r
-\r
- protected def boolean isAffectedBy(P key, Set<P> paths) {\r
- if (paths.contains(key)) {\r
- return true;\r
- }\r
- for (path : paths) {\r
- if (key.contains(path)) {\r
- return true;\r
- }\r
- }\r
-\r
- return false;\r
- }\r
-\r
- package final def Future<RpcResult<TransactionStatus>> commit(AbstractDataTransaction<P, D> transaction) {\r
- checkNotNull(transaction);\r
- transaction.changeStatus(TransactionStatus.SUBMITED);\r
- val task = new TwoPhaseCommit(transaction, this);\r
- submittedTransactionsCount.andIncrement;\r
- return executor.submit(task);\r
- }\r
-\r
-}\r
-\r
-@Data\r
-package class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {\r
-\r
- @Property\r
- P path;\r
-\r
- @Property\r
- Collection<DataChangeListenerRegistration<P, D, DCL>> listeners;\r
-\r
- @Property\r
- D initialOperationalState;\r
-\r
- @Property\r
- D initialConfigurationState;\r
-}\r
-\r
-package class DataChangeListenerRegistration<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> extends AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {\r
-\r
- AbstractDataBroker<P, D, DCL> dataBroker;\r
-\r
- @Property\r
- val P path;\r
-\r
- new(P path, DCL instance, AbstractDataBroker<P, D, DCL> broker) {\r
- super(instance)\r
- dataBroker = broker;\r
- _path = path;\r
- }\r
-\r
- override protected removeRegistration() {\r
- dataBroker.removeListener(this);\r
- dataBroker = null;\r
- }\r
-\r
-}\r
-\r
-package class DataCommitHandlerRegistrationImpl<P extends Path<P>, D> //\r
-extends AbstractObjectRegistration<DataCommitHandler<P, D>> //\r
-implements DataCommitHandlerRegistration<P, D> {\r
-\r
- AbstractDataBroker<P, D, ?> dataBroker;\r
-\r
- @Property\r
- val P path;\r
-\r
- new(P path, DataCommitHandler<P, D> instance, AbstractDataBroker<P, D, ?> broker) {\r
- super(instance)\r
- dataBroker = broker;\r
- _path = path;\r
- }\r
-\r
- override protected removeRegistration() {\r
- dataBroker.removeCommitHandler(this);\r
- dataBroker = null;\r
- }\r
-}\r
-\r
-package class TwoPhaseCommit<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> implements Callable<RpcResult<TransactionStatus>> {\r
-\r
- private static val log = LoggerFactory.getLogger(TwoPhaseCommit);\r
-\r
- val AbstractDataTransaction<P, D> transaction;\r
- val AbstractDataBroker<P, D, DCL> dataBroker;\r
- \r
- new(AbstractDataTransaction<P, D> transaction, AbstractDataBroker<P, D, DCL> broker) {\r
- this.transaction = transaction;\r
- this.dataBroker = broker;\r
- }\r
-\r
- override call() throws Exception {\r
-\r
- // get affected paths\r
- val affectedPaths = new HashSet<P>();\r
-\r
- affectedPaths.addAll(transaction.createdConfigurationData.keySet);\r
- affectedPaths.addAll(transaction.updatedConfigurationData.keySet);\r
- affectedPaths.addAll(transaction.removedConfigurationData);\r
-\r
- affectedPaths.addAll(transaction.createdOperationalData.keySet);\r
- affectedPaths.addAll(transaction.updatedOperationalData.keySet);\r
- affectedPaths.addAll(transaction.removedOperationalData);\r
-
- val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths);\r
-\r
- val transactionId = transaction.identifier;\r
-\r
- log.trace("Transaction: {} Started.",transactionId);\r
- log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths);
- // requesting commits\r
- val Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths);\r
- val List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList();\r
- try {\r
- for (handler : commitHandlers) {\r
- handlerTransactions.add(handler.requestCommit(transaction));\r
- }\r
- } catch (Exception e) {\r
- log.error("Transaction: {} Request Commit failed", transactionId,e);\r
- dataBroker.failedTransactionsCount.andIncrement\r
- transaction.changeStatus(TransactionStatus.FAILED)
- return rollback(handlerTransactions, e);\r
- }\r
- val List<RpcResult<Void>> results = new ArrayList();\r
- try {\r
- for (subtransaction : handlerTransactions) {\r
- results.add(subtransaction.finish());\r
- }\r
- listeners.publishDataChangeEvent();\r
- } catch (Exception e) {\r
- log.error("Transaction: {} Finish Commit failed",transactionId, e);\r
- dataBroker.failedTransactionsCount.andIncrement
- transaction.changeStatus(TransactionStatus.FAILED)\r
- return rollback(handlerTransactions, e);\r
- }\r
- log.trace("Transaction: {} Finished successfully.",transactionId);\r
- dataBroker.finishedTransactionsCount.andIncrement;
- transaction.changeStatus(TransactionStatus.COMMITED)\r
- return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet());\r
-\r
- }\r
-\r
- def void publishDataChangeEvent(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {\r
- dataBroker.executor.submit [|\r
- for (listenerSet : listeners) {
- val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path);
- val updatedOperational = dataBroker.readOperationalData(listenerSet.path);
-
- val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState,
- listenerSet.initialOperationalState, updatedOperational, updatedConfiguration);
- for (listener : listenerSet.listeners) {
- try {
- listener.instance.onDataChanged(changeEvent);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- } \r
- ]\r
- }\r
-\r
- def rollback(List<DataCommitTransaction<P, D>> transactions, Exception e) {\r
- for (transaction : transactions) {\r
- transaction.rollback()\r
- }\r
-\r
- // FIXME return encountered error.\r
- return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet());\r
- }\r
-}\r
-\r
-public abstract class AbstractDataTransaction<P extends Path<P>, D> extends AbstractDataModification<P, D> {\r
-\r
- private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction);
-
- @Property\r
- private val Object identifier;\r
-\r
- var TransactionStatus status;\r
-\r
- var AbstractDataBroker<P, D, ?> broker;\r
-\r
- protected new(Object identifier,AbstractDataBroker<P, D, ?> dataBroker) {\r
- super(dataBroker);\r
- _identifier = identifier;\r
- broker = dataBroker;\r
- status = TransactionStatus.NEW;\r
- LOG.debug("Transaction {} Allocated.", identifier);
-\r
- //listeners = new ListenerRegistry<>();\r
- }\r
-\r
- override commit() {\r
- return broker.commit(this);\r
- }\r
-\r
- override readConfigurationData(P path) {\r
- val local = this.updatedConfigurationData.get(path);\r
- if(local != null) {\r
- return local;\r
- }\r
- \r
- return broker.readConfigurationData(path);\r
- }\r
-\r
- override readOperationalData(P path) {\r
- val local = this.updatedOperationalData.get(path);\r
- if(local != null) {\r
- return local;\r
- }\r
- return broker.readOperationalData(path);\r
- }\r
-\r
- override hashCode() {\r
- return identifier.hashCode;\r
- }\r
-\r
- override equals(Object obj) {\r
- if (this === obj)\r
- return true;\r
- if (obj == null)\r
- return false;\r
- if (getClass() != obj.getClass())\r
- return false;\r
- val other = (obj as AbstractDataTransaction<P,D>);\r
- if (broker == null) {\r
- if (other.broker != null)\r
- return false;\r
- } else if (!broker.equals(other.broker))\r
- return false;\r
- if (identifier == null) {\r
- if (other.identifier != null)\r
- return false;\r
- } else if (!identifier.equals(other.identifier))\r
- return false;\r
- return true;\r
- }\r
-\r
- override TransactionStatus getStatus() {\r
- return status;\r
- }\r
-\r
- protected abstract def void onStatusChange(TransactionStatus status);\r
-\r
- public def changeStatus(TransactionStatus status) {\r
- LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status);
- this.status = status;\r
- onStatusChange(status);\r
- }\r
-\r
-}\r
--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.concurrent.Future;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all")
+public abstract class AbstractDataTransaction<P extends Path<P>, D extends Object> extends
+ AbstractDataModification<P, D> {
+ private final static Logger LOG = LoggerFactory.getLogger(AbstractDataTransaction.class);
+
+ private final Object identifier;
+
+ @Override
+ public Object getIdentifier() {
+ return this.identifier;
+ }
+
+ private TransactionStatus status;
+
+ private final AbstractDataBroker<P, D, ? extends Object> broker;
+
+ protected AbstractDataTransaction(final Object identifier,
+ final AbstractDataBroker<P, D, ? extends Object> dataBroker) {
+ super(dataBroker);
+ this.identifier = identifier;
+ this.broker = dataBroker;
+ this.status = TransactionStatus.NEW;
+ AbstractDataTransaction.LOG.debug("Transaction {} Allocated.", identifier);
+ }
+
+ @Override
+ public Future<RpcResult<TransactionStatus>> commit() {
+ return this.broker.commit(this);
+ }
+
+ @Override
+ public D readConfigurationData(final P path) {
+ final D local = getUpdatedConfigurationData().get(path);
+ if (local != null) {
+ return local;
+ }
+ return this.broker.readConfigurationData(path);
+ }
+
+ @Override
+ public D readOperationalData(final P path) {
+ final D local = this.getUpdatedOperationalData().get(path);
+ if (local != null) {
+ return local;
+ }
+ return this.broker.readOperationalData(path);
+ }
+
+
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((identifier == null) ? 0 : identifier.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ AbstractDataTransaction other = (AbstractDataTransaction) obj;
+ if (identifier == null) {
+ if (other.identifier != null)
+ return false;
+ } else if (!identifier.equals(other.identifier))
+ return false;
+ return true;
+ }
+
+ @Override
+ public TransactionStatus getStatus() {
+ return this.status;
+ }
+
+ protected abstract void onStatusChange(final TransactionStatus status);
+
+ public void changeStatus(final TransactionStatus status) {
+ Object _identifier = this.getIdentifier();
+ AbstractDataTransaction.LOG
+ .debug("Transaction {} transitioned from {} to {}", _identifier, this.status, status);
+ this.status = status;
+ this.onStatusChange(status);
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Path;
+
+@SuppressWarnings("all")
+class DataChangeListenerRegistration<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>> extends
+ AbstractObjectRegistration<DCL> implements ListenerRegistration<DCL> {
+ private AbstractDataBroker<P, D, DCL> dataBroker;
+
+ private final P path;
+
+ public P getPath() {
+ return this.path;
+ }
+
+ public DataChangeListenerRegistration(final P path, final DCL instance, final AbstractDataBroker<P, D, DCL> broker) {
+ super(instance);
+ this.dataBroker = broker;
+ this.path = path;
+ }
+
+ @Override
+ protected void removeRegistration() {
+ this.dataBroker.removeListener(this);
+ this.dataBroker = null;
+ }
+}
--- /dev/null
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.yangtools.concepts.Path;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+final class ImmutableDataChangeEvent<P extends Path<P>, D> implements DataChangeEvent<P,D> {
+
+ private final D updatedOperationalSubtree;
+ private final Map<P, D> updatedOperational;
+ private final D updatedConfigurationSubtree;
+ private final Map<P, D> updatedConfiguration;
+ private final Set<P> removedOperational;
+ private final Set<P> removedConfiguration;
+ private final D originalOperationalSubtree;
+ private final Map<P, D> originalOperational;
+ private final D originalConfigurationSubtree;
+ private final Map<P, D> originalConfiguration;
+ private final Map<P, D> createdOperational;
+ private final Map<P, D> createdConfiguration;
+
+
+ public ImmutableDataChangeEvent(Builder<P, D> builder) {
+
+ createdConfiguration = builder.getCreatedConfiguration().build();
+ createdOperational = builder.getCreatedOperational().build();
+ originalConfiguration = builder.getOriginalConfiguration().build();
+ originalConfigurationSubtree = builder.getOriginalConfigurationSubtree();
+ originalOperational = builder.getOriginalOperational().build();
+ originalOperationalSubtree = builder.getOriginalOperationalSubtree();
+ removedConfiguration = builder.getRemovedConfiguration().build();
+ removedOperational = builder.getRemovedOperational().build();
+ updatedConfiguration = builder.getUpdatedConfiguration().build();
+ updatedConfigurationSubtree = builder.getUpdatedConfigurationSubtree();
+ updatedOperational = builder.getUpdatedOperational().build();
+ updatedOperationalSubtree = builder.getUpdatedOperationalSubtree();
+ }
+
+ @Override
+ public Map<P, D> getCreatedConfigurationData() {
+ return createdConfiguration;
+ }
+
+ @Override
+ public Map<P, D> getCreatedOperationalData() {
+ return createdOperational;
+ }
+
+ @Override
+ public Map<P, D> getOriginalConfigurationData() {
+ return originalConfiguration;
+ }
+ @Override
+ public D getOriginalConfigurationSubtree() {
+ return originalConfigurationSubtree;
+ }
+ @Override
+ public Map<P, D> getOriginalOperationalData() {
+ return originalOperational;
+ }
+ @Override
+ public D getOriginalOperationalSubtree() {
+ return originalOperationalSubtree;
+ }
+ @Override
+ public Set<P> getRemovedConfigurationData() {
+ return removedConfiguration;
+ }
+ @Override
+ public Set<P> getRemovedOperationalData() {
+ return removedOperational;
+ }
+ @Override
+ public Map<P, D> getUpdatedConfigurationData() {
+ return updatedConfiguration;
+ }
+ @Override
+ public D getUpdatedConfigurationSubtree() {
+ return updatedConfigurationSubtree;
+ }
+ @Override
+ public Map<P, D> getUpdatedOperationalData() {
+ return updatedOperational;
+ }
+ @Override
+ public D getUpdatedOperationalSubtree() {
+ return updatedOperationalSubtree;
+ }
+
+ static final <P extends Path<P>,D> Builder<P, D> builder() {
+ return new Builder<>();
+ }
+
+ static final class Builder<P extends Path<P>,D> {
+
+ private D updatedOperationalSubtree;
+ private D originalOperationalSubtree;
+ private D originalConfigurationSubtree;
+ private D updatedConfigurationSubtree;
+
+ private final ImmutableMap.Builder<P, D> updatedOperational = ImmutableMap.builder();
+ private final ImmutableMap.Builder<P, D> updatedConfiguration = ImmutableMap.builder();
+ private final ImmutableSet.Builder<P> removedOperational = ImmutableSet.builder();
+ private final ImmutableSet.Builder<P> removedConfiguration = ImmutableSet.builder();
+ private final ImmutableMap.Builder<P, D> originalOperational = ImmutableMap.builder();
+
+ private final ImmutableMap.Builder<P, D> originalConfiguration = ImmutableMap.builder();
+ private final ImmutableMap.Builder<P, D> createdOperational = ImmutableMap.builder();
+ private final ImmutableMap.Builder<P, D> createdConfiguration = ImmutableMap.builder();
+
+
+ protected Builder<P,D> addTransaction(DataModification<P, D> data, Predicate<P> keyFilter) {
+ updatedOperational.putAll(Maps.filterKeys(data.getUpdatedOperationalData(), keyFilter));
+ updatedConfiguration.putAll(Maps.filterKeys(data.getUpdatedConfigurationData(), keyFilter));
+ originalConfiguration.putAll(Maps.filterKeys(data.getOriginalConfigurationData(), keyFilter));
+ originalOperational.putAll(Maps.filterKeys(data.getOriginalOperationalData(), keyFilter));
+ createdOperational.putAll(Maps.filterKeys(data.getCreatedOperationalData(), keyFilter));
+ createdConfiguration.putAll(Maps.filterKeys(data.getCreatedConfigurationData(), keyFilter));
+ return this;
+ }
+
+ protected Builder<P, D> addConfigurationChangeSet(RootedChangeSet<P, D> changeSet) {
+ if(changeSet == null) {
+ return this;
+ }
+
+ originalConfiguration.putAll(changeSet.getOriginal());
+ createdConfiguration.putAll(changeSet.getCreated());
+ updatedConfiguration.putAll(changeSet.getUpdated());
+ removedConfiguration.addAll(changeSet.getRemoved());
+ return this;
+ }
+
+ protected Builder<P, D> addOperationalChangeSet(RootedChangeSet<P, D> changeSet) {
+ if(changeSet == null) {
+ return this;
+ }
+ originalOperational.putAll(changeSet.getOriginal());
+ createdOperational.putAll(changeSet.getCreated());
+ updatedOperational.putAll(changeSet.getUpdated());
+ removedOperational.addAll(changeSet.getRemoved());
+ return this;
+ }
+
+ protected ImmutableDataChangeEvent<P, D> build() {
+ return new ImmutableDataChangeEvent<P,D>(this);
+ }
+
+ protected D getUpdatedOperationalSubtree() {
+ return updatedOperationalSubtree;
+ }
+
+ protected Builder<P, D> setUpdatedOperationalSubtree(D updatedOperationalSubtree) {
+ this.updatedOperationalSubtree = updatedOperationalSubtree;
+ return this;
+ }
+
+ protected D getOriginalOperationalSubtree() {
+ return originalOperationalSubtree;
+ }
+
+ protected Builder<P,D> setOriginalOperationalSubtree(D originalOperationalSubtree) {
+ this.originalOperationalSubtree = originalOperationalSubtree;
+ return this;
+ }
+
+ protected D getOriginalConfigurationSubtree() {
+ return originalConfigurationSubtree;
+ }
+
+ protected Builder<P, D> setOriginalConfigurationSubtree(D originalConfigurationSubtree) {
+ this.originalConfigurationSubtree = originalConfigurationSubtree;
+ return this;
+ }
+
+ protected D getUpdatedConfigurationSubtree() {
+ return updatedConfigurationSubtree;
+ }
+
+ protected Builder<P,D> setUpdatedConfigurationSubtree(D updatedConfigurationSubtree) {
+ this.updatedConfigurationSubtree = updatedConfigurationSubtree;
+ return this;
+ }
+
+ protected ImmutableMap.Builder<P, D> getUpdatedOperational() {
+ return updatedOperational;
+ }
+
+ protected ImmutableMap.Builder<P, D> getUpdatedConfiguration() {
+ return updatedConfiguration;
+ }
+
+ protected ImmutableSet.Builder<P> getRemovedOperational() {
+ return removedOperational;
+ }
+
+ protected ImmutableSet.Builder<P> getRemovedConfiguration() {
+ return removedConfiguration;
+ }
+
+ protected ImmutableMap.Builder<P, D> getOriginalOperational() {
+ return originalOperational;
+ }
+
+ protected ImmutableMap.Builder<P, D> getOriginalConfiguration() {
+ return originalConfiguration;
+ }
+
+ protected ImmutableMap.Builder<P, D> getCreatedOperational() {
+ return createdOperational;
+ }
+
+ protected ImmutableMap.Builder<P, D> getCreatedConfiguration() {
+ return createdConfiguration;
+ }
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.Map;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.DataModification;
+import org.opendaylight.yangtools.concepts.Path;
+
+import com.google.common.base.Predicate;
+
+public final class ListenerStateCapture<P extends Path<P>, D, DCL extends DataChangeListener<P, D>> {
+
+ final P path;
+
+ final Iterable<DataChangeListenerRegistration<P, D, DCL>> listeners;
+
+ D initialOperationalState;
+
+ D initialConfigurationState;
+
+ D finalConfigurationState;
+
+ D finalOperationalState;
+
+ Map<P, D> additionalConfigOriginal;
+ Map<P, D> additionalConfigCreated;
+ Map<P, D> additionalConfigUpdated;
+ Map<P, D> additionalConfigDeleted;
+
+ Map<P, D> additionalOperOriginal;
+ Map<P, D> additionalOperCreated;
+ Map<P, D> additionalOperUpdated;
+ Map<P, D> additionalOperDeleted;
+
+ RootedChangeSet<P, D> normalizedConfigurationChanges;
+ RootedChangeSet<P, D> normalizedOperationalChanges;
+
+ private final Predicate<P> containsPredicate;
+
+ public ListenerStateCapture(P path, Iterable<DataChangeListenerRegistration<P, D, DCL>> listeners,
+ Predicate<P> containsPredicate) {
+ super();
+ this.path = path;
+ this.listeners = listeners;
+ this.containsPredicate = containsPredicate;
+ }
+
+ protected D getInitialOperationalState() {
+ return initialOperationalState;
+ }
+
+ protected void setInitialOperationalState(D initialOperationalState) {
+ this.initialOperationalState = initialOperationalState;
+ }
+
+ protected D getInitialConfigurationState() {
+ return initialConfigurationState;
+ }
+
+ protected void setInitialConfigurationState(D initialConfigurationState) {
+ this.initialConfigurationState = initialConfigurationState;
+ }
+
+ protected P getPath() {
+ return path;
+ }
+
+ protected Iterable<DataChangeListenerRegistration<P, D, DCL>> getListeners() {
+ return listeners;
+ }
+
+ protected D getFinalConfigurationState() {
+ return finalConfigurationState;
+ }
+
+ protected void setFinalConfigurationState(D finalConfigurationState) {
+ this.finalConfigurationState = finalConfigurationState;
+ }
+
+ protected D getFinalOperationalState() {
+ return finalOperationalState;
+ }
+
+ protected void setFinalOperationalState(D finalOperationalState) {
+ this.finalOperationalState = finalOperationalState;
+ }
+
+ protected RootedChangeSet<P, D> getNormalizedConfigurationChanges() {
+ return normalizedConfigurationChanges;
+ }
+
+ protected void setNormalizedConfigurationChanges(RootedChangeSet<P, D> normalizedConfigurationChanges) {
+ this.normalizedConfigurationChanges = normalizedConfigurationChanges;
+ }
+
+ protected RootedChangeSet<P, D> getNormalizedOperationalChanges() {
+ return normalizedOperationalChanges;
+ }
+
+ protected void setNormalizedOperationalChanges(RootedChangeSet<P, D> normalizedOperationalChange) {
+ this.normalizedOperationalChanges = normalizedOperationalChange;
+ }
+
+ protected DataChangeEvent<P, D> createEvent(DataModification<P, D> modification) {
+ return ImmutableDataChangeEvent.<P, D> builder()//
+ .addTransaction(modification, containsPredicate) //
+ .addConfigurationChangeSet(normalizedConfigurationChanges) //
+ .addOperationalChangeSet(normalizedOperationalChanges) //
+ .setOriginalConfigurationSubtree(initialConfigurationState) //
+ .setOriginalOperationalSubtree(initialOperationalState) //
+ .setUpdatedConfigurationSubtree(finalConfigurationState) //
+ .setUpdatedOperationalSubtree(finalOperationalState) //
+ .build();
+
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.opendaylight.yangtools.concepts.Path;
+
+public class RootedChangeSet<P extends Path<P>,D> {
+
+ private final P root;
+ private final Map<P,D> original;
+ private final Map<P,D> created = new HashMap<>();
+ private final Map<P,D> updated = new HashMap<>();
+ private final Set<P> removed = new HashSet<>();
+
+
+
+ public RootedChangeSet(P root,Map<P, D> original) {
+ super();
+ this.root = root;
+ this.original = original;
+ }
+
+ protected P getRoot() {
+ return root;
+ }
+
+ protected Map<P, D> getOriginal() {
+ return original;
+ }
+
+ protected Map<P, D> getCreated() {
+ return created;
+ }
+
+ protected Map<P, D> getUpdated() {
+ return updated;
+ }
+
+ protected Set<P> getRemoved() {
+ return removed;
+ }
+
+ public void addCreated(Map<P,D> created) {
+ this.created.putAll(created);
+ }
+
+ public void addCreated(Entry<P,D> entry) {
+ created.put(entry.getKey(), entry.getValue());
+ }
+
+ public void addUpdated(Entry<P,D> entry) {
+ updated.put(entry.getKey(), entry.getValue());
+ }
+
+ public void addRemoval(P path) {
+ removed.add(path);
+ }
+
+ public boolean isChange() {
+ return !created.isEmpty() || !updated.isEmpty() || !removed.isEmpty();
+ }
+}
--- /dev/null
+/**
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler;
+import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.yangtools.concepts.Path;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableList.Builder;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TwoPhaseCommit<P extends Path<P>, D extends Object, DCL extends DataChangeListener<P, D>> implements
+ Callable<RpcResult<TransactionStatus>> {
+ private final static Logger log = LoggerFactory.getLogger(TwoPhaseCommit.class);
+
+ private final AbstractDataTransaction<P, D> transaction;
+
+ private final AbstractDataBroker<P, D, DCL> dataBroker;
+
+ public TwoPhaseCommit(final AbstractDataTransaction<P, D> transaction, final AbstractDataBroker<P, D, DCL> broker) {
+ this.transaction = transaction;
+ this.dataBroker = broker;
+ }
+
+ @Override
+ public RpcResult<TransactionStatus> call() throws Exception {
+ final Object transactionId = this.transaction.getIdentifier();
+
+ Set<P> changedPaths = ImmutableSet.<P> builder().addAll(transaction.getUpdatedConfigurationData().keySet())
+ .addAll(transaction.getCreatedConfigurationData().keySet())
+ .addAll(transaction.getRemovedConfigurationData())
+ .addAll(transaction.getUpdatedOperationalData().keySet())
+ .addAll(transaction.getCreatedOperationalData().keySet())
+ .addAll(transaction.getRemovedOperationalData()).build();
+
+ log.trace("Transaction: {} Affected Subtrees:", transactionId, changedPaths);
+
+ final ImmutableList.Builder<ListenerStateCapture<P, D, DCL>> listenersBuilder = ImmutableList.builder();
+ listenersBuilder.addAll(dataBroker.affectedListeners(changedPaths));
+ filterProbablyAffectedListeners(dataBroker.probablyAffectedListeners(changedPaths),listenersBuilder);
+
+
+
+ final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners = listenersBuilder.build();
+ final Iterable<DataCommitHandler<P, D>> commitHandlers = dataBroker.affectedCommitHandlers(changedPaths);
+ captureInitialState(listeners);
+
+
+ log.trace("Transaction: {} Starting Request Commit.",transactionId);
+ final List<DataCommitTransaction<P, D>> handlerTransactions = new ArrayList<>();
+ try {
+ for (final DataCommitHandler<P, D> handler : commitHandlers) {
+ DataCommitTransaction<P, D> requestCommit = handler.requestCommit(this.transaction);
+ if (requestCommit != null) {
+ handlerTransactions.add(requestCommit);
+ } else {
+ log.debug("Transaction: {}, Handler {} is not participating in transaction.", transactionId,
+ handler);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Transaction: {} Request Commit failed", transactionId, e);
+ dataBroker.getFailedTransactionsCount().getAndIncrement();
+ this.transaction.changeStatus(TransactionStatus.FAILED);
+ return this.rollback(handlerTransactions, e);
+
+ }
+
+ log.trace("Transaction: {} Starting Finish.",transactionId);
+ final List<RpcResult<Void>> results = new ArrayList<RpcResult<Void>>();
+ try {
+ for (final DataCommitTransaction<P, D> subtransaction : handlerTransactions) {
+ results.add(subtransaction.finish());
+ }
+ } catch (Exception e) {
+ log.error("Transaction: {} Finish Commit failed", transactionId, e);
+ dataBroker.getFailedTransactionsCount().getAndIncrement();
+ transaction.changeStatus(TransactionStatus.FAILED);
+ return this.rollback(handlerTransactions, e);
+ }
+
+
+ dataBroker.getFinishedTransactionsCount().getAndIncrement();
+ transaction.changeStatus(TransactionStatus.COMMITED);
+
+ log.trace("Transaction: {} Finished successfully.", transactionId);
+
+ captureFinalState(listeners);
+
+ log.trace("Transaction: {} Notifying listeners.");
+
+ publishDataChangeEvent(listeners);
+ return Rpcs.<TransactionStatus> getRpcResult(true, TransactionStatus.COMMITED,
+ Collections.<RpcError> emptySet());
+ }
+
+ private void captureInitialState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+ for (ListenerStateCapture<P, D, DCL> state : listeners) {
+ state.setInitialConfigurationState(dataBroker.readConfigurationData(state.getPath()));
+ state.setInitialOperationalState(dataBroker.readOperationalData(state.getPath()));
+ }
+ }
+
+
+ private void captureFinalState(ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+ for (ListenerStateCapture<P, D, DCL> state : listeners) {
+ state.setFinalConfigurationState(dataBroker.readConfigurationData(state.getPath()));
+ state.setFinalOperationalState(dataBroker.readOperationalData(state.getPath()));
+ }
+ }
+
+ private void filterProbablyAffectedListeners(
+ ImmutableList<ListenerStateCapture<P, D, DCL>> probablyAffectedListeners, Builder<ListenerStateCapture<P, D, DCL>> reallyAffected) {
+
+ for(ListenerStateCapture<P, D, DCL> listenerSet : probablyAffectedListeners) {
+ P affectedPath = listenerSet.getPath();
+ Optional<RootedChangeSet<P,D>> configChange = resolveConfigChange(affectedPath);
+ Optional<RootedChangeSet<P, D>> operChange = resolveOperChange(affectedPath);
+
+ if(configChange.isPresent() || operChange.isPresent()) {
+ reallyAffected.add(listenerSet);
+ if(configChange.isPresent()) {
+ listenerSet.setNormalizedConfigurationChanges(configChange.get());
+ }
+
+ if(operChange.isPresent()) {
+ listenerSet.setNormalizedOperationalChanges(operChange.get());
+ }
+ }
+ }
+ }
+
+ private Optional<RootedChangeSet<P, D>> resolveOperChange(P affectedPath) {
+ Map<P, D> originalOper = dataBroker.deepGetBySubpath(transaction.getOriginalOperationalData(),affectedPath);
+ Map<P, D> createdOper = dataBroker.deepGetBySubpath(transaction.getCreatedOperationalData(),affectedPath);
+ Map<P, D> updatedOper = dataBroker.deepGetBySubpath(transaction.getUpdatedOperationalData(),affectedPath);
+ Set<P> removedOper = Sets.filter(transaction.getRemovedOperationalData(), dataBroker.createIsContainedPredicate(affectedPath));
+ return resolveChanges(affectedPath,originalOper,createdOper,updatedOper,removedOper);
+ }
+
+ private Optional<RootedChangeSet<P, D>> resolveConfigChange(P affectedPath) {
+ Map<P, D> originalConfig = dataBroker.deepGetBySubpath(transaction.getOriginalConfigurationData(),affectedPath);
+ Map<P, D> createdConfig = dataBroker.deepGetBySubpath(transaction.getCreatedConfigurationData(),affectedPath);
+ Map<P, D> updatedConfig = dataBroker.deepGetBySubpath(transaction.getUpdatedConfigurationData(),affectedPath);
+ Set<P> removedConfig = Sets.filter(transaction.getRemovedConfigurationData(), dataBroker.createIsContainedPredicate(affectedPath));
+ return resolveChanges(affectedPath,originalConfig,createdConfig,updatedConfig,removedConfig);
+ }
+
+ private Optional<RootedChangeSet<P,D>> resolveChanges(P affectedPath, Map<P, D> originalConfig, Map<P, D> createdConfig, Map<P, D> updatedConfig,Set<P> potentialDeletions) {
+ Predicate<P> isContained = dataBroker.createIsContainedPredicate(affectedPath);
+
+ if(createdConfig.isEmpty() && updatedConfig.isEmpty() && potentialDeletions.isEmpty()) {
+ return Optional.absent();
+ }
+ RootedChangeSet<P, D> changeSet = new RootedChangeSet<P,D>(affectedPath,originalConfig);
+ changeSet.addCreated(createdConfig);
+
+ for(Entry<P, D> entry : updatedConfig.entrySet()) {
+ if(originalConfig.containsKey(entry.getKey())) {
+ changeSet.addUpdated(entry);
+ } else {
+ changeSet.addCreated(entry);
+ }
+ }
+
+ for(Entry<P,D> entry : originalConfig.entrySet()) {
+ for(P deletion : potentialDeletions) {
+ if(isContained.apply(deletion)) {
+ changeSet.addRemoval(entry.getKey());
+ }
+ }
+ }
+
+ if(changeSet.isChange()) {
+ return Optional.of(changeSet);
+ } else {
+ return Optional.absent();
+ }
+
+ }
+
+ public void publishDataChangeEvent(final ImmutableList<ListenerStateCapture<P, D, DCL>> listeners) {
+ ExecutorService executor = this.dataBroker.getExecutor();
+ final Runnable notifyTask = new Runnable() {
+ @Override
+ public void run() {
+ for (final ListenerStateCapture<P, D, DCL> listenerSet : listeners) {
+ {
+ DataChangeEvent<P, D> changeEvent = listenerSet.createEvent(transaction);
+ for (final DataChangeListenerRegistration<P, D, DCL> listener : listenerSet.getListeners()) {
+ try {
+ listener.getInstance().onDataChanged(changeEvent);
+ } catch (Exception e) {
+ log.error("Unhandled exception when invoking listener {}", listener);
+ }
+ }
+ }
+ }
+ }
+ };
+ executor.submit(notifyTask);
+ }
+
+ public RpcResult<TransactionStatus> rollback(final List<DataCommitTransaction<P, D>> transactions, final Exception e) {
+ for (final DataCommitTransaction<P, D> transaction : transactions) {
+ transaction.rollback();
+ }
+ Set<RpcError> _emptySet = Collections.<RpcError> emptySet();
+ return Rpcs.<TransactionStatus> getRpcResult(false, TransactionStatus.FAILED, _emptySet);
+ }
+}
import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl;
import org.opendaylight.yangtools.yang.model.api.DataSchemaNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
import org.opendaylight.yangtools.yang.model.api.SchemaServiceListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SchemaAwareDataStoreAdapter extends AbstractLockableDelegator<DataStore> implements //
DataStore, //
SchemaServiceListener, //
+ SchemaContextListener, //
AutoCloseable {
private final static Logger LOG = LoggerFactory.getLogger(SchemaAwareDataStoreAdapter.class);