From: Ed Warnicke Date: Fri, 14 Feb 2014 15:36:23 +0000 (+0000) Subject: Merge "Fixed bug when Binding-Aware Data Change Listeners we're not triggered." X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~456 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=f08d2774c06e55e377191b027ec9131921977e70;hp=6f593900d49acf4e04eb6fc0bbaeaeb0f47a4552 Merge "Fixed bug when Binding-Aware Data Change Listeners we're not triggered." --- diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java index 286b0c378c..7357926b9e 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/config/yang/md/sal/binding/impl/DataBrokerImplModule.java @@ -7,17 +7,17 @@ */ package org.opendaylight.controller.config.yang.md.sal.binding.impl; -import java.util.concurrent.ExecutorService; - -import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder; -import org.opendaylight.controller.sal.binding.impl.RootDataBrokerImpl; -import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingDomConnectorDeployer; -import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector; -import org.opendaylight.controller.sal.binding.impl.forward.DomForwardedDataBrokerImpl; -import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; -import org.osgi.framework.BundleContext; -import org.osgi.framework.ServiceReference; +import java.util.concurrent.ExecutorService; + +import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder; +import org.opendaylight.controller.sal.binding.impl.RootDataBrokerImpl; +import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingDomConnectorDeployer; +import org.opendaylight.controller.sal.binding.impl.connect.dom.BindingIndependentConnector; +import org.opendaylight.controller.sal.binding.impl.forward.DomForwardedDataBrokerImpl; +import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; +import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; /** * @@ -57,14 +57,14 @@ public final class DataBrokerImplModule extends dataBindingBroker = createStandAloneBroker(listeningExecutor); } dataBindingBroker.registerRuntimeBean(getRootRuntimeBeanRegistratorWrapper()); - + dataBindingBroker.setNotificationExecutor(SingletonHolder.getDefaultChangeEventExecutor()); return dataBindingBroker; } private BindingIndependentMappingService resolveMappingServiceDependency() { if(getMappingService() != null) { return getMappingServiceDependency(); } - + ServiceReference potentialMappingService = bundleContext.getServiceReference(BindingIndependentMappingService.class); if(potentialMappingService != null) { return bundleContext.getService(potentialMappingService); diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java index 291677a79a..a0bbb28d9e 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/codegen/impl/SingletonHolder.java @@ -29,6 +29,7 @@ public class SingletonHolder { public static final NotificationInvokerFactory INVOKER_FACTORY = RPC_GENERATOR_IMPL.getInvokerFactory(); private static ListeningExecutorService NOTIFICATION_EXECUTOR = null; private static ListeningExecutorService COMMIT_EXECUTOR = null; + private static ListeningExecutorService CHANGE_EVENT_EXECUTOR = null; public static synchronized final ListeningExecutorService getDefaultNotificationExecutor() { if (NOTIFICATION_EXECUTOR == null) { @@ -64,4 +65,21 @@ public class SingletonHolder { ExecutorService executor = Executors.newCachedThreadPool(factory); return MoreExecutors.listeningDecorator(executor); } + + public static ExecutorService getDefaultChangeEventExecutor() { + if (CHANGE_EVENT_EXECUTOR == null) { + ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("md-sal-binding-change-%d").build(); + /* + * FIXME: this used to be newCacheThreadPool(), but MD-SAL does not have transaction + * ordering guarantees, which means that using a concurrent threadpool results + * in application data being committed in random order, potentially resulting + * in inconsistent data being present. Once proper primitives are introduced, + * concurrency can be reintroduced. + */ + ExecutorService executor = Executors.newSingleThreadExecutor(factory); + CHANGE_EVENT_EXECUTOR = MoreExecutors.listeningDecorator(executor); + } + + return CHANGE_EVENT_EXECUTOR; + } } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java index ddf67719dc..16d5a24cb5 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/DataBrokerImpl.java @@ -7,7 +7,8 @@ */ package org.opendaylight.controller.sal.binding.impl; -import java.util.Set; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; @@ -19,12 +20,46 @@ import org.opendaylight.controller.sal.common.DataStoreIdentifier; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.DataRoot; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.util.DataObjectReadingUtil; import org.opendaylight.yangtools.yang.common.RpcResult; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; +import com.google.common.collect.Maps; public class DataBrokerImpl extends AbstractDataBroker, DataObject, DataChangeListener> // implements DataProviderService, AutoCloseable { + private final static class ContainsWildcarded implements Predicate> { + + private final InstanceIdentifier key; + + public ContainsWildcarded(InstanceIdentifier key) { + this.key = key; + } + + @Override + public boolean apply(InstanceIdentifier input) { + return key.containsWildcarded(input); + } + } + + private final static class IsContainedWildcarded implements Predicate> { + + private final InstanceIdentifier key; + + public IsContainedWildcarded(InstanceIdentifier key) { + this.key = key; + } + + @Override + public boolean apply(InstanceIdentifier input) { + return input.containsWildcarded(key); + } + } + private final AtomicLong nextTransaction = new AtomicLong(); private final AtomicLong createdTransactionsCount = new AtomicLong(); @@ -110,16 +145,33 @@ public class DataBrokerImpl extends AbstractDataBroker key, - Set> paths) { - if (paths.contains(key)) { - return true; - } - for (InstanceIdentifier path : paths) { - if (key.containsWildcarded(path)) { - return true; + protected Predicate> createContainsPredicate(final + InstanceIdentifier key) { + return new ContainsWildcarded(key); + } + + @Override + protected Predicate> createIsContainedPredicate(final + InstanceIdentifier key) { + return new IsContainedWildcarded(key); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + protected Map, DataObject> deepGetBySubpath( + Map, DataObject> dataSet, + InstanceIdentifier path) { + Builder, DataObject> builder = ImmutableMap.builder(); + Map, DataObject> potential = Maps.filterKeys(dataSet, createIsContainedPredicate(path)); + for(Entry, DataObject> entry : potential.entrySet()) { + try { + builder.putAll(DataObjectReadingUtil.readData(entry.getValue(),(InstanceIdentifier)entry.getKey(),path)); + } catch (Exception e) { + // FIXME : Log exception; } } - return false; - } + return builder.build(); + + } + } diff --git a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java index 5630664a67..5b947a5922 100644 --- a/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java +++ b/opendaylight/md-sal/sal-binding-broker/src/main/java/org/opendaylight/controller/sal/binding/impl/connect/dom/BindingIndependentConnector.java @@ -45,8 +45,6 @@ import org.opendaylight.controller.sal.binding.api.data.DataProviderService; import org.opendaylight.controller.sal.binding.api.data.RuntimeDataProvider; import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier; import org.opendaylight.controller.sal.binding.api.rpc.RpcRouter; -import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; -import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException; import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl; import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.GlobalRpcRegistrationListener; import org.opendaylight.controller.sal.binding.impl.RpcProviderRegistryImpl.RouterInstantiationListener; @@ -79,6 +77,8 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; import org.opendaylight.yangtools.yang.data.api.Node; import org.opendaylight.yangtools.yang.data.impl.ImmutableCompositeNode; +import org.opendaylight.yangtools.yang.data.impl.codec.BindingIndependentMappingService; +import org.opendaylight.yangtools.yang.data.impl.codec.DeserializationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,11 +113,11 @@ public class BindingIndependentConnector implements // private DataProviderService baDataService; - private ConcurrentMap domOpenedTransactions = new ConcurrentHashMap<>(); - private ConcurrentMap bindingOpenedTransactions = new ConcurrentHashMap<>(); + private final ConcurrentMap domOpenedTransactions = new ConcurrentHashMap<>(); + private final ConcurrentMap bindingOpenedTransactions = new ConcurrentHashMap<>(); - private BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler(); - private DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler(); + private final BindingToDomCommitHandler bindingToDomCommitHandler = new BindingToDomCommitHandler(); + private final DomToBindingCommitHandler domToBindingCommitHandler = new DomToBindingCommitHandler(); private Registration, DataObject>> baCommitHandlerRegistration; @@ -130,7 +130,7 @@ public class BindingIndependentConnector implements // // private ListenerRegistration // bindingToDomRpcManager; - private Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() { + private final Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier> toDOMInstanceIdentifier = new Function, org.opendaylight.yangtools.yang.data.api.InstanceIdentifier>() { @Override public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier apply(InstanceIdentifier input) { @@ -413,8 +413,8 @@ public class BindingIndependentConnector implements // private class BindingToDomTransaction implements DataCommitTransaction, DataObject> { - private DataModificationTransaction backing; - private DataModification, DataObject> modification; + private final DataModificationTransaction backing; + private final DataModification, DataObject> modification; public BindingToDomTransaction(DataModificationTransaction backing, DataModification, DataObject> modification) { @@ -491,6 +491,7 @@ public class BindingIndependentConnector implements // // FIXME: do registration based on only active commit handlers. } + @Override public org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction requestCommit( DataModification domTransaction) { Object identifier = domTransaction.getIdentifier(); @@ -587,9 +588,9 @@ public class BindingIndependentConnector implements // private final Set supportedRpcs; private final WeakReference> rpcServiceType; - private Set registrations; - private Map strategiesByQName = new HashMap<>(); - private WeakHashMap strategiesByMethod = new WeakHashMap<>(); + private final Set registrations; + private final Map strategiesByQName = new HashMap<>(); + private final WeakHashMap strategiesByMethod = new WeakHashMap<>(); public DomToBindingRpcForwarder(Class service) { this.rpcServiceType = new WeakReference>(service); @@ -771,10 +772,10 @@ public class BindingIndependentConnector implements // private class DefaultInvocationStrategy extends RpcInvocationStrategy { @SuppressWarnings("rawtypes") - private WeakReference inputClass; + private final WeakReference inputClass; @SuppressWarnings("rawtypes") - private WeakReference outputClass; + private final WeakReference outputClass; @SuppressWarnings({ "rawtypes", "unchecked" }) public DefaultInvocationStrategy(QName rpc, Method targetMethod, Class outputClass, @@ -825,6 +826,7 @@ public class BindingIndependentConnector implements // super(rpc, targetMethod); } + @Override public RpcResult uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception { @SuppressWarnings("unchecked") Future> result = (Future>) targetMethod.invoke(rpcService); @@ -837,21 +839,21 @@ public class BindingIndependentConnector implements // return Futures.immediateFuture(null); } } - + private class NoOutputInvocationStrategy extends RpcInvocationStrategy { - + @SuppressWarnings("rawtypes") - private WeakReference inputClass; + private final WeakReference inputClass; @SuppressWarnings({ "rawtypes", "unchecked" }) - public NoOutputInvocationStrategy(QName rpc, Method targetMethod, + public NoOutputInvocationStrategy(QName rpc, Method targetMethod, Class inputClass) { super(rpc,targetMethod); this.inputClass = new WeakReference(inputClass); } - - + + @Override public RpcResult uncheckedInvoke(RpcService rpcService, CompositeNode domInput) throws Exception { DataContainer bindingInput = mappingService.dataObjectFromDataDom(inputClass.get(), domInput); @@ -902,12 +904,12 @@ public class BindingIndependentConnector implements // public void setDomNotificationService(NotificationPublishService domService) { this.domNotificationService = domService; } - + private class DomToBindingNotificationForwarder implements NotificationInterestListener, NotificationListener { - private ConcurrentMap>> notifications = new ConcurrentHashMap<>(); - private Set supportedNotifications = new HashSet<>(); - + private final ConcurrentMap>> notifications = new ConcurrentHashMap<>(); + private final Set supportedNotifications = new HashSet<>(); + @Override public Set getSupportedNotifications() { return Collections.unmodifiableSet(supportedNotifications); @@ -922,7 +924,7 @@ public class BindingIndependentConnector implements // if (potentialClass != null) { final DataContainer baNotification = mappingService.dataObjectFromDataDom(potentialClass, notification); - + if (baNotification instanceof Notification) { baNotifyService.publish((Notification) baNotification); } diff --git a/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/PutAugmentationTest.java b/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/PutAugmentationTest.java index 598743af90..90fa2be211 100644 --- a/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/PutAugmentationTest.java +++ b/opendaylight/md-sal/sal-binding-dom-it/src/test/java/org/opendaylight/controller/sal/binding/test/bugfix/PutAugmentationTest.java @@ -6,9 +6,11 @@ * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.sal.binding.test.bugfix; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.util.Collections; import java.util.Map; @@ -52,38 +54,42 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data private static final InstanceIdentifier NODES_INSTANCE_ID_BA = InstanceIdentifier.builder(Nodes.class) // .toInstance(); - private static final InstanceIdentifier NODE_INSTANCE_ID_BA = InstanceIdentifier// .builder(NODES_INSTANCE_ID_BA) // .child(Node.class, NODE_KEY).toInstance(); - private static final InstanceIdentifier SUPPORTED_ACTIONS_INSTANCE_ID_BA = InstanceIdentifier// .builder(NODES_INSTANCE_ID_BA) // .child(Node.class, NODE_KEY) // .augmentation(FlowCapableNode.class) // - .child(SupportedActions.class) - .toInstance(); + .child(SupportedActions.class).toInstance(); + private static final InstanceIdentifier ALL_FLOW_CAPABLE_NODES = InstanceIdentifier // + .builder(NODES_INSTANCE_ID_BA) // + .child(Node.class) // + .augmentation(FlowCapableNode.class) // + .build(); private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier NODE_INSTANCE_ID_BI = // org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() // .node(Nodes.QNAME) // .nodeWithKey(Node.QNAME, NODE_KEY_BI) // .toInstance(); - private static final QName SUPPORTED_ACTIONS_QNAME = QName.create(FlowCapableNode.QNAME, SupportedActions.QNAME.getLocalName()); - + private static final QName SUPPORTED_ACTIONS_QNAME = QName.create(FlowCapableNode.QNAME, + SupportedActions.QNAME.getLocalName()); private static final org.opendaylight.yangtools.yang.data.api.InstanceIdentifier SUPPORTED_ACTIONS_INSTANCE_ID_BI = // - org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() // - .node(Nodes.QNAME) // - .nodeWithKey(Node.QNAME, NODE_KEY_BI) // - .node(SUPPORTED_ACTIONS_QNAME) // - .toInstance(); - - private DataChangeEvent, DataObject> receivedChangeEvent; - + org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.builder() // + .node(Nodes.QNAME) // + .nodeWithKey(Node.QNAME, NODE_KEY_BI) // + .node(SUPPORTED_ACTIONS_QNAME) // + .toInstance(); + private static final InstanceIdentifier FLOW_AUGMENTATION_PATH = InstanceIdentifier // + .builder(NODE_INSTANCE_ID_BA) // + .augmentation(FlowCapableNode.class) // + .build(); + private DataChangeEvent, DataObject> lastReceivedChangeEvent; /** * Test for Bug 148 @@ -93,7 +99,8 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data @Test public void putNodeAndAugmentation() throws Exception { - baDataService.registerDataChangeListener(NODES_INSTANCE_ID_BA, this); + baDataService.registerDataChangeListener(ALL_FLOW_CAPABLE_NODES, this); + NodeBuilder nodeBuilder = new NodeBuilder(); nodeBuilder.setId(new NodeId(NODE_ID)); @@ -102,7 +109,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data baseTransaction.putOperationalData(NODE_INSTANCE_ID_BA, nodeBuilder.build()); RpcResult result = baseTransaction.commit().get(); assertEquals(TransactionStatus.COMMITED, result.getResult()); - assertNotNull(receivedChangeEvent); + Node node = (Node) baDataService.readOperationalData(NODE_INSTANCE_ID_BA); assertNotNull(node); assertEquals(NODE_KEY, node.getKey()); @@ -114,13 +121,16 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data fnub.setDescription("Description Foo"); fnub.setSoftware("JUnit emulated"); FlowCapableNode fnu = fnub.build(); - InstanceIdentifier augmentIdentifier = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance(); + InstanceIdentifier augmentIdentifier = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA) + .augmentation(FlowCapableNode.class).toInstance(); DataModificationTransaction augmentedTransaction = baDataService.beginTransaction(); augmentedTransaction.putOperationalData(augmentIdentifier, fnu); result = augmentedTransaction.commit().get(); assertEquals(TransactionStatus.COMMITED, result.getResult()); + assertNotNull(lastReceivedChangeEvent); + assertTrue(lastReceivedChangeEvent.getCreatedOperationalData().containsKey(FLOW_AUGMENTATION_PATH)); Node augmentedNode = (Node) baDataService.readOperationalData(NODE_INSTANCE_ID_BA); assertNotNull(node); @@ -131,11 +141,14 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data assertEquals(fnu.getDescription(), readedAugmentation.getDescription()); assertBindingIndependentVersion(NODE_INSTANCE_ID_BI); testNodeRemove(); + assertTrue(lastReceivedChangeEvent.getRemovedOperationalData().contains(FLOW_AUGMENTATION_PATH)); } @Test public void putNodeWithAugmentation() throws Exception { + baDataService.registerDataChangeListener(ALL_FLOW_CAPABLE_NODES, this); + NodeBuilder nodeBuilder = new NodeBuilder(); nodeBuilder.setId(new NodeId(NODE_ID)); nodeBuilder.setKey(NODE_KEY); @@ -151,23 +164,31 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data DataModificationTransaction baseTransaction = baDataService.beginTransaction(); baseTransaction.putOperationalData(NODE_INSTANCE_ID_BA, nodeBuilder.build()); RpcResult result = baseTransaction.commit().get(); + + assertNotNull(lastReceivedChangeEvent); + assertTrue(lastReceivedChangeEvent.getCreatedOperationalData().containsKey(FLOW_AUGMENTATION_PATH)); + lastReceivedChangeEvent = null; assertEquals(TransactionStatus.COMMITED, result.getResult()); - FlowCapableNode readedAugmentation = (FlowCapableNode) baDataService.readOperationalData(InstanceIdentifier.builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance()); + FlowCapableNode readedAugmentation = (FlowCapableNode) baDataService.readOperationalData(InstanceIdentifier + .builder(NODE_INSTANCE_ID_BA).augmentation(FlowCapableNode.class).toInstance()); assertNotNull(readedAugmentation); + assertEquals(fnu.getHardware(), readedAugmentation.getHardware()); testPutNodeConnectorWithAugmentation(); + lastReceivedChangeEvent = null; testNodeRemove(); - } + assertTrue(lastReceivedChangeEvent.getRemovedOperationalData().contains(FLOW_AUGMENTATION_PATH)); + } private void testPutNodeConnectorWithAugmentation() throws Exception { NodeConnectorKey ncKey = new NodeConnectorKey(new NodeConnectorId("test:0:0")); InstanceIdentifier ncPath = InstanceIdentifier.builder(NODE_INSTANCE_ID_BA) - .child(NodeConnector.class, ncKey).toInstance(); + .child(NodeConnector.class, ncKey).toInstance(); InstanceIdentifier ncAugmentPath = InstanceIdentifier.builder(ncPath) - .augmentation(FlowCapableNodeConnector.class).toInstance(); + .augmentation(FlowCapableNodeConnector.class).toInstance(); NodeConnectorBuilder nc = new NodeConnectorBuilder(); nc.setKey(ncKey); @@ -181,7 +202,8 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data RpcResult result = baseTransaction.commit().get(); assertEquals(TransactionStatus.COMMITED, result.getResult()); - FlowCapableNodeConnector readedAugmentation = (FlowCapableNodeConnector) baDataService.readOperationalData(ncAugmentPath); + FlowCapableNodeConnector readedAugmentation = (FlowCapableNodeConnector) baDataService + .readOperationalData(ncAugmentPath); assertNotNull(readedAugmentation); assertEquals(fncb.getName(), readedAugmentation.getName()); } @@ -196,7 +218,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data assertNull(node); } - private void verifyNodes(Nodes nodes,Node original) { + private void verifyNodes(Nodes nodes, Node original) { assertNotNull(nodes); assertNotNull(nodes.getNode()); assertEquals(1, nodes.getNode().size()); @@ -212,8 +234,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data } - private void assertBindingIndependentVersion( - org.opendaylight.yangtools.yang.data.api.InstanceIdentifier nodeId) { + private void assertBindingIndependentVersion(org.opendaylight.yangtools.yang.data.api.InstanceIdentifier nodeId) { CompositeNode node = biDataService.readOperationalData(nodeId); assertNotNull(node); } @@ -224,7 +245,7 @@ public class PutAugmentationTest extends AbstractDataServiceTest implements Data @Override public void onDataChanged(DataChangeEvent, DataObject> change) { - receivedChangeEvent = change; + lastReceivedChangeEvent = change; } } diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.java new file mode 100644 index 0000000000..bfffb594cb --- /dev/null +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.java @@ -0,0 +1,441 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.common.impl.service; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.eclipse.xtext.xbase.lib.Exceptions; +import org.opendaylight.controller.md.sal.common.api.RegistrationListener; +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration; +import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory; +import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService; +import org.opendaylight.controller.md.sal.common.api.data.DataReader; +import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter; +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; +import org.opendaylight.yangtools.concepts.CompositeObjectRegistration; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.concepts.Path; +import org.opendaylight.yangtools.concepts.Registration; +import org.opendaylight.yangtools.concepts.util.ListenerRegistry; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.MoreExecutors; + +public abstract class AbstractDataBroker

, D extends Object, DCL extends DataChangeListener> + implements DataModificationTransactionFactory, DataReader, DataChangePublisher, + DataProvisionService { + private final static Logger LOG = LoggerFactory.getLogger(AbstractDataBroker.class); + + private ExecutorService executor; + + public ExecutorService getExecutor() { + return this.executor; + } + + public void setExecutor(final ExecutorService executor) { + this.executor = executor; + } + + private ExecutorService notificationExecutor = MoreExecutors.sameThreadExecutor(); + + public ExecutorService getNotificationExecutor() { + return this.notificationExecutor; + } + + public void setNotificationExecutor(final ExecutorService notificationExecutor) { + this.notificationExecutor = notificationExecutor; + } + + private AbstractDataReadRouter dataReadRouter; + + private final AtomicLong submittedTransactionsCount = new AtomicLong(); + + private final AtomicLong failedTransactionsCount = new AtomicLong(); + + private final AtomicLong finishedTransactionsCount = new AtomicLong(); + + public AbstractDataReadRouter getDataReadRouter() { + return this.dataReadRouter; + } + + public void setDataReadRouter(final AbstractDataReadRouter dataReadRouter) { + this.dataReadRouter = dataReadRouter; + } + + public AtomicLong getSubmittedTransactionsCount() { + return this.submittedTransactionsCount; + } + + public AtomicLong getFailedTransactionsCount() { + return this.failedTransactionsCount; + } + + public AtomicLong getFinishedTransactionsCount() { + return this.finishedTransactionsCount; + } + + private final Multimap> listeners = Multimaps + .synchronizedSetMultimap(HashMultimap.> create()); + + private final Multimap> commitHandlers = Multimaps + .synchronizedSetMultimap(HashMultimap.> create()); + + private final Lock registrationLock = new ReentrantLock(); + + private final ListenerRegistry>> commitHandlerRegistrationListeners = new ListenerRegistry>>(); + + public AbstractDataBroker() { + } + + protected ImmutableList> affectedCommitHandlers(final Set

paths) { + final Callable>> _function = new Callable>>() { + @Override + public ImmutableList> call() throws Exception { + Map>> _asMap = commitHandlers.asMap(); + Set>>> _entrySet = _asMap.entrySet(); + FluentIterable>>> _from = FluentIterable + .>>> from(_entrySet); + final Predicate>>> _function = new Predicate>>>() { + @Override + public boolean apply(final Entry>> it) { + P _key = it.getKey(); + boolean _isAffectedBy = isAffectedBy(_key, paths); + return _isAffectedBy; + } + }; + FluentIterable>>> _filter = _from + .filter(_function); + final Function>>, Collection>> _function_1 = new Function>>, Collection>>() { + @Override + public Collection> apply( + final Entry>> it) { + Collection> _value = it.getValue(); + return _value; + } + }; + FluentIterable> _transformAndConcat = _filter + .> transformAndConcat(_function_1); + final Function, DataCommitHandler> _function_2 = new Function, DataCommitHandler>() { + @Override + public DataCommitHandler apply(final DataCommitHandlerRegistrationImpl it) { + DataCommitHandler _instance = it.getInstance(); + return _instance; + } + }; + FluentIterable> _transform = _transformAndConcat + .> transform(_function_2); + return _transform.toList(); + } + }; + return AbstractDataBroker.>> withLock(this.registrationLock, _function); + } + + protected ImmutableList> probablyAffectedCommitHandlers(final HashSet

paths) { + final Callable>> _function = new Callable>>() { + @Override + public ImmutableList> call() throws Exception { + Map>> _asMap = commitHandlers.asMap(); + Set>>> _entrySet = _asMap.entrySet(); + FluentIterable>>> _from = FluentIterable + .>>> from(_entrySet); + final Predicate>>> _function = new Predicate>>>() { + @Override + public boolean apply(final Entry>> it) { + P _key = it.getKey(); + boolean _isProbablyAffectedBy = isProbablyAffectedBy(_key, paths); + return _isProbablyAffectedBy; + } + }; + FluentIterable>>> _filter = _from + .filter(_function); + final Function>>, Collection>> _function_1 = new Function>>, Collection>>() { + @Override + public Collection> apply( + final Entry>> it) { + Collection> _value = it.getValue(); + return _value; + } + }; + FluentIterable> _transformAndConcat = _filter + .> transformAndConcat(_function_1); + final Function, DataCommitHandler> _function_2 = new Function, DataCommitHandler>() { + @Override + public DataCommitHandler apply(final DataCommitHandlerRegistrationImpl it) { + DataCommitHandler _instance = it.getInstance(); + return _instance; + } + }; + FluentIterable> _transform = _transformAndConcat + .> transform(_function_2); + return _transform.toList(); + } + }; + return AbstractDataBroker.>> withLock(this.registrationLock, _function); + } + + protected Map deepGetBySubpath(final Map dataSet, final P path) { + return Collections. emptyMap(); + } + + @Override + public final D readConfigurationData(final P path) { + AbstractDataReadRouter _dataReadRouter = this.getDataReadRouter(); + return _dataReadRouter.readConfigurationData(path); + } + + @Override + public final D readOperationalData(final P path) { + AbstractDataReadRouter _dataReadRouter = this.getDataReadRouter(); + return _dataReadRouter.readOperationalData(path); + } + + private static T withLock(final Lock lock, final Callable method) { + lock.lock(); + try { + return method.call(); + } catch (Exception e) { + throw Exceptions.sneakyThrow(e); + } finally { + lock.unlock(); + } + } + + @Override + public final Registration> registerCommitHandler(final P path, + final DataCommitHandler commitHandler) { + synchronized (commitHandler) { + final DataCommitHandlerRegistrationImpl registration = new DataCommitHandlerRegistrationImpl( + path, commitHandler, this); + commitHandlers.put(path, registration); + LOG.trace("Registering Commit Handler {} for path: {}", commitHandler, path); + for (final ListenerRegistration>> listener : commitHandlerRegistrationListeners) { + try { + listener.getInstance().onRegister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onRegister", listener.getInstance(), + e); + } + } + return registration; + } + } + + @Override + public final ListenerRegistration registerDataChangeListener(final P path, final DCL listener) { + synchronized (listeners) { + final DataChangeListenerRegistration reg = new DataChangeListenerRegistration(path, + listener, AbstractDataBroker.this); + listeners.put(path, reg); + final D initialConfig = getDataReadRouter().readConfigurationData(path); + final D initialOperational = getDataReadRouter().readOperationalData(path); + final DataChangeEvent event = createInitialListenerEvent(path, initialConfig, initialOperational); + listener.onDataChanged(event); + return reg; + } + } + + public final CompositeObjectRegistration> registerDataReader(final P path, + final DataReader reader) { + + final Registration> confReg = getDataReadRouter().registerConfigurationReader(path, reader); + final Registration> dataReg = getDataReadRouter().registerOperationalReader(path, reader); + return new CompositeObjectRegistration>(reader, Arrays.asList(confReg, dataReg)); + } + + @Override + public ListenerRegistration>> registerCommitHandlerListener( + final RegistrationListener> commitHandlerListener) { + final ListenerRegistration>> ret = this.commitHandlerRegistrationListeners + .register(commitHandlerListener); + return ret; + } + + protected DataChangeEvent createInitialListenerEvent(final P path, final D initialConfig, + final D initialOperational) { + InitialDataChangeEventImpl _initialDataChangeEventImpl = new InitialDataChangeEventImpl( + initialConfig, initialOperational); + return _initialDataChangeEventImpl; + } + + protected final void removeListener(final DataChangeListenerRegistration registration) { + synchronized (listeners) { + listeners.remove(registration.getPath(), registration); + } + } + + protected final void removeCommitHandler(final DataCommitHandlerRegistrationImpl registration) { + synchronized (commitHandlers) { + + commitHandlers.remove(registration.getPath(), registration); + LOG.trace("Removing Commit Handler {} for path: {}", registration.getInstance(), registration.getPath()); + for (final ListenerRegistration>> listener : commitHandlerRegistrationListeners) { + try { + listener.getInstance().onUnregister(registration); + } catch (Exception e) { + LOG.error("Unexpected exception in listener {} during invoking onUnregister", + listener.getInstance(), e); + } + } + } + + } + + protected final Collection>> getActiveCommitHandlers() { + return commitHandlers.entries(); + } + + protected ImmutableList> affectedListeners(final Set

paths) { + + synchronized (listeners) { + return FluentIterable // + .from(listeners.asMap().entrySet()) // + .filter(new Predicate>>>() { + @Override + public boolean apply(final Entry>> it) { + return isAffectedBy(it.getKey(), paths); + } + }) // + .transform( + new Function>>, ListenerStateCapture>() { + @Override + public ListenerStateCapture apply( + final Entry>> it) { + return new ListenerStateCapture(it.getKey(), it.getValue(), + createContainsPredicate(it.getKey())); + } + }) // + .toList(); + } + } + + protected ImmutableList> probablyAffectedListeners(final Set

paths) { + synchronized (listeners) { + return FluentIterable // + .from(listeners.asMap().entrySet()) // + .filter(new Predicate>>>() { + @Override + public boolean apply(final Entry>> it) { + return isProbablyAffectedBy(it.getKey(), paths); + } + }) // + .transform( + new Function>>, ListenerStateCapture>() { + @Override + public ListenerStateCapture apply( + final Entry>> it) { + return new ListenerStateCapture(it.getKey(), it.getValue(), + createIsContainedPredicate(it.getKey())); + } + }) // + .toList(); + } + } + + protected Predicate

createContainsPredicate(final P key) { + return new Predicate

() { + @Override + public boolean apply(final P other) { + return key.contains(other); + } + }; + } + + protected Predicate

createIsContainedPredicate(final P key) { + return new Predicate

() { + @Override + public boolean apply(final P other) { + return other.contains(key); + } + }; + } + + protected boolean isAffectedBy(final P key, final Set

paths) { + final Predicate

contains = this.createContainsPredicate(key); + if (paths.contains(key)) { + return true; + } + for (final P path : paths) { + if (contains.apply(path)) { + return true; + } + } + return false; + } + + protected boolean isProbablyAffectedBy(final P key, final Set

paths) { + final Predicate

isContained = this.createIsContainedPredicate(key); + for (final P path : paths) { + if (isContained.apply(path)) { + return true; + } + } + return false; + } + + final Future> commit(final AbstractDataTransaction transaction) { + Preconditions.checkNotNull(transaction); + transaction.changeStatus(TransactionStatus.SUBMITED); + final TwoPhaseCommit task = new TwoPhaseCommit(transaction, this); + ; + this.getSubmittedTransactionsCount().getAndIncrement(); + return this.getExecutor().submit(task); + } + + private static class DataCommitHandlerRegistrationImpl

, D extends Object> // + extends AbstractObjectRegistration> // + implements DataCommitHandlerRegistration { + + private AbstractDataBroker dataBroker; + private final P path; + + @Override + public P getPath() { + return this.path; + } + + public DataCommitHandlerRegistrationImpl(final P path, final DataCommitHandler instance, + final AbstractDataBroker broker) { + super(instance); + this.dataBroker = broker; + this.path = path; + } + + @Override + protected void removeRegistration() { + this.dataBroker.removeCommitHandler(this); + this.dataBroker = null; + } + } +} diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend deleted file mode 100644 index 7c6f52f110..0000000000 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataBroker.xtend +++ /dev/null @@ -1,443 +0,0 @@ -/* - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.md.sal.common.impl.service - -import com.google.common.collect.FluentIterable -import com.google.common.collect.HashMultimap -import com.google.common.collect.ImmutableList -import com.google.common.collect.Multimap -import java.util.ArrayList -import java.util.Arrays -import java.util.Collection -import java.util.Collections -import java.util.HashSet -import java.util.List -import java.util.Set -import java.util.concurrent.Callable -import java.util.concurrent.ExecutorService -import java.util.concurrent.Future -import java.util.concurrent.atomic.AtomicLong -import org.opendaylight.controller.md.sal.common.api.RegistrationListener -import org.opendaylight.controller.md.sal.common.api.TransactionStatus -import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener -import org.opendaylight.controller.md.sal.common.api.data.DataChangePublisher -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction -import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandlerRegistration -import org.opendaylight.controller.md.sal.common.api.data.DataModificationTransactionFactory -import org.opendaylight.controller.md.sal.common.api.data.DataProvisionService -import org.opendaylight.controller.md.sal.common.api.data.DataReader -import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification -import org.opendaylight.controller.md.sal.common.impl.routing.AbstractDataReadRouter -import org.opendaylight.controller.sal.common.util.Rpcs -import org.opendaylight.yangtools.concepts.AbstractObjectRegistration -import org.opendaylight.yangtools.concepts.CompositeObjectRegistration -import org.opendaylight.yangtools.concepts.ListenerRegistration -import org.opendaylight.yangtools.concepts.Path -import org.opendaylight.yangtools.concepts.util.ListenerRegistry -import org.opendaylight.yangtools.yang.common.RpcResult -import org.slf4j.LoggerFactory - -import static com.google.common.base.Preconditions.* import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent -import com.google.common.collect.Multimaps -import java.util.concurrent.locks.Lock -import java.util.concurrent.locks.ReentrantLock - -abstract class AbstractDataBroker

, D, DCL extends DataChangeListener> implements DataModificationTransactionFactory, // -DataReader, // -DataChangePublisher, // -DataProvisionService { - - private static val LOG = LoggerFactory.getLogger(AbstractDataBroker); - - @Property - var ExecutorService executor; - - @Property - var AbstractDataReadRouter dataReadRouter; - - @Property - private val AtomicLong submittedTransactionsCount = new AtomicLong; - - @Property - private val AtomicLong failedTransactionsCount = new AtomicLong - - @Property - private val AtomicLong finishedTransactionsCount = new AtomicLong - - Multimap> listeners = Multimaps.synchronizedSetMultimap(HashMultimap.create()); - Multimap> commitHandlers = Multimaps.synchronizedSetMultimap(HashMultimap.create()); - - private val Lock registrationLock = new ReentrantLock; - - val ListenerRegistry>> commitHandlerRegistrationListeners = new ListenerRegistry(); - public new() { - } - - protected def /*Iterator>,D>>*/ affectedCommitHandlers( - HashSet

paths) { - return withLock(registrationLock) [| - return FluentIterable.from(commitHandlers.asMap.entrySet).filter[key.isAffectedBy(paths)] // - .transformAndConcat[value] // - .transform[instance].toList() - ] - } - - override final readConfigurationData(P path) { - return dataReadRouter.readConfigurationData(path); - } - - override final readOperationalData(P path) { - return dataReadRouter.readOperationalData(path); - } - - private static def withLock(Lock lock,Callable method) { - lock.lock - try { - return method.call - } finally { - lock.unlock - } - } - - override final registerCommitHandler(P path, DataCommitHandler commitHandler) { - return withLock(registrationLock) [| - val registration = new DataCommitHandlerRegistrationImpl(path, commitHandler, this); - commitHandlers.put(path, registration) - LOG.trace("Registering Commit Handler {} for path: {}",commitHandler,path); - for(listener : commitHandlerRegistrationListeners) { - try { - listener.instance.onRegister(registration); - } catch (Exception e) { - LOG.error("Unexpected exception in listener {} during invoking onRegister",listener.instance,e); - } - } - return registration; - ] - } - - override final def registerDataChangeListener(P path, DCL listener) { - return withLock(registrationLock) [| - val reg = new DataChangeListenerRegistration(path, listener, this); - listeners.put(path, reg); - val initialConfig = dataReadRouter.readConfigurationData(path); - val initialOperational = dataReadRouter.readOperationalData(path); - val event = createInitialListenerEvent(path,initialConfig,initialOperational); - listener.onDataChanged(event); - return reg; - ] - } - - final def registerDataReader(P path, DataReader reader) { - return withLock(registrationLock) [| - val confReg = dataReadRouter.registerConfigurationReader(path, reader); - val dataReg = dataReadRouter.registerOperationalReader(path, reader); - - return new CompositeObjectRegistration(reader, Arrays.asList(confReg, dataReg)); - ] - } - - override registerCommitHandlerListener(RegistrationListener> commitHandlerListener) { - val ret = commitHandlerRegistrationListeners.register(commitHandlerListener); - return ret; - } - - protected def DataChangeEvent createInitialListenerEvent(P path,D initialConfig,D initialOperational) { - return new InitialDataChangeEventImpl(initialConfig,initialOperational); - - } - - protected final def removeListener(DataChangeListenerRegistration registration) { - return withLock(registrationLock) [| - listeners.remove(registration.path, registration); - ] - } - - protected final def removeCommitHandler(DataCommitHandlerRegistrationImpl registration) { - return withLock(registrationLock) [| - commitHandlers.remove(registration.path, registration); - LOG.trace("Removing Commit Handler {} for path: {}",registration.instance,registration.path); - for(listener : commitHandlerRegistrationListeners) { - try { - listener.instance.onUnregister(registration); - } catch (Exception e) { - LOG.error("Unexpected exception in listener {} during invoking onUnregister",listener.instance,e); - } - } - return null; - ] - } - - protected final def getActiveCommitHandlers() { - return commitHandlers.entries; - } - - protected def /*Iterator>,D>>*/ affectedListenersWithInitialState( - HashSet

paths) { - return withLock(registrationLock) [| - return FluentIterable.from(listeners.asMap.entrySet).filter[key.isAffectedBy(paths)].transform [ - val operationalState = readOperationalData(key) - val configurationState = readConfigurationData(key) - return new ListenerStateCapture(key, value, operationalState, configurationState) - ].toList() - ] - } - - protected def boolean isAffectedBy(P key, Set

paths) { - if (paths.contains(key)) { - return true; - } - for (path : paths) { - if (key.contains(path)) { - return true; - } - } - - return false; - } - - package final def Future> commit(AbstractDataTransaction transaction) { - checkNotNull(transaction); - transaction.changeStatus(TransactionStatus.SUBMITED); - val task = new TwoPhaseCommit(transaction, this); - submittedTransactionsCount.andIncrement; - return executor.submit(task); - } - -} - -@Data -package class ListenerStateCapture

, D, DCL extends DataChangeListener> { - - @Property - P path; - - @Property - Collection> listeners; - - @Property - D initialOperationalState; - - @Property - D initialConfigurationState; -} - -package class DataChangeListenerRegistration

, D, DCL extends DataChangeListener> extends AbstractObjectRegistration implements ListenerRegistration { - - AbstractDataBroker dataBroker; - - @Property - val P path; - - new(P path, DCL instance, AbstractDataBroker broker) { - super(instance) - dataBroker = broker; - _path = path; - } - - override protected removeRegistration() { - dataBroker.removeListener(this); - dataBroker = null; - } - -} - -package class DataCommitHandlerRegistrationImpl

, D> // -extends AbstractObjectRegistration> // -implements DataCommitHandlerRegistration { - - AbstractDataBroker dataBroker; - - @Property - val P path; - - new(P path, DataCommitHandler instance, AbstractDataBroker broker) { - super(instance) - dataBroker = broker; - _path = path; - } - - override protected removeRegistration() { - dataBroker.removeCommitHandler(this); - dataBroker = null; - } -} - -package class TwoPhaseCommit

, D, DCL extends DataChangeListener> implements Callable> { - - private static val log = LoggerFactory.getLogger(TwoPhaseCommit); - - val AbstractDataTransaction transaction; - val AbstractDataBroker dataBroker; - - new(AbstractDataTransaction transaction, AbstractDataBroker broker) { - this.transaction = transaction; - this.dataBroker = broker; - } - - override call() throws Exception { - - // get affected paths - val affectedPaths = new HashSet

(); - - affectedPaths.addAll(transaction.createdConfigurationData.keySet); - affectedPaths.addAll(transaction.updatedConfigurationData.keySet); - affectedPaths.addAll(transaction.removedConfigurationData); - - affectedPaths.addAll(transaction.createdOperationalData.keySet); - affectedPaths.addAll(transaction.updatedOperationalData.keySet); - affectedPaths.addAll(transaction.removedOperationalData); - - val listeners = dataBroker.affectedListenersWithInitialState(affectedPaths); - - val transactionId = transaction.identifier; - - log.trace("Transaction: {} Started.",transactionId); - log.trace("Transaction: {} Affected Subtrees:",transactionId,affectedPaths); - // requesting commits - val Iterable> commitHandlers = dataBroker.affectedCommitHandlers(affectedPaths); - val List> handlerTransactions = new ArrayList(); - try { - for (handler : commitHandlers) { - handlerTransactions.add(handler.requestCommit(transaction)); - } - } catch (Exception e) { - log.error("Transaction: {} Request Commit failed", transactionId,e); - dataBroker.failedTransactionsCount.andIncrement - transaction.changeStatus(TransactionStatus.FAILED) - return rollback(handlerTransactions, e); - } - val List> results = new ArrayList(); - try { - for (subtransaction : handlerTransactions) { - results.add(subtransaction.finish()); - } - listeners.publishDataChangeEvent(); - } catch (Exception e) { - log.error("Transaction: {} Finish Commit failed",transactionId, e); - dataBroker.failedTransactionsCount.andIncrement - transaction.changeStatus(TransactionStatus.FAILED) - return rollback(handlerTransactions, e); - } - log.trace("Transaction: {} Finished successfully.",transactionId); - dataBroker.finishedTransactionsCount.andIncrement; - transaction.changeStatus(TransactionStatus.COMMITED) - return Rpcs.getRpcResult(true, TransactionStatus.COMMITED, Collections.emptySet()); - - } - - def void publishDataChangeEvent(ImmutableList> listeners) { - dataBroker.executor.submit [| - for (listenerSet : listeners) { - val updatedConfiguration = dataBroker.readConfigurationData(listenerSet.path); - val updatedOperational = dataBroker.readOperationalData(listenerSet.path); - - val changeEvent = new DataChangeEventImpl(transaction, listenerSet.initialConfigurationState, - listenerSet.initialOperationalState, updatedOperational, updatedConfiguration); - for (listener : listenerSet.listeners) { - try { - listener.instance.onDataChanged(changeEvent); - - } catch (Exception e) { - e.printStackTrace(); - } - } - } - ] - } - - def rollback(List> transactions, Exception e) { - for (transaction : transactions) { - transaction.rollback() - } - - // FIXME return encountered error. - return Rpcs.getRpcResult(false, TransactionStatus.FAILED, Collections.emptySet()); - } -} - -public abstract class AbstractDataTransaction

, D> extends AbstractDataModification { - - private static val LOG = LoggerFactory.getLogger(AbstractDataTransaction); - - @Property - private val Object identifier; - - var TransactionStatus status; - - var AbstractDataBroker broker; - - protected new(Object identifier,AbstractDataBroker dataBroker) { - super(dataBroker); - _identifier = identifier; - broker = dataBroker; - status = TransactionStatus.NEW; - LOG.debug("Transaction {} Allocated.", identifier); - - //listeners = new ListenerRegistry<>(); - } - - override commit() { - return broker.commit(this); - } - - override readConfigurationData(P path) { - val local = this.updatedConfigurationData.get(path); - if(local != null) { - return local; - } - - return broker.readConfigurationData(path); - } - - override readOperationalData(P path) { - val local = this.updatedOperationalData.get(path); - if(local != null) { - return local; - } - return broker.readOperationalData(path); - } - - override hashCode() { - return identifier.hashCode; - } - - override equals(Object obj) { - if (this === obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - val other = (obj as AbstractDataTransaction); - if (broker == null) { - if (other.broker != null) - return false; - } else if (!broker.equals(other.broker)) - return false; - if (identifier == null) { - if (other.identifier != null) - return false; - } else if (!identifier.equals(other.identifier)) - return false; - return true; - } - - override TransactionStatus getStatus() { - return status; - } - - protected abstract def void onStatusChange(TransactionStatus status); - - public def changeStatus(TransactionStatus status) { - LOG.debug("Transaction {} transitioned from {} to {}", identifier, this.status, status); - this.status = status; - onStatusChange(status); - } - -} diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataTransaction.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataTransaction.java new file mode 100644 index 0000000000..c73a627799 --- /dev/null +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/AbstractDataTransaction.java @@ -0,0 +1,108 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.common.impl.service; + +import java.util.concurrent.Future; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.impl.AbstractDataModification; +import org.opendaylight.yangtools.concepts.Path; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("all") +public abstract class AbstractDataTransaction

, D extends Object> extends + AbstractDataModification { + private final static Logger LOG = LoggerFactory.getLogger(AbstractDataTransaction.class); + + private final Object identifier; + + @Override + public Object getIdentifier() { + return this.identifier; + } + + private TransactionStatus status; + + private final AbstractDataBroker broker; + + protected AbstractDataTransaction(final Object identifier, + final AbstractDataBroker dataBroker) { + super(dataBroker); + this.identifier = identifier; + this.broker = dataBroker; + this.status = TransactionStatus.NEW; + AbstractDataTransaction.LOG.debug("Transaction {} Allocated.", identifier); + } + + @Override + public Future> commit() { + return this.broker.commit(this); + } + + @Override + public D readConfigurationData(final P path) { + final D local = getUpdatedConfigurationData().get(path); + if (local != null) { + return local; + } + return this.broker.readConfigurationData(path); + } + + @Override + public D readOperationalData(final P path) { + final D local = this.getUpdatedOperationalData().get(path); + if (local != null) { + return local; + } + return this.broker.readOperationalData(path); + } + + + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((identifier == null) ? 0 : identifier.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + AbstractDataTransaction other = (AbstractDataTransaction) obj; + if (identifier == null) { + if (other.identifier != null) + return false; + } else if (!identifier.equals(other.identifier)) + return false; + return true; + } + + @Override + public TransactionStatus getStatus() { + return this.status; + } + + protected abstract void onStatusChange(final TransactionStatus status); + + public void changeStatus(final TransactionStatus status) { + Object _identifier = this.getIdentifier(); + AbstractDataTransaction.LOG + .debug("Transaction {} transitioned from {} to {}", _identifier, this.status, status); + this.status = status; + this.onStatusChange(status); + } +} diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/DataChangeListenerRegistration.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/DataChangeListenerRegistration.java new file mode 100644 index 0000000000..57d511ecf2 --- /dev/null +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/DataChangeListenerRegistration.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.common.impl.service; + +import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener; +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.concepts.Path; + +@SuppressWarnings("all") +class DataChangeListenerRegistration

, D extends Object, DCL extends DataChangeListener> extends + AbstractObjectRegistration implements ListenerRegistration { + private AbstractDataBroker dataBroker; + + private final P path; + + public P getPath() { + return this.path; + } + + public DataChangeListenerRegistration(final P path, final DCL instance, final AbstractDataBroker broker) { + super(instance); + this.dataBroker = broker; + this.path = path; + } + + @Override + protected void removeRegistration() { + this.dataBroker.removeListener(this); + this.dataBroker = null; + } +} diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ImmutableDataChangeEvent.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ImmutableDataChangeEvent.java new file mode 100644 index 0000000000..776ff7bfb2 --- /dev/null +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ImmutableDataChangeEvent.java @@ -0,0 +1,226 @@ +package org.opendaylight.controller.md.sal.common.impl.service; + +import java.util.Map; +import java.util.Set; + +import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.DataModification; +import org.opendaylight.yangtools.concepts.Path; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; + +final class ImmutableDataChangeEvent

, D> implements DataChangeEvent { + + private final D updatedOperationalSubtree; + private final Map updatedOperational; + private final D updatedConfigurationSubtree; + private final Map updatedConfiguration; + private final Set

removedOperational; + private final Set

removedConfiguration; + private final D originalOperationalSubtree; + private final Map originalOperational; + private final D originalConfigurationSubtree; + private final Map originalConfiguration; + private final Map createdOperational; + private final Map createdConfiguration; + + + public ImmutableDataChangeEvent(Builder builder) { + + createdConfiguration = builder.getCreatedConfiguration().build(); + createdOperational = builder.getCreatedOperational().build(); + originalConfiguration = builder.getOriginalConfiguration().build(); + originalConfigurationSubtree = builder.getOriginalConfigurationSubtree(); + originalOperational = builder.getOriginalOperational().build(); + originalOperationalSubtree = builder.getOriginalOperationalSubtree(); + removedConfiguration = builder.getRemovedConfiguration().build(); + removedOperational = builder.getRemovedOperational().build(); + updatedConfiguration = builder.getUpdatedConfiguration().build(); + updatedConfigurationSubtree = builder.getUpdatedConfigurationSubtree(); + updatedOperational = builder.getUpdatedOperational().build(); + updatedOperationalSubtree = builder.getUpdatedOperationalSubtree(); + } + + @Override + public Map getCreatedConfigurationData() { + return createdConfiguration; + } + + @Override + public Map getCreatedOperationalData() { + return createdOperational; + } + + @Override + public Map getOriginalConfigurationData() { + return originalConfiguration; + } + @Override + public D getOriginalConfigurationSubtree() { + return originalConfigurationSubtree; + } + @Override + public Map getOriginalOperationalData() { + return originalOperational; + } + @Override + public D getOriginalOperationalSubtree() { + return originalOperationalSubtree; + } + @Override + public Set

getRemovedConfigurationData() { + return removedConfiguration; + } + @Override + public Set

getRemovedOperationalData() { + return removedOperational; + } + @Override + public Map getUpdatedConfigurationData() { + return updatedConfiguration; + } + @Override + public D getUpdatedConfigurationSubtree() { + return updatedConfigurationSubtree; + } + @Override + public Map getUpdatedOperationalData() { + return updatedOperational; + } + @Override + public D getUpdatedOperationalSubtree() { + return updatedOperationalSubtree; + } + + static final

,D> Builder builder() { + return new Builder<>(); + } + + static final class Builder

,D> { + + private D updatedOperationalSubtree; + private D originalOperationalSubtree; + private D originalConfigurationSubtree; + private D updatedConfigurationSubtree; + + private final ImmutableMap.Builder updatedOperational = ImmutableMap.builder(); + private final ImmutableMap.Builder updatedConfiguration = ImmutableMap.builder(); + private final ImmutableSet.Builder

removedOperational = ImmutableSet.builder(); + private final ImmutableSet.Builder

removedConfiguration = ImmutableSet.builder(); + private final ImmutableMap.Builder originalOperational = ImmutableMap.builder(); + + private final ImmutableMap.Builder originalConfiguration = ImmutableMap.builder(); + private final ImmutableMap.Builder createdOperational = ImmutableMap.builder(); + private final ImmutableMap.Builder createdConfiguration = ImmutableMap.builder(); + + + protected Builder addTransaction(DataModification data, Predicate

keyFilter) { + updatedOperational.putAll(Maps.filterKeys(data.getUpdatedOperationalData(), keyFilter)); + updatedConfiguration.putAll(Maps.filterKeys(data.getUpdatedConfigurationData(), keyFilter)); + originalConfiguration.putAll(Maps.filterKeys(data.getOriginalConfigurationData(), keyFilter)); + originalOperational.putAll(Maps.filterKeys(data.getOriginalOperationalData(), keyFilter)); + createdOperational.putAll(Maps.filterKeys(data.getCreatedOperationalData(), keyFilter)); + createdConfiguration.putAll(Maps.filterKeys(data.getCreatedConfigurationData(), keyFilter)); + return this; + } + + protected Builder addConfigurationChangeSet(RootedChangeSet changeSet) { + if(changeSet == null) { + return this; + } + + originalConfiguration.putAll(changeSet.getOriginal()); + createdConfiguration.putAll(changeSet.getCreated()); + updatedConfiguration.putAll(changeSet.getUpdated()); + removedConfiguration.addAll(changeSet.getRemoved()); + return this; + } + + protected Builder addOperationalChangeSet(RootedChangeSet changeSet) { + if(changeSet == null) { + return this; + } + originalOperational.putAll(changeSet.getOriginal()); + createdOperational.putAll(changeSet.getCreated()); + updatedOperational.putAll(changeSet.getUpdated()); + removedOperational.addAll(changeSet.getRemoved()); + return this; + } + + protected ImmutableDataChangeEvent build() { + return new ImmutableDataChangeEvent(this); + } + + protected D getUpdatedOperationalSubtree() { + return updatedOperationalSubtree; + } + + protected Builder setUpdatedOperationalSubtree(D updatedOperationalSubtree) { + this.updatedOperationalSubtree = updatedOperationalSubtree; + return this; + } + + protected D getOriginalOperationalSubtree() { + return originalOperationalSubtree; + } + + protected Builder setOriginalOperationalSubtree(D originalOperationalSubtree) { + this.originalOperationalSubtree = originalOperationalSubtree; + return this; + } + + protected D getOriginalConfigurationSubtree() { + return originalConfigurationSubtree; + } + + protected Builder setOriginalConfigurationSubtree(D originalConfigurationSubtree) { + this.originalConfigurationSubtree = originalConfigurationSubtree; + return this; + } + + protected D getUpdatedConfigurationSubtree() { + return updatedConfigurationSubtree; + } + + protected Builder setUpdatedConfigurationSubtree(D updatedConfigurationSubtree) { + this.updatedConfigurationSubtree = updatedConfigurationSubtree; + return this; + } + + protected ImmutableMap.Builder getUpdatedOperational() { + return updatedOperational; + } + + protected ImmutableMap.Builder getUpdatedConfiguration() { + return updatedConfiguration; + } + + protected ImmutableSet.Builder

getRemovedOperational() { + return removedOperational; + } + + protected ImmutableSet.Builder

getRemovedConfiguration() { + return removedConfiguration; + } + + protected ImmutableMap.Builder getOriginalOperational() { + return originalOperational; + } + + protected ImmutableMap.Builder getOriginalConfiguration() { + return originalConfiguration; + } + + protected ImmutableMap.Builder getCreatedOperational() { + return createdOperational; + } + + protected ImmutableMap.Builder getCreatedConfiguration() { + return createdConfiguration; + } + } + +} diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ListenerStateCapture.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ListenerStateCapture.java new file mode 100644 index 0000000000..502ca90ab9 --- /dev/null +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/ListenerStateCapture.java @@ -0,0 +1,118 @@ +package org.opendaylight.controller.md.sal.common.impl.service; + +import java.util.Map; + +import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.DataModification; +import org.opendaylight.yangtools.concepts.Path; + +import com.google.common.base.Predicate; + +public final class ListenerStateCapture

, D, DCL extends DataChangeListener> { + + final P path; + + final Iterable> listeners; + + D initialOperationalState; + + D initialConfigurationState; + + D finalConfigurationState; + + D finalOperationalState; + + Map additionalConfigOriginal; + Map additionalConfigCreated; + Map additionalConfigUpdated; + Map additionalConfigDeleted; + + Map additionalOperOriginal; + Map additionalOperCreated; + Map additionalOperUpdated; + Map additionalOperDeleted; + + RootedChangeSet normalizedConfigurationChanges; + RootedChangeSet normalizedOperationalChanges; + + private final Predicate

containsPredicate; + + public ListenerStateCapture(P path, Iterable> listeners, + Predicate

containsPredicate) { + super(); + this.path = path; + this.listeners = listeners; + this.containsPredicate = containsPredicate; + } + + protected D getInitialOperationalState() { + return initialOperationalState; + } + + protected void setInitialOperationalState(D initialOperationalState) { + this.initialOperationalState = initialOperationalState; + } + + protected D getInitialConfigurationState() { + return initialConfigurationState; + } + + protected void setInitialConfigurationState(D initialConfigurationState) { + this.initialConfigurationState = initialConfigurationState; + } + + protected P getPath() { + return path; + } + + protected Iterable> getListeners() { + return listeners; + } + + protected D getFinalConfigurationState() { + return finalConfigurationState; + } + + protected void setFinalConfigurationState(D finalConfigurationState) { + this.finalConfigurationState = finalConfigurationState; + } + + protected D getFinalOperationalState() { + return finalOperationalState; + } + + protected void setFinalOperationalState(D finalOperationalState) { + this.finalOperationalState = finalOperationalState; + } + + protected RootedChangeSet getNormalizedConfigurationChanges() { + return normalizedConfigurationChanges; + } + + protected void setNormalizedConfigurationChanges(RootedChangeSet normalizedConfigurationChanges) { + this.normalizedConfigurationChanges = normalizedConfigurationChanges; + } + + protected RootedChangeSet getNormalizedOperationalChanges() { + return normalizedOperationalChanges; + } + + protected void setNormalizedOperationalChanges(RootedChangeSet normalizedOperationalChange) { + this.normalizedOperationalChanges = normalizedOperationalChange; + } + + protected DataChangeEvent createEvent(DataModification modification) { + return ImmutableDataChangeEvent. builder()// + .addTransaction(modification, containsPredicate) // + .addConfigurationChangeSet(normalizedConfigurationChanges) // + .addOperationalChangeSet(normalizedOperationalChanges) // + .setOriginalConfigurationSubtree(initialConfigurationState) // + .setOriginalOperationalSubtree(initialOperationalState) // + .setUpdatedConfigurationSubtree(finalConfigurationState) // + .setUpdatedOperationalSubtree(finalOperationalState) // + .build(); + + } + +} diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/RootedChangeSet.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/RootedChangeSet.java new file mode 100644 index 0000000000..e0525657e5 --- /dev/null +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/RootedChangeSet.java @@ -0,0 +1,66 @@ +package org.opendaylight.controller.md.sal.common.impl.service; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.opendaylight.yangtools.concepts.Path; + +public class RootedChangeSet

,D> { + + private final P root; + private final Map original; + private final Map created = new HashMap<>(); + private final Map updated = new HashMap<>(); + private final Set

removed = new HashSet<>(); + + + + public RootedChangeSet(P root,Map original) { + super(); + this.root = root; + this.original = original; + } + + protected P getRoot() { + return root; + } + + protected Map getOriginal() { + return original; + } + + protected Map getCreated() { + return created; + } + + protected Map getUpdated() { + return updated; + } + + protected Set

getRemoved() { + return removed; + } + + public void addCreated(Map created) { + this.created.putAll(created); + } + + public void addCreated(Entry entry) { + created.put(entry.getKey(), entry.getValue()); + } + + public void addUpdated(Entry entry) { + updated.put(entry.getKey(), entry.getValue()); + } + + public void addRemoval(P path) { + removed.add(path); + } + + public boolean isChange() { + return !created.isEmpty() || !updated.isEmpty() || !removed.isEmpty(); + } +} diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/TwoPhaseCommit.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/TwoPhaseCommit.java new file mode 100644 index 0000000000..e99fc0f24c --- /dev/null +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/service/TwoPhaseCommit.java @@ -0,0 +1,237 @@ +/** + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.common.impl.service; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; + +import org.opendaylight.controller.md.sal.common.api.TransactionStatus; +import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.DataChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler; +import org.opendaylight.controller.md.sal.common.api.data.DataCommitHandler.DataCommitTransaction; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.yangtools.concepts.Path; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class TwoPhaseCommit

, D extends Object, DCL extends DataChangeListener> implements + Callable> { + private final static Logger log = LoggerFactory.getLogger(TwoPhaseCommit.class); + + private final AbstractDataTransaction transaction; + + private final AbstractDataBroker dataBroker; + + public TwoPhaseCommit(final AbstractDataTransaction transaction, final AbstractDataBroker broker) { + this.transaction = transaction; + this.dataBroker = broker; + } + + @Override + public RpcResult call() throws Exception { + final Object transactionId = this.transaction.getIdentifier(); + + Set

changedPaths = ImmutableSet.

builder().addAll(transaction.getUpdatedConfigurationData().keySet()) + .addAll(transaction.getCreatedConfigurationData().keySet()) + .addAll(transaction.getRemovedConfigurationData()) + .addAll(transaction.getUpdatedOperationalData().keySet()) + .addAll(transaction.getCreatedOperationalData().keySet()) + .addAll(transaction.getRemovedOperationalData()).build(); + + log.trace("Transaction: {} Affected Subtrees:", transactionId, changedPaths); + + final ImmutableList.Builder> listenersBuilder = ImmutableList.builder(); + listenersBuilder.addAll(dataBroker.affectedListeners(changedPaths)); + filterProbablyAffectedListeners(dataBroker.probablyAffectedListeners(changedPaths),listenersBuilder); + + + + final ImmutableList> listeners = listenersBuilder.build(); + final Iterable> commitHandlers = dataBroker.affectedCommitHandlers(changedPaths); + captureInitialState(listeners); + + + log.trace("Transaction: {} Starting Request Commit.",transactionId); + final List> handlerTransactions = new ArrayList<>(); + try { + for (final DataCommitHandler handler : commitHandlers) { + DataCommitTransaction requestCommit = handler.requestCommit(this.transaction); + if (requestCommit != null) { + handlerTransactions.add(requestCommit); + } else { + log.debug("Transaction: {}, Handler {} is not participating in transaction.", transactionId, + handler); + } + } + } catch (Exception e) { + log.error("Transaction: {} Request Commit failed", transactionId, e); + dataBroker.getFailedTransactionsCount().getAndIncrement(); + this.transaction.changeStatus(TransactionStatus.FAILED); + return this.rollback(handlerTransactions, e); + + } + + log.trace("Transaction: {} Starting Finish.",transactionId); + final List> results = new ArrayList>(); + try { + for (final DataCommitTransaction subtransaction : handlerTransactions) { + results.add(subtransaction.finish()); + } + } catch (Exception e) { + log.error("Transaction: {} Finish Commit failed", transactionId, e); + dataBroker.getFailedTransactionsCount().getAndIncrement(); + transaction.changeStatus(TransactionStatus.FAILED); + return this.rollback(handlerTransactions, e); + } + + + dataBroker.getFinishedTransactionsCount().getAndIncrement(); + transaction.changeStatus(TransactionStatus.COMMITED); + + log.trace("Transaction: {} Finished successfully.", transactionId); + + captureFinalState(listeners); + + log.trace("Transaction: {} Notifying listeners."); + + publishDataChangeEvent(listeners); + return Rpcs. getRpcResult(true, TransactionStatus.COMMITED, + Collections. emptySet()); + } + + private void captureInitialState(ImmutableList> listeners) { + for (ListenerStateCapture state : listeners) { + state.setInitialConfigurationState(dataBroker.readConfigurationData(state.getPath())); + state.setInitialOperationalState(dataBroker.readOperationalData(state.getPath())); + } + } + + + private void captureFinalState(ImmutableList> listeners) { + for (ListenerStateCapture state : listeners) { + state.setFinalConfigurationState(dataBroker.readConfigurationData(state.getPath())); + state.setFinalOperationalState(dataBroker.readOperationalData(state.getPath())); + } + } + + private void filterProbablyAffectedListeners( + ImmutableList> probablyAffectedListeners, Builder> reallyAffected) { + + for(ListenerStateCapture listenerSet : probablyAffectedListeners) { + P affectedPath = listenerSet.getPath(); + Optional> configChange = resolveConfigChange(affectedPath); + Optional> operChange = resolveOperChange(affectedPath); + + if(configChange.isPresent() || operChange.isPresent()) { + reallyAffected.add(listenerSet); + if(configChange.isPresent()) { + listenerSet.setNormalizedConfigurationChanges(configChange.get()); + } + + if(operChange.isPresent()) { + listenerSet.setNormalizedOperationalChanges(operChange.get()); + } + } + } + } + + private Optional> resolveOperChange(P affectedPath) { + Map originalOper = dataBroker.deepGetBySubpath(transaction.getOriginalOperationalData(),affectedPath); + Map createdOper = dataBroker.deepGetBySubpath(transaction.getCreatedOperationalData(),affectedPath); + Map updatedOper = dataBroker.deepGetBySubpath(transaction.getUpdatedOperationalData(),affectedPath); + Set

removedOper = Sets.filter(transaction.getRemovedOperationalData(), dataBroker.createIsContainedPredicate(affectedPath)); + return resolveChanges(affectedPath,originalOper,createdOper,updatedOper,removedOper); + } + + private Optional> resolveConfigChange(P affectedPath) { + Map originalConfig = dataBroker.deepGetBySubpath(transaction.getOriginalConfigurationData(),affectedPath); + Map createdConfig = dataBroker.deepGetBySubpath(transaction.getCreatedConfigurationData(),affectedPath); + Map updatedConfig = dataBroker.deepGetBySubpath(transaction.getUpdatedConfigurationData(),affectedPath); + Set

removedConfig = Sets.filter(transaction.getRemovedConfigurationData(), dataBroker.createIsContainedPredicate(affectedPath)); + return resolveChanges(affectedPath,originalConfig,createdConfig,updatedConfig,removedConfig); + } + + private Optional> resolveChanges(P affectedPath, Map originalConfig, Map createdConfig, Map updatedConfig,Set

potentialDeletions) { + Predicate

isContained = dataBroker.createIsContainedPredicate(affectedPath); + + if(createdConfig.isEmpty() && updatedConfig.isEmpty() && potentialDeletions.isEmpty()) { + return Optional.absent(); + } + RootedChangeSet changeSet = new RootedChangeSet(affectedPath,originalConfig); + changeSet.addCreated(createdConfig); + + for(Entry entry : updatedConfig.entrySet()) { + if(originalConfig.containsKey(entry.getKey())) { + changeSet.addUpdated(entry); + } else { + changeSet.addCreated(entry); + } + } + + for(Entry entry : originalConfig.entrySet()) { + for(P deletion : potentialDeletions) { + if(isContained.apply(deletion)) { + changeSet.addRemoval(entry.getKey()); + } + } + } + + if(changeSet.isChange()) { + return Optional.of(changeSet); + } else { + return Optional.absent(); + } + + } + + public void publishDataChangeEvent(final ImmutableList> listeners) { + ExecutorService executor = this.dataBroker.getExecutor(); + final Runnable notifyTask = new Runnable() { + @Override + public void run() { + for (final ListenerStateCapture listenerSet : listeners) { + { + DataChangeEvent changeEvent = listenerSet.createEvent(transaction); + for (final DataChangeListenerRegistration listener : listenerSet.getListeners()) { + try { + listener.getInstance().onDataChanged(changeEvent); + } catch (Exception e) { + log.error("Unhandled exception when invoking listener {}", listener); + } + } + } + } + } + }; + executor.submit(notifyTask); + } + + public RpcResult rollback(final List> transactions, final Exception e) { + for (final DataCommitTransaction transaction : transactions) { + transaction.rollback(); + } + Set _emptySet = Collections. emptySet(); + return Rpcs. getRpcResult(false, TransactionStatus.FAILED, _emptySet); + } +} diff --git a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareDataStoreAdapter.java b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareDataStoreAdapter.java index b32d906d1e..b02a37c300 100644 --- a/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareDataStoreAdapter.java +++ b/opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareDataStoreAdapter.java @@ -30,6 +30,7 @@ import org.opendaylight.yangtools.yang.data.api.Node; import org.opendaylight.yangtools.yang.data.impl.CompositeNodeTOImpl; import org.opendaylight.yangtools.yang.model.api.DataSchemaNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; import org.opendaylight.yangtools.yang.model.api.SchemaServiceListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +42,7 @@ import com.google.common.collect.ImmutableSet; public class SchemaAwareDataStoreAdapter extends AbstractLockableDelegator implements // DataStore, // SchemaServiceListener, // + SchemaContextListener, // AutoCloseable { private final static Logger LOG = LoggerFactory.getLogger(SchemaAwareDataStoreAdapter.class);