LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+ SnapshotManager.this.currentState = CREATING;
+
try {
createSnapshotProcedure.apply(null);
} catch (Exception e) {
+ SnapshotManager.this.currentState = IDLE;
LOG.error("Error creating snapshot", e);
return false;
}
- SnapshotManager.this.currentState = CREATING;
return true;
}
import org.opendaylight.controller.config.api.DependencyResolver;
import org.opendaylight.controller.config.api.ModuleIdentifier;
-import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
import org.opendaylight.controller.sal.core.api.Broker;
public class BindingNotificationAdapterModule extends AbstractBindingNotificationAdapterModule {
- public BindingNotificationAdapterModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
+ public BindingNotificationAdapterModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public BindingNotificationAdapterModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.md.sal.binding.impl.BindingNotificationAdapterModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public BindingNotificationAdapterModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver, final org.opendaylight.controller.config.yang.md.sal.binding.impl.BindingNotificationAdapterModule oldModule, final java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
final BindingToNormalizedNodeCodec codec = getBindingMappingServiceDependency();
final Broker.ProviderSession session = getDomAsyncBrokerDependency().registerProvider(new DummyDOMProvider());
final DOMNotificationService notifService = session.getService(DOMNotificationService.class);
- return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), notifService, SingletonHolder.INVOKER_FACTORY);
+ return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), notifService);
}
}
*/
package org.opendaylight.controller.config.yang.md.sal.binding.impl;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import org.opendaylight.controller.md.sal.binding.compat.HydrogenNotificationBrokerImpl;
+
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceAdapter;
import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
-import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl;
/**
*
public final class NotificationBrokerImplModule extends
org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractNotificationBrokerImplModule {
- public NotificationBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
- org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ public NotificationBrokerImplModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+ final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public NotificationBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
- org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
- NotificationBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public NotificationBrokerImplModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+ final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
+ final NotificationBrokerImplModule oldModule, final java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
@Override
public java.lang.AutoCloseable createInstance() {
+ final NotificationPublishService notificationPublishService = getNotificationPublishAdapterDependency();
+ final NotificationService notificationService = getNotificationAdapterDependency();
+
+ if(notificationPublishService != null & notificationService != null) {
+ return new HeliumNotificationProviderServiceAdapter(notificationPublishService, notificationService);
+ }
+
/*
* FIXME: Switch to new broker (which has different threading model)
* once this change is communicated with downstream users or
* we will have adapter implementation which will honor Helium
* threading model for notifications.
*/
- ListeningExecutorService listeningExecutor = SingletonHolder.getDefaultNotificationExecutor();
- NotificationBrokerImpl broker = new NotificationBrokerImpl(listeningExecutor);
- return broker;
+
+ return new HydrogenNotificationBrokerImpl(SingletonHolder.getDefaultNotificationExecutor());
}
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.yangtools.yang.binding.Notification;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
+public class HydrogenNotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(HydrogenNotificationBrokerImpl.class);
private final ListenerRegistry<NotificationInterestListener> interestListeners =
ListenerRegistry.create();
private final AtomicReference<ListenerMapGeneration> listeners = new AtomicReference<>(new ListenerMapGeneration());
private final ExecutorService executor;
- public NotificationBrokerImpl(final ExecutorService executor) {
+ public HydrogenNotificationBrokerImpl(final ExecutorService executor) {
this.executor = Preconditions.checkNotNull(executor);
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import java.util.Arrays;
import java.util.Collection;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.slf4j.Logger;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMAdapterBuilder.Factory;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMService;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
private final BindingNormalizedNodeSerializer codec;
private final DOMNotificationService domNotifService;
- public BindingDOMNotificationServiceAdapter(final BindingNormalizedNodeSerializer codec, final DOMNotificationService domNotifService, final NotificationInvokerFactory notificationInvokerFactory) {
+ public BindingDOMNotificationServiceAdapter(final BindingNormalizedNodeSerializer codec, final DOMNotificationService domNotifService) {
this.codec = codec;
this.domNotifService = domNotifService;
}
protected NotificationService createInstance(final BindingToNormalizedNodeCodec codec,
final ClassToInstanceMap<DOMService> delegates) {
final DOMNotificationService domNotification = delegates.getInstance(DOMNotificationService.class);
- final NotificationInvokerFactory invokerFactory = SingletonHolder.INVOKER_FACTORY;
- return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), domNotification, invokerFactory);
+ return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), domNotification);
}
@Override
}
}
+ augment "/config:modules/config:module/config:configuration" {
+ case binding-notification-broker {
+ when "/config:modules/config:module/config:type = 'binding-notification-broker'";
+ container notification-adapter {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity binding-new-notification-service;
+ }
+ }
+ }
+
+ container notification-publish-adapter {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity binding-new-notification-publish-service;
+ }
+ }
+ }
+ }
+ }
+
augment "/config:modules/config:module/config:state" {
case binding-notification-broker {
when "/config:modules/config:module/config:type = 'binding-notification-broker'";
import org.opendaylight.controller.md.sal.dom.broker.impl.DOMNotificationRouter;
import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
import org.opendaylight.controller.sal.binding.test.util.MockSchemaService;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
}
public NotificationService createNotificationService() {
- return new BindingDOMNotificationServiceAdapter(bindingToNormalized.getCodecRegistry(), domNotificationRouter,
- SingletonHolder.INVOKER_FACTORY);
+ return new BindingDOMNotificationServiceAdapter(bindingToNormalized.getCodecRegistry(), domNotificationRouter);
}
public NotificationPublishService createNotificationPublishService() {
import javassist.ClassPool;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceAdapter;
import org.opendaylight.controller.md.sal.binding.compat.HeliumRpcProviderRegistry;
import org.opendaylight.controller.md.sal.binding.compat.HydrogenDataBrokerAdapter;
import org.opendaylight.controller.md.sal.binding.compat.HydrogenMountProvisionServiceAdapter;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMMountPointServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationPublishServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationServiceAdapter;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMRpcProviderServiceAdapter;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMRpcServiceAdapter;
import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMNotificationRouter;
import org.opendaylight.controller.md.sal.dom.broker.impl.DOMRpcRouter;
import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.broker.impl.mount.DOMMountPointServiceImpl;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
-import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl;
import org.opendaylight.controller.sal.binding.impl.RootBindingAwareBroker;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.BrokerService;
private RootBindingAwareBroker baBrokerImpl;
- private NotificationBrokerImpl baNotifyImpl;
+ private HeliumNotificationProviderServiceAdapter baNotifyImpl;
private BrokerImpl biBrokerImpl;
private BindingDOMRpcProviderServiceAdapter baProviderRpc;
private DOMRpcRouter domRouter;
+ private NotificationPublishService publishService;
+
+ private NotificationService listenService;
+
+ private DOMNotificationPublishService domPublishService;
+
+ private DOMNotificationService domListenService;
+
public DOMDataBroker getDomAsyncDataBroker() {
public void startBindingNotificationBroker() {
checkState(executor != null);
- baNotifyImpl = new NotificationBrokerImpl(executor);
+ final DOMNotificationRouter router = DOMNotificationRouter.create(16);
+ domPublishService = router;
+ domListenService = router;
+ publishService = new BindingDOMNotificationPublishServiceAdapter(codec, domPublishService);
+ listenService = new BindingDOMNotificationServiceAdapter(codec, domListenService);
+ baNotifyImpl = new HeliumNotificationProviderServiceAdapter(publishService,listenService);
}
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-notification-broker</type>
<name>binding-notification-broker</name>
+ <notification-adapter xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-new-notification-service</type>
+ <name>binding-notification-adapter</name>
+ </notification-adapter>
+ <notification-publish-adapter xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-new-notification-publish-service</type>
+ <name>binding-notification-publish-adapter</name>
+ </notification-publish-adapter>
</module>
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:binding:impl">prefix:binding-broker-impl</type>
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:dom-inmemory-data-broker</type>
<name>inmemory-data-broker</name>
+
<schema-service>
<type xmlns:dom="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">dom:schema-service</type>
<name>yang-schema-service</name>
</schema-service>
+
+ <config-data-store>
+ <type xmlns:config-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:config-dom-store">config-dom-store-spi:config-dom-datastore</type>
+ <name>config-store-service</name>
+ </config-data-store>
+
+ <operational-data-store>
+ <type xmlns:operational-dom-store-spi="urn:opendaylight:params:xml:ns:yang:controller:md:sal:core:spi:operational-dom-store">operational-dom-store-spi:operational-dom-datastore</type>
+ <name>operational-store-service</name>
+ </operational-data-store>
</module>
<module>
<type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl">prefix:dom-broker-impl</type>
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractDOMBroker extends AbstractDOMTransactionFactory<DOMStore>
+ implements DOMDataBroker, AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDOMBroker.class);
+
+ private final AtomicLong txNum = new AtomicLong();
+ private final AtomicLong chainNum = new AtomicLong();
+ private final Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> extensions;
+ private volatile AutoCloseable closeable;
+
+ protected AbstractDOMBroker(final Map<LogicalDatastoreType, DOMStore> datastores) {
+ super(datastores);
+
+ boolean treeChange = true;
+ for (DOMStore ds : datastores.values()) {
+ if (!(ds instanceof DOMStoreTreeChangePublisher)) {
+ treeChange = false;
+ break;
+ }
+ }
+
+ if (treeChange) {
+ extensions = ImmutableMap.<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension>of(DOMDataTreeChangeService.class, new DOMDataTreeChangeService() {
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerDataTreeChangeListener(final DOMDataTreeIdentifier treeId, final L listener) {
+ DOMStore publisher = getTxFactories().get(treeId.getDatastoreType());
+ checkState(publisher != null, "Requested logical data store is not available.");
+
+ return ((DOMStoreTreeChangePublisher) publisher).registerTreeChangeListener(treeId.getRootIdentifier(), listener);
+ }
+ });
+ } else {
+ extensions = Collections.emptyMap();
+ }
+ }
+
+ public void setCloseable(final AutoCloseable closeable) {
+ this.closeable = closeable;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+
+ if (closeable != null) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ LOG.debug("Error closing instance", e);
+ }
+ }
+ }
+
+ @Override
+ protected Object newTransactionIdentifier() {
+ return "DOM-" + txNum.getAndIncrement();
+ }
+
+ @Override
+ public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(final LogicalDatastoreType store,
+ final YangInstanceIdentifier path, final DOMDataChangeListener listener, final DataChangeScope triggeringScope) {
+
+ DOMStore potentialStore = getTxFactories().get(store);
+ checkState(potentialStore != null, "Requested logical data store is not available.");
+ return potentialStore.registerChangeListener(path, listener, triggeringScope);
+ }
+
+ @Override
+ public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
+ return extensions;
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(final TransactionChainListener listener) {
+ checkNotClosed();
+
+ final Map<LogicalDatastoreType, DOMStoreTransactionChain> backingChains = new EnumMap<>(LogicalDatastoreType.class);
+ for (Map.Entry<LogicalDatastoreType, DOMStore> entry : getTxFactories().entrySet()) {
+ backingChains.put(entry.getKey(), entry.getValue().createTransactionChain());
+ }
+
+ final long chainId = chainNum.getAndIncrement();
+ LOG.debug("Transaction chain {} created with listener {}, backing store chains {}", chainId, listener,
+ backingChains);
+ return new DOMBrokerTransactionChain(chainId, backingChains, this, listener);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public abstract class AbstractDOMBrokerTransaction<K, T extends DOMStoreTransaction> implements
+ AsyncTransaction<YangInstanceIdentifier, NormalizedNode<?, ?>> {
+
+ private Map<K, T> backingTxs;
+ private final Object identifier;
+ private final Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories;
+
+ /**
+ *
+ * Creates new composite Transactions.
+ *
+ * @param identifier
+ * Identifier of transaction.
+ */
+ protected AbstractDOMBrokerTransaction(final Object identifier, Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories) {
+ this.identifier = Preconditions.checkNotNull(identifier, "Identifier should not be null");
+ this.storeTxFactories = Preconditions.checkNotNull(storeTxFactories, "Store Transaction Factories should not be null");
+ this.backingTxs = new EnumMap(LogicalDatastoreType.class);
+ }
+
+ /**
+ * Returns subtransaction associated with supplied key.
+ *
+ * @param key
+ * @return
+ * @throws NullPointerException
+ * if key is null
+ * @throws IllegalArgumentException
+ * if no subtransaction is associated with key.
+ */
+ protected final T getSubtransaction(final K key) {
+ Preconditions.checkNotNull(key, "key must not be null.");
+
+ T ret = backingTxs.get(key);
+ if(ret == null){
+ ret = createTransaction(key);
+ backingTxs.put(key, ret);
+ }
+ Preconditions.checkArgument(ret != null, "No subtransaction associated with %s", key);
+ return ret;
+ }
+
+ protected abstract T createTransaction(final K key);
+
+ /**
+ * Returns immutable Iterable of all subtransactions.
+ *
+ */
+ protected Collection<T> getSubtransactions() {
+ return backingTxs.values();
+ }
+
+ @Override
+ public Object getIdentifier() {
+ return identifier;
+ }
+
+ protected void closeSubtransactions() {
+ /*
+ * We share one exception for all failures, which are added
+ * as supressedExceptions to it.
+ */
+ IllegalStateException failure = null;
+ for (T subtransaction : backingTxs.values()) {
+ try {
+ subtransaction.close();
+ } catch (Exception e) {
+ // If we did not allocated failure we allocate it
+ if (failure == null) {
+ failure = new IllegalStateException("Uncaught exception occured during closing transaction", e);
+ } else {
+ // We update it with additional exceptions, which occurred during error.
+ failure.addSuppressed(e);
+ }
+ }
+ }
+ // If we have failure, we throw it at after all attempts to close.
+ if (failure != null) {
+ throw failure;
+ }
+ }
+
+ protected DOMStoreTransactionFactory getTxFactory(K type){
+ return storeTxFactories.get(type);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Collection;
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+
+public abstract class AbstractDOMTransactionFactory<T extends DOMStoreTransactionFactory> implements AutoCloseable {
+ private static final AtomicIntegerFieldUpdater<AbstractDOMTransactionFactory> UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(AbstractDOMTransactionFactory.class, "closed");
+ private final Map<LogicalDatastoreType, T> storeTxFactories;
+ private volatile int closed = 0;
+
+ protected AbstractDOMTransactionFactory(final Map<LogicalDatastoreType, T> txFactories) {
+ this.storeTxFactories = new EnumMap<>(txFactories);
+ }
+
+ /**
+ * Implementations must return unique identifier for each and every call of
+ * this method;
+ *
+ * @return new Unique transaction identifier.
+ */
+ protected abstract Object newTransactionIdentifier();
+
+ /**
+ *
+ * @param transaction
+ * @param cohorts
+ * @return
+ */
+ protected abstract CheckedFuture<Void,TransactionCommitFailedException> submit(final DOMDataWriteTransaction transaction,
+ final Collection<DOMStoreThreePhaseCommitCohort> cohorts);
+
+ /**
+ *
+ * @return
+ */
+ public final DOMDataReadOnlyTransaction newReadOnlyTransaction() {
+ checkNotClosed();
+
+ return new DOMBrokerReadOnlyTransaction(newTransactionIdentifier(), storeTxFactories);
+ }
+
+
+ /**
+ *
+ * @return
+ */
+ public final DOMDataWriteTransaction newWriteOnlyTransaction() {
+ checkNotClosed();
+
+ return new DOMBrokerWriteOnlyTransaction(newTransactionIdentifier(), storeTxFactories, this);
+ }
+
+
+ /**
+ *
+ * @return
+ */
+ public final DOMDataReadWriteTransaction newReadWriteTransaction() {
+ checkNotClosed();
+
+ return new DOMBrokerReadWriteTransaction<>(newTransactionIdentifier(), storeTxFactories, this);
+ }
+
+ /**
+ * Convenience accessor of backing factories intended to be used only by
+ * finalization of this class.
+ *
+ * <b>Note:</b>
+ * Finalization of this class may want to access other functionality of
+ * supplied Transaction factories.
+ *
+ * @return Map of backing transaction factories.
+ */
+ protected final Map<LogicalDatastoreType, T> getTxFactories() {
+ return storeTxFactories;
+ }
+
+ /**
+ * Checks if instance is not closed.
+ *
+ * @throws IllegalStateException If instance of this class was closed.
+ *
+ */
+ protected final void checkNotClosed() {
+ Preconditions.checkState(closed == 0, "Transaction factory was closed. No further operations allowed.");
+ }
+
+ @Override
+ public void close() {
+ final boolean success = UPDATER.compareAndSet(this, 0, 1);
+ Preconditions.checkState(success, "Transaction factory was already closed");
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class DOMBrokerReadOnlyTransaction<T extends DOMStoreReadTransaction>
+ extends AbstractDOMBrokerTransaction<LogicalDatastoreType, T>
+ implements DOMDataReadOnlyTransaction {
+ /**
+ * Creates new composite Transactions.
+ *
+ * @param identifier Identifier of transaction.
+ */
+ protected DOMBrokerReadOnlyTransaction(Object identifier, Map storeTxFactories) {
+ super(identifier, storeTxFactories);
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ return getSubtransaction(store).read(path);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(
+ final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ return getSubtransaction(store).exists(path);
+ }
+
+ @Override
+ public void close() {
+ closeSubtransactions();
+ }
+
+ @Override
+ protected T createTransaction(LogicalDatastoreType key) {
+ return (T) getTxFactory(key).newReadOnlyTransaction();
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class DOMBrokerReadWriteTransaction<T extends DOMStoreReadWriteTransaction>
+ extends DOMBrokerWriteOnlyTransaction<DOMStoreReadWriteTransaction> implements DOMDataReadWriteTransaction {
+ /**
+ * Creates new composite Transactions.
+ *
+ * @param identifier Identifier of transaction.
+ * @param storeTxFactories
+ */
+ protected DOMBrokerReadWriteTransaction(Object identifier, Map<LogicalDatastoreType, ? extends DOMStoreTransactionFactory> storeTxFactories, final AbstractDOMTransactionFactory<?> commitImpl) {
+ super(identifier, storeTxFactories, commitImpl);
+ }
+
+ @Override
+ public CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> read(
+ final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ return getSubtransaction(store).read(path);
+ }
+
+ @Override
+ public CheckedFuture<Boolean, ReadFailedException> exists(
+ final LogicalDatastoreType store,
+ final YangInstanceIdentifier path) {
+ return getSubtransaction(store).exists(path);
+ }
+
+ @Override
+ protected DOMStoreReadWriteTransaction createTransaction(LogicalDatastoreType key) {
+ return getTxFactory(key).newReadWriteTransaction();
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DOMBrokerTransactionChain extends AbstractDOMTransactionFactory<DOMStoreTransactionChain>
+ implements DOMTransactionChain {
+ private static enum State {
+ RUNNING,
+ CLOSING,
+ CLOSED,
+ FAILED,
+ }
+
+ private static final AtomicIntegerFieldUpdater<DOMBrokerTransactionChain> COUNTER_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(DOMBrokerTransactionChain.class, "counter");
+ private static final AtomicReferenceFieldUpdater<DOMBrokerTransactionChain, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DOMBrokerTransactionChain.class, State.class, "state");
+ private static final Logger LOG = LoggerFactory.getLogger(DOMBrokerTransactionChain.class);
+ private final AtomicLong txNum = new AtomicLong();
+ private final AbstractDOMBroker broker;
+ private final TransactionChainListener listener;
+ private final long chainId;
+
+ private volatile State state = State.RUNNING;
+ private volatile int counter = 0;
+
+ /**
+ *
+ * @param chainId
+ * ID of transaction chain
+ * @param chains
+ * Backing {@link DOMStoreTransactionChain}s.
+ * @param listener
+ * Listener, which listens on transaction chain events.
+ * @throws NullPointerException
+ * If any of arguments is null.
+ */
+ public DOMBrokerTransactionChain(final long chainId,
+ final Map<LogicalDatastoreType, DOMStoreTransactionChain> chains,
+ AbstractDOMBroker broker, final TransactionChainListener listener) {
+ super(chains);
+ this.chainId = chainId;
+ this.broker = Preconditions.checkNotNull(broker);
+ this.listener = Preconditions.checkNotNull(listener);
+ }
+
+ private void checkNotFailed() {
+ Preconditions.checkState(state != State.FAILED, "Transaction chain has failed");
+ }
+
+ @Override
+ protected Object newTransactionIdentifier() {
+ return "DOM-CHAIN-" + chainId + "-" + txNum.getAndIncrement();
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit(
+ final DOMDataWriteTransaction transaction, final Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ checkNotFailed();
+ checkNotClosed();
+
+ final CheckedFuture<Void, TransactionCommitFailedException> ret = broker.submit(transaction, cohorts);
+
+ COUNTER_UPDATER.incrementAndGet(this);
+ Futures.addCallback(ret, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ transactionCompleted();
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ transactionFailed(transaction, t);
+ }
+ });
+
+ return ret;
+ }
+
+ @Override
+ public void close() {
+ final boolean success = STATE_UPDATER.compareAndSet(this, State.RUNNING, State.CLOSING);
+ if (!success) {
+ LOG.debug("Chain {} is no longer running", this);
+ return;
+ }
+
+ super.close();
+ for (DOMStoreTransactionChain subChain : getTxFactories().values()) {
+ subChain.close();
+ }
+
+ if (counter == 0) {
+ finishClose();
+ }
+ }
+
+ private void finishClose() {
+ state = State.CLOSED;
+ listener.onTransactionChainSuccessful(this);
+ }
+
+ private void transactionCompleted() {
+ if (COUNTER_UPDATER.decrementAndGet(this) == 0 && state == State.CLOSING) {
+ finishClose();
+ }
+ }
+
+ private void transactionFailed(final DOMDataWriteTransaction tx, final Throwable cause) {
+ state = State.FAILED;
+ LOG.debug("Transaction chain {}Â failed.", this, cause);
+ listener.onTransactionChainFailed(this, tx, cause);
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.common.impl.service.AbstractDataTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DOMBrokerWriteOnlyTransaction<T extends DOMStoreWriteTransaction>
+ extends AbstractDOMBrokerTransaction<LogicalDatastoreType, T> implements DOMDataWriteTransaction {
+
+ private static final AtomicReferenceFieldUpdater<DOMBrokerWriteOnlyTransaction, AbstractDOMTransactionFactory> IMPL_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DOMBrokerWriteOnlyTransaction.class, AbstractDOMTransactionFactory.class, "commitImpl");
+ @SuppressWarnings("rawtypes")
+ private static final AtomicReferenceFieldUpdater<DOMBrokerWriteOnlyTransaction, Future> FUTURE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(DOMBrokerWriteOnlyTransaction.class, Future.class, "commitFuture");
+ private static final Logger LOG = LoggerFactory.getLogger(DOMBrokerWriteOnlyTransaction.class);
+ private static final Future<?> CANCELLED_FUTURE = Futures.immediateCancelledFuture();
+
+ /**
+ * Implementation of real commit. It also acts as an indication that
+ * the transaction is running -- which we flip atomically using
+ * {@link #IMPL_UPDATER}.
+ */
+ private volatile AbstractDOMTransactionFactory<?> commitImpl;
+
+ /**
+ * Future task of transaction commit. It starts off as null, but is
+ * set appropriately on {@link #submit()} and {@link #cancel()} via
+ * {@link AtomicReferenceFieldUpdater#lazySet(Object, Object)}.
+ *
+ * Lazy set is safe for use because it is only referenced to in the
+ * {@link #cancel()} slow path, where we will busy-wait for it. The
+ * fast path gets the benefit of a store-store barrier instead of the
+ * usual store-load barrier.
+ */
+ private volatile Future<?> commitFuture;
+
+ protected DOMBrokerWriteOnlyTransaction(final Object identifier,
+ Map storeTxFactories, final AbstractDOMTransactionFactory<?> commitImpl) {
+ super(identifier, storeTxFactories);
+ this.commitImpl = Preconditions.checkNotNull(commitImpl, "commitImpl must not be null.");
+ }
+
+ @Override
+ protected T createTransaction(LogicalDatastoreType key) {
+ // FIXME : Casting shouldn't be necessary here
+ return (T) getTxFactory(key).newWriteOnlyTransaction();
+ }
+
+ @Override
+ public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkRunning(commitImpl);
+ getSubtransaction(store).write(path, data);
+ }
+
+ @Override
+ public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
+ checkRunning(commitImpl);
+ getSubtransaction(store).delete(path);
+ }
+
+ @Override
+ public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+ checkRunning(commitImpl);
+ getSubtransaction(store).merge(path, data);
+ }
+
+ @Override
+ public boolean cancel() {
+ final AbstractDOMTransactionFactory<?> impl = IMPL_UPDATER.getAndSet(this, null);
+ if (impl != null) {
+ LOG.trace("Transaction {} cancelled before submit", getIdentifier());
+ FUTURE_UPDATER.lazySet(this, CANCELLED_FUTURE);
+ closeSubtransactions();
+ return true;
+ }
+
+ // The transaction is in process of being submitted or cancelled. Busy-wait
+ // for the corresponding future.
+ Future<?> future;
+ do {
+ future = commitFuture;
+ } while (future == null);
+
+ return future.cancel(false);
+ }
+
+ @Deprecated
+ @Override
+ public ListenableFuture<RpcResult<TransactionStatus>> commit() {
+ return AbstractDataTransaction.convertToLegacyCommitFuture(submit());
+ }
+
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit() {
+ final AbstractDOMTransactionFactory<?> impl = IMPL_UPDATER.getAndSet(this, null);
+ checkRunning(impl);
+
+ final Collection<T> txns = getSubtransactions();
+ final Collection<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>(txns.size());
+
+ // FIXME: deal with errors thrown by backed (ready and submit can fail in theory)
+ for (DOMStoreWriteTransaction txn : txns) {
+ cohorts.add(txn.ready());
+ }
+
+ final CheckedFuture<Void, TransactionCommitFailedException> ret = impl.submit(this, cohorts);
+ FUTURE_UPDATER.lazySet(this, ret);
+ return ret;
+ }
+
+ private void checkRunning(final AbstractDOMTransactionFactory<?> impl) {
+ Preconditions.checkState(impl != null, "Transaction %s is no longer running", getIdentifier());
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+
+/**
+ * Abstract base class for our internal implementation of {@link DataTreeCandidateNode},
+ * which we instantiate from a serialized stream. We do not retain the before-image and
+ * do not implement {@link #getModifiedChild(PathArgument)}, as that method is only
+ * useful for end users. Instances based on this class should never be leaked outside of
+ * this component.
+ */
+abstract class AbstractDataTreeCandidateNode implements DataTreeCandidateNode {
+ private final ModificationType type;
+
+ protected AbstractDataTreeCandidateNode(final ModificationType type) {
+ this.type = Preconditions.checkNotNull(type);
+ }
+
+ @Override
+ public final DataTreeCandidateNode getModifiedChild(final PathArgument identifier) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public final ModificationType getModificationType() {
+ return type;
+ }
+
+ @Override
+ public final Optional<NormalizedNode<?, ?>> getDataBefore() {
+ throw new UnsupportedOperationException("Before-image not available after serialization");
+ }
+
+ static DataTreeCandidateNode createUnmodified() {
+ return new AbstractDataTreeCandidateNode(ModificationType.UNMODIFIED) {
+ @Override
+ public PathArgument getIdentifier() {
+ throw new UnsupportedOperationException("Root node does not have an identifier");
+ }
+
+ @Override
+ public Optional<NormalizedNode<?, ?>> getDataAfter() {
+ throw new UnsupportedOperationException("After-image not available after serialization");
+ }
+
+ @Override
+ public Collection<DataTreeCandidateNode> getChildNodes() {
+ throw new UnsupportedOperationException("Children not available after serialization");
+ }
+ };
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ChainedCommitCohort extends ShardDataTreeCohort {
+ private static final Logger LOG = LoggerFactory.getLogger(ChainedCommitCohort.class);
+ private final ReadWriteShardDataTreeTransaction transaction;
+ private final ShardDataTreeTransactionChain chain;
+ private final ShardDataTreeCohort delegate;
+
+ ChainedCommitCohort(final ShardDataTreeTransactionChain chain, final ReadWriteShardDataTreeTransaction transaction, final ShardDataTreeCohort delegate) {
+ this.transaction = Preconditions.checkNotNull(transaction);
+ this.delegate = Preconditions.checkNotNull(delegate);
+ this.chain = Preconditions.checkNotNull(chain);
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ final ListenableFuture<Void> ret = delegate.commit();
+
+ Futures.addCallback(ret, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(Void result) {
+ chain.clearTransaction(transaction);
+ LOG.debug("Committed transaction {}", transaction);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.error("Transaction {} commit failed, cannot recover", transaction, t);
+ }
+ });
+
+ return ret;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ return delegate.canCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ return delegate.preCommit();
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ return delegate.abort();
+ }
+
+ @Override
+ DataTreeCandidateTip getCandidate() {
+ return delegate.getCandidate();
+ }
+}
\ No newline at end of file
*/
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
import java.util.List;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* previous Tx's ready operations haven't completed yet.
*/
@Override
- protected Future<ActorSelection> sendFindPrimaryShardAsync(final String shardName) {
+ protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(final String shardName) {
// Check if there are any previous ready Futures, otherwise let the super class handle it.
if(previousReadyFutures.isEmpty()) {
return super.sendFindPrimaryShardAsync(shardName);
previousReadyFutures, getActorContext().getClientDispatcher());
// Add a callback for completion of the combined Futures.
- final Promise<ActorSelection> returnPromise = akka.dispatch.Futures.promise();
+ final Promise<PrimaryShardInfo> returnPromise = akka.dispatch.Futures.promise();
OnComplete<Iterable<Object>> onComplete = new OnComplete<Iterable<Object>>() {
@Override
public void onComplete(Throwable failure, Iterable<Object> notUsed) {
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.databroker.AbstractDOMBroker;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
-import org.opendaylight.controller.md.sal.dom.broker.impl.AbstractDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.broker.impl.TransactionCommitFailedExceptionMapper;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.LoggerFactory;
/**
- * Implementation of DOMDataCommitExecutor that coordinates transaction commits concurrently. The 3
+ * ConcurrentDOMDataBroker commits transactions concurrently. The 3
* commit phases (canCommit, preCommit, and commit) are performed serially and non-blocking
* (ie async) per transaction but multiple transaction commits can run concurrent.
*
* @author Thomas Pantelis
*/
-public class ConcurrentDOMDataBroker extends AbstractDOMDataBroker {
+public class ConcurrentDOMDataBroker extends AbstractDOMBroker {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentDOMDataBroker.class);
private static final String CAN_COMMIT = "CAN_COMMIT";
private static final String PRE_COMMIT = "PRE_COMMIT";
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.GeneratedMessage.GeneratedExtension;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader;
+import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages.AppendEntries;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DataTreeCandidatePayload extends Payload implements Externalizable {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeCandidatePayload.class);
+ private static final long serialVersionUID = 1L;
+ private static final byte DELETE = 0;
+ private static final byte SUBTREE_MODIFIED = 1;
+ private static final byte UNMODIFIED = 2;
+ private static final byte WRITE = 3;
+
+ private transient byte[] serialized;
+
+ public DataTreeCandidatePayload() {
+ // Required by Externalizable
+ }
+
+ private DataTreeCandidatePayload(final byte[] serialized) {
+ this.serialized = Preconditions.checkNotNull(serialized);
+ }
+
+ private static void writeChildren(final NormalizedNodeOutputStreamWriter writer, final DataOutput out,
+ final Collection<DataTreeCandidateNode> children) throws IOException {
+ out.writeInt(children.size());
+ for (DataTreeCandidateNode child : children) {
+ writeNode(writer, out, child);
+ }
+ }
+
+ private static void writeNode(final NormalizedNodeOutputStreamWriter writer, final DataOutput out,
+ final DataTreeCandidateNode node) throws IOException {
+ switch (node.getModificationType()) {
+ case DELETE:
+ out.writeByte(DELETE);
+ writer.writePathArgument(node.getIdentifier());
+ break;
+ case SUBTREE_MODIFIED:
+ out.writeByte(SUBTREE_MODIFIED);
+ writer.writePathArgument(node.getIdentifier());
+ writeChildren(writer, out, node.getChildNodes());
+ break;
+ case WRITE:
+ out.writeByte(WRITE);
+ writer.writeNormalizedNode(node.getDataAfter().get());
+ break;
+ case UNMODIFIED:
+ throw new IllegalArgumentException("Unmodified candidate should never be in the payload");
+ default:
+ throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
+ }
+ }
+
+ static DataTreeCandidatePayload create(DataTreeCandidate candidate) {
+ final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+ try (final NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(out)) {
+ writer.writeYangInstanceIdentifier(candidate.getRootPath());
+
+ final DataTreeCandidateNode node = candidate.getRootNode();
+ switch (node.getModificationType()) {
+ case DELETE:
+ out.writeByte(DELETE);
+ break;
+ case SUBTREE_MODIFIED:
+ out.writeByte(SUBTREE_MODIFIED);
+ writeChildren(writer, out, node.getChildNodes());
+ break;
+ case UNMODIFIED:
+ out.writeByte(UNMODIFIED);
+ break;
+ case WRITE:
+ out.writeByte(WRITE);
+ writer.writeNormalizedNode(node.getDataAfter().get());
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled node type " + node.getModificationType());
+ }
+
+ writer.close();
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Failed to serialize candidate %s", candidate), e);
+ }
+
+ return new DataTreeCandidatePayload(out.toByteArray());
+ }
+
+ private static Collection<DataTreeCandidateNode> readChildren(final NormalizedNodeInputStreamReader reader,
+ final DataInput in) throws IOException {
+ final int size = in.readInt();
+ if (size != 0) {
+ final Collection<DataTreeCandidateNode> ret = new ArrayList<>(size);
+ for (int i = 0; i < size; ++i) {
+ final DataTreeCandidateNode child = readNode(reader, in);
+ if (child != null) {
+ ret.add(child);
+ }
+ }
+ return ret;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ private static DataTreeCandidateNode readNode(final NormalizedNodeInputStreamReader reader,
+ final DataInput in) throws IOException {
+ final byte type = in.readByte();
+ switch (type) {
+ case DELETE:
+ return DeletedDataTreeCandidateNode.create(reader.readPathArgument());
+ case SUBTREE_MODIFIED:
+ final PathArgument identifier = reader.readPathArgument();
+ final Collection<DataTreeCandidateNode> children = readChildren(reader, in);
+ if (children.isEmpty()) {
+ LOG.debug("Modified node {} does not have any children, not instantiating it", identifier);
+ return null;
+ } else {
+ return ModifiedDataTreeCandidateNode.create(identifier, children);
+ }
+ case UNMODIFIED:
+ return null;
+ case WRITE:
+ return DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode());
+ default:
+ throw new IllegalArgumentException("Unhandled node type " + type);
+ }
+ }
+
+ private static DataTreeCandidate parseCandidate(final ByteArrayDataInput in) throws IOException {
+ final NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(in);
+ final YangInstanceIdentifier rootPath = reader.readYangInstanceIdentifier();
+ final byte type = in.readByte();
+
+ final DataTreeCandidateNode rootNode;
+ switch (type) {
+ case DELETE:
+ rootNode = DeletedDataTreeCandidateNode.create();
+ break;
+ case SUBTREE_MODIFIED:
+ rootNode = ModifiedDataTreeCandidateNode.create(readChildren(reader, in));
+ break;
+ case WRITE:
+ rootNode = DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode());
+ break;
+ default:
+ throw new IllegalArgumentException("Unhandled node type " + type);
+ }
+
+ return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode);
+ }
+
+ DataTreeCandidate getCandidate() throws IOException {
+ return parseCandidate(ByteStreams.newDataInput(serialized));
+ }
+
+ @Override
+ @Deprecated
+ @SuppressWarnings("rawtypes")
+ public <T> Map<GeneratedExtension, T> encode() {
+ return null;
+ }
+
+ @Override
+ @Deprecated
+ public Payload decode(final AppendEntries.ReplicatedLogEntry.Payload payload) {
+ return null;
+ }
+
+ @Override
+ public int size() {
+ return serialized.length;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeByte((byte)serialVersionUID);
+ out.writeInt(serialized.length);
+ out.write(serialized);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ final long version = in.readByte();
+ Preconditions.checkArgument(version == serialVersionUID, "Unsupported serialization version %s", version);
+
+ final int length = in.readInt();
+ serialized = new byte[length];
+ in.readFully(serialized);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+
+/**
+ * A deserialized {@link DataTreeCandidateNode} which represents a deletion.
+ */
+abstract class DeletedDataTreeCandidateNode extends AbstractDataTreeCandidateNode {
+ private DeletedDataTreeCandidateNode() {
+ super(ModificationType.DELETE);
+ }
+
+ static DataTreeCandidateNode create() {
+ return new DeletedDataTreeCandidateNode() {
+ @Override
+ public PathArgument getIdentifier() {
+ throw new UnsupportedOperationException("Root node does not have an identifier");
+ }
+ };
+ }
+
+ static DataTreeCandidateNode create(final PathArgument identifier) {
+ return new DeletedDataTreeCandidateNode() {
+ @Override
+ public final PathArgument getIdentifier() {
+ return identifier;
+ }
+ };
+ }
+
+ @Override
+ public final Optional<NormalizedNode<?, ?>> getDataAfter() {
+ return Optional.absent();
+ }
+
+ @Override
+ public final Collection<DataTreeCandidateNode> getChildNodes() {
+ // We would require the before-image to reconstruct the list of nodes which
+ // were deleted.
+ throw new UnsupportedOperationException("Children not available after serialization");
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+
+/**
+ * A deserialized {@link DataTreeCandidateNode} which represents a modification in
+ * one of its children.
+ */
+abstract class ModifiedDataTreeCandidateNode extends AbstractDataTreeCandidateNode {
+ private final Collection<DataTreeCandidateNode> children;
+
+ private ModifiedDataTreeCandidateNode(final Collection<DataTreeCandidateNode> children) {
+ super(ModificationType.SUBTREE_MODIFIED);
+ this.children = Preconditions.checkNotNull(children);
+ }
+
+ static DataTreeCandidateNode create(final Collection<DataTreeCandidateNode> children) {
+ return new ModifiedDataTreeCandidateNode(children) {
+ @Override
+ public PathArgument getIdentifier() {
+ throw new UnsupportedOperationException("Root node does not have an identifier");
+ }
+ };
+ }
+
+ static DataTreeCandidateNode create(final PathArgument identifier, final Collection<DataTreeCandidateNode> children) {
+ return new ModifiedDataTreeCandidateNode(children) {
+ @Override
+ public final PathArgument getIdentifier() {
+ return identifier;
+ }
+ };
+ }
+
+ @Override
+ public final Optional<NormalizedNode<?, ?>> getDataAfter() {
+ throw new UnsupportedOperationException("After-image not available after serialization");
+ }
+
+ @Override
+ public final Collection<DataTreeCandidateNode> getChildNodes() {
+ return children;
+ }
+}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction<DataTreeModification> {
parent.abortTransaction(this);
}
- DOMStoreThreePhaseCommitCohort ready() {
+ ShardDataTreeCohort ready() {
Preconditions.checkState(close(), "Transaction is already closed");
return parent.finishTransaction(this);
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
}
}
+ private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
+ return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
+ }
+
void continueCommit(final CohortEntry cohortEntry) throws Exception {
+ final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
+
// If we do not have any followers and we are not using persistence
// or if cohortEntry has no modifications
// we can apply modification to the state immediately
- if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
- applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification());
+ if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
+ applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
} else {
Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
- new ModificationPayload(cohortEntry.getModification()));
+ DataTreeCandidatePayload.create(candidate));
}
}
}
}
+ private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+ LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
+
+ try {
+ // We block on the future here so we don't have to worry about possibly accessing our
+ // state on a different thread outside of our dispatcher. Also, the data store
+ // currently uses a same thread executor anyway.
+ cohortEntry.getCohort().commit().get();
+
+ sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
+
+ } catch (Exception e) {
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+ LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+ transactionID, e);
+ shardMBean.incrementFailedTransactionsCount();
+ } finally {
+ commitCoordinator.currentTransactionComplete(transactionID, true);
+ }
+ }
+
private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
// With persistence enabled, this method is called via applyState by the leader strategy
// after the commit has been replicated to a majority of the followers.
CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if(cohortEntry == null) {
+ if (cohortEntry == null) {
// The transaction is no longer the current commit. This can happen if the transaction
// was aborted prior, most likely due to timeout in the front-end. We need to finish
// committing the transaction though since it was successfully persisted and replicated
// transaction.
cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
if(cohortEntry != null) {
- commitWithNewTransaction(cohortEntry.getModification());
+ try {
+ store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+ } catch (DataValidationFailedException e) {
+ shardMBean.incrementFailedTransactionsCount();
+ LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
+ }
+
sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
} else {
// This really shouldn't happen - it likely means that persistence or replication
LOG.error(ex.getMessage());
sender.tell(new akka.actor.Status.Failure(ex), getSelf());
}
-
- return;
- }
-
- LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
-
- try {
- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- cohortEntry.getCohort().commit().get();
-
- sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
-
- shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-
- } catch (Exception e) {
- sender.tell(new akka.actor.Status.Failure(e), getSelf());
-
- LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- transactionID, e);
- shardMBean.incrementFailedTransactionsCount();
- } finally {
- commitCoordinator.currentTransactionComplete(transactionID, true);
+ } else {
+ finishCommit(sender, transactionID, cohortEntry);
}
}
@Override
protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
-
- if(data instanceof ModificationPayload) {
+ if (data instanceof DataTreeCandidatePayload) {
+ if (clientActor == null) {
+ // No clientActor indicates a replica coming from the leader
+ try {
+ store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+ } catch (DataValidationFailedException | IOException e) {
+ LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
+ }
+ } else {
+ // Replication consensus reached, proceed to commit
+ finishCommit(clientActor, identifier);
+ }
+ } else if (data instanceof ModificationPayload) {
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
}
- }
- else if (data instanceof CompositeModificationPayload) {
+ } else if (data instanceof CompositeModificationPayload) {
Object modification = ((CompositeModificationPayload) data).getModification();
applyModificationToState(clientActor, identifier, modification);
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.Logger;
/**
// Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
public interface CohortDecorator {
- DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual);
+ ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
}
private final Cache<String, CohortEntry> cohortCache;
static class CohortEntry {
private final String transactionID;
- private DOMStoreThreePhaseCommitCohort cohort;
- private final MutableCompositeModification compositeModification;
+ private ShardDataTreeCohort cohort;
private final ReadWriteShardDataTreeTransaction transaction;
private ActorRef replySender;
private Shard shard;
private boolean doImmediateCommit;
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
- this.compositeModification = new MutableCompositeModification();
this.transaction = Preconditions.checkNotNull(transaction);
this.transactionID = transactionID;
}
- CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort,
+ CohortEntry(String transactionID, ShardDataTreeCohort cohort,
MutableCompositeModification compositeModification) {
this.transactionID = transactionID;
this.cohort = cohort;
- this.compositeModification = compositeModification;
this.transaction = null;
}
return transactionID;
}
- DOMStoreThreePhaseCommitCohort getCohort() {
+ ShardDataTreeCohort getCohort() {
return cohort;
}
- MutableCompositeModification getModification() {
- return compositeModification;
- }
-
void applyModifications(Iterable<Modification> modifications) {
- for(Modification modification: modifications) {
- compositeModification.addModification(modification);
+ for (Modification modification : modifications) {
modification.apply(transaction.getSnapshot());
}
}
void setShard(Shard shard) {
this.shard = shard;
}
-
- boolean hasModifications(){
- return compositeModification.getModifications().size() > 0;
- }
}
}
import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent;
import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
return ensureTransactionChain(chainId).newReadWriteTransaction(txId);
}
- void notifyListeners(final DataTreeCandidateTip candidate) {
+ void notifyListeners(final DataTreeCandidate candidate) {
LOG.debug("Notifying listeners on candidate {}", candidate);
// DataTreeChanges first, as they are more light-weight
if (chain != null) {
chain.close();
} else {
- LOG.warn("Closing non-existent transaction chain {}", transactionChainId);
+ LOG.debug("Closing non-existent transaction chain {}", transactionChainId);
}
}
return new SimpleEntry<>(reg, event);
}
+ void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException {
+ LOG.debug("Applying foreign transaction {}", identifier);
+
+ final DataTreeModification mod = dataTree.takeSnapshot().newModification();
+ DataTreeCandidates.applyToModification(mod, foreign);
+ mod.ready();
+
+ LOG.trace("Applying foreign modification {}", mod);
+ dataTree.validate(mod);
+ final DataTreeCandidate candidate = dataTree.prepare(mod);
+ dataTree.commit(candidate);
+ notifyListeners(candidate);
+ }
+
@Override
void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction) {
// Intentional no-op
}
@Override
- DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+ ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
final DataTreeModification snapshot = transaction.getSnapshot();
snapshot.ready();
- return new ShardDataTreeCohort(this, snapshot);
+ return new SimpleShardDataTreeCohort(this, snapshot);
}
+
}
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-final class ShardDataTreeCohort implements DOMStoreThreePhaseCommitCohort {
- private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeCohort.class);
- private static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
- private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
- private final DataTreeModification transaction;
- private final ShardDataTree dataTree;
- private DataTreeCandidateTip candidate;
-
- ShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) {
- this.dataTree = Preconditions.checkNotNull(dataTree);
- this.transaction = Preconditions.checkNotNull(transaction);
- }
-
- @Override
- public ListenableFuture<Boolean> canCommit() {
- try {
- dataTree.getDataTree().validate(transaction);
- LOG.debug("Transaction {} validated", transaction);
- return TRUE_FUTURE;
- } catch (Exception e) {
- return Futures.immediateFailedFuture(e);
- }
- }
-
- @Override
- public ListenableFuture<Void> preCommit() {
- try {
- candidate = dataTree.getDataTree().prepare(transaction);
- /*
- * FIXME: this is the place where we should be interacting with persistence, specifically by invoking
- * persist on the candidate (which gives us a Future).
- */
- LOG.debug("Transaction {} prepared candidate {}", transaction, candidate);
- return VOID_FUTURE;
- } catch (Exception e) {
- LOG.debug("Transaction {} failed to prepare", transaction, e);
- return Futures.immediateFailedFuture(e);
- }
- }
-
- @Override
- public ListenableFuture<Void> abort() {
- // No-op, really
- return VOID_FUTURE;
+public abstract class ShardDataTreeCohort {
+ ShardDataTreeCohort() {
+ // Prevent foreign instantiation
}
- @Override
- public ListenableFuture<Void> commit() {
- try {
- dataTree.getDataTree().commit(candidate);
- } catch (Exception e) {
- LOG.error("Transaction {} failed to commit", transaction, e);
- return Futures.immediateFailedFuture(e);
- }
+ abstract DataTreeCandidateTip getCandidate();
- LOG.debug("Transaction {} committed, proceeding to notify", transaction);
- dataTree.notifyListeners(candidate);
- return VOID_FUTURE;
- }
+ @VisibleForTesting
+ public abstract ListenableFuture<Boolean> canCommit();
+ @VisibleForTesting
+ public abstract ListenableFuture<Void> preCommit();
+ @VisibleForTesting
+ public abstract ListenableFuture<Void> abort();
+ @VisibleForTesting
+ public abstract ListenableFuture<Void> commit();
}
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.concurrent.NotThreadSafe;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
- protected DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+ protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction);
// dataTree is finalizing ready the transaction, we just record it for the next
// transaction in chain
- final DOMStoreThreePhaseCommitCohort delegate = dataTree.finishTransaction(transaction);
+ final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction);
openTransaction = null;
previousTx = transaction;
LOG.debug("Committing transaction {}", transaction);
- return new CommitCohort(transaction, delegate);
+ return new ChainedCommitCohort(this, transaction, delegate);
}
@Override
return MoreObjects.toStringHelper(this).add("id", chainId).toString();
}
- private final class CommitCohort extends ForwardingDOMStoreThreePhaseCommitCohort {
- private final ReadWriteShardDataTreeTransaction transaction;
- private final DOMStoreThreePhaseCommitCohort delegate;
-
- CommitCohort(final ReadWriteShardDataTreeTransaction transaction, final DOMStoreThreePhaseCommitCohort delegate) {
- this.transaction = Preconditions.checkNotNull(transaction);
- this.delegate = Preconditions.checkNotNull(delegate);
- }
-
- @Override
- protected DOMStoreThreePhaseCommitCohort delegate() {
- return delegate;
- }
-
- @Override
- public ListenableFuture<Void> commit() {
- final ListenableFuture<Void> ret = super.commit();
-
- Futures.addCallback(ret, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- if (transaction.equals(previousTx)) {
- previousTx = null;
- }
- LOG.debug("Committed transaction {}", transaction);
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.error("Transaction {} commit failed, cannot recover", transaction, t);
- }
- });
-
- return ret;
+ void clearTransaction(ReadWriteShardDataTreeTransaction transaction) {
+ if (transaction.equals(previousTx)) {
+ previousTx = null;
}
}
}
*/
package org.opendaylight.controller.cluster.datastore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-
abstract class ShardDataTreeTransactionParent {
abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction);
- abstract DOMStoreThreePhaseCommitCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
+ abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
}
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.collect.Lists;
import java.io.IOException;
-import java.util.List;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.slf4j.Logger;
*/
class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build();
- private final ShardDataTree store;
- private List<ModificationPayload> currentLogRecoveryBatch;
+ private final DataTree store;
private final String shardName;
private final Logger log;
+ private DataTreeModification transaction;
+ private int size;
ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) {
- this.store = store;
+ this.store = store.getDataTree();
this.shardName = shardName;
this.log = log;
}
@Override
public void startLogRecoveryBatch(int maxBatchSize) {
- currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize);
-
log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize);
+ transaction = store.takeSnapshot().newModification();
+ size = 0;
}
@Override
public void appendRecoveredLogEntry(Payload payload) {
try {
- if(payload instanceof ModificationPayload) {
- currentLogRecoveryBatch.add((ModificationPayload) payload);
+ if (payload instanceof DataTreeCandidatePayload) {
+ DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
+ size++;
+ } else if (payload instanceof ModificationPayload) {
+ MutableCompositeModification.fromSerializable(
+ ((ModificationPayload) payload).getModification()).apply(transaction);
+ size++;
} else if (payload instanceof CompositeModificationPayload) {
- currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
- ((CompositeModificationPayload) payload).getModification())));
+ MutableCompositeModification.fromSerializable(
+ ((CompositeModificationPayload) payload).getModification()).apply(transaction);
+ size++;
} else if (payload instanceof CompositeModificationByteStringPayload) {
- currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable(
- ((CompositeModificationByteStringPayload) payload).getModification())));
+ MutableCompositeModification.fromSerializable(
+ ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction);
+ size++;
} else {
log.error("{}: Unknown payload {} received during recovery", shardName, payload);
}
- } catch (IOException e) {
+ } catch (IOException | ClassNotFoundException e) {
log.error("{}: Error extracting ModificationPayload", shardName, e);
}
-
}
- private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) {
- DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
- try {
- commitCohort.preCommit().get();
- commitCohort.commit().get();
- } catch (Exception e) {
- log.error("{}: Failed to commit Tx on recovery", shardName, e);
- }
+ private void commitTransaction(DataTreeModification tx) throws DataValidationFailedException {
+ tx.ready();
+ store.validate(tx);
+ store.commit(store.prepare(tx));
}
/**
*/
@Override
public void applyCurrentLogRecoveryBatch() {
- log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size());
-
- ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null);
- DataTreeModification snapshot = writeTx.getSnapshot();
- for (ModificationPayload payload : currentLogRecoveryBatch) {
- try {
- MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot);
- } catch (Exception e) {
- log.error("{}: Error extracting ModificationPayload", shardName, e);
- }
+ log.debug("{}: Applying current log recovery batch with size {}", shardName, size);
+ try {
+ commitTransaction(transaction);
+ } catch (DataValidationFailedException e) {
+ log.error("{}: Failed to apply recovery batch", shardName, e);
}
-
- commitTransaction(writeTx);
-
- currentLogRecoveryBatch = null;
+ transaction = null;
}
/**
public void applyRecoverySnapshot(final byte[] snapshotBytes) {
log.debug("{}: Applying recovered snapshot", shardName);
- // Intentionally bypass normal transaction to side-step persistence/replication
- final DataTree tree = store.getDataTree();
- DataTreeModification writeTx = tree.takeSnapshot().newModification();
-
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
- writeTx.write(ROOT, node);
- writeTx.ready();
+ final NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
+ final DataTreeModification tx = store.takeSnapshot().newModification();
+ tx.write(ROOT, node);
try {
- tree.validate(writeTx);
- tree.commit(tree.prepare(writeTx));
+ commitTransaction(tx);
} catch (DataValidationFailedException e) {
- log.error("{}: Failed to validate recovery snapshot", shardName, e);
+ log.error("{}: Failed to apply recovery snapshot", shardName, e);
}
}
}
import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.slf4j.Logger;
void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction)
throws ExecutionException, InterruptedException {
- DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction);
+ ShardDataTreeCohort commitCohort = store.finishTransaction(transaction);
commitCohort.preCommit().get();
commitCohort.commit().get();
}
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
/**
* @author: syedbahm
LOG.debug("readyTransaction : {}", transactionID);
- DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
+ ShardDataTreeCohort cohort = transaction.ready();
getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(),
cohort, compositeModification, returnSerialized, doImmediateCommit), getContext());
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class SimpleShardDataTreeCohort extends ShardDataTreeCohort {
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class);
+ private static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+ private static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
+ private final DataTreeModification transaction;
+ private final ShardDataTree dataTree;
+ private DataTreeCandidateTip candidate;
+
+ SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) {
+ this.dataTree = Preconditions.checkNotNull(dataTree);
+ this.transaction = Preconditions.checkNotNull(transaction);
+ }
+
+ @Override
+ DataTreeCandidateTip getCandidate() {
+ return candidate;
+ }
+
+ @Override
+ public ListenableFuture<Boolean> canCommit() {
+ try {
+ dataTree.getDataTree().validate(transaction);
+ LOG.debug("Transaction {} validated", transaction);
+ return TRUE_FUTURE;
+ } catch (Exception e) {
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> preCommit() {
+ try {
+ candidate = dataTree.getDataTree().prepare(transaction);
+ /*
+ * FIXME: this is the place where we should be interacting with persistence, specifically by invoking
+ * persist on the candidate (which gives us a Future).
+ */
+ LOG.debug("Transaction {} prepared candidate {}", transaction, candidate);
+ return VOID_FUTURE;
+ } catch (Exception e) {
+ LOG.debug("Transaction {} failed to prepare", transaction, e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> abort() {
+ // No-op, really
+ return VOID_FUTURE;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ try {
+ dataTree.getDataTree().commit(candidate);
+ } catch (Exception e) {
+ LOG.error("Transaction {} failed to commit", transaction, e);
+ return Futures.immediateFailedFuture(e);
+ }
+
+ LOG.debug("Transaction {} committed, proceeding to notify", transaction);
+ dataTree.notifyListeners(candidate);
+ return VOID_FUTURE;
+ }
+}
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.cluster.datastore.compat.PreLithiumTransactionContextImpl;
import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeAggregator;
return false;
}
- private boolean isRootPath(YangInstanceIdentifier path){
+ private static boolean isRootPath(YangInstanceIdentifier path) {
return !path.getPathArguments().iterator().hasNext();
}
return ShardStrategyFactory.getStrategy(path).findShard(path);
}
- protected Future<ActorSelection> sendFindPrimaryShardAsync(String shardName) {
+ protected Future<PrimaryShardInfo> sendFindPrimaryShardAsync(String shardName) {
return actorContext.findPrimaryShardAsync(shardName);
}
private TransactionFutureCallback getOrCreateTxFutureCallback(String shardName) {
TransactionFutureCallback txFutureCallback = txFutureCallbackMap.get(shardName);
if(txFutureCallback == null) {
- Future<ActorSelection> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
+ Future<PrimaryShardInfo> findPrimaryFuture = sendFindPrimaryShardAsync(shardName);
final TransactionFutureCallback newTxFutureCallback = new TransactionFutureCallback(this, shardName);
txFutureCallback = newTxFutureCallback;
txFutureCallbackMap.put(shardName, txFutureCallback);
- findPrimaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ findPrimaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
- public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
if(failure != null) {
newTxFutureCallback.createTransactionContext(failure, null);
} else {
- newTxFutureCallback.setPrimaryShard(primaryShard);
+ newTxFutureCallback.setPrimaryShard(primaryShardInfo.getPrimaryShardActor());
}
}
}, actorContext.getClientDispatcher());
*/
package org.opendaylight.controller.cluster.datastore.messages;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
/**
* Transaction ReadyTransaction message that is forwarded to the local Shard from the ShardTransaction.
*/
public class ForwardedReadyTransaction {
private final String transactionID;
- private final DOMStoreThreePhaseCommitCohort cohort;
+ private final ShardDataTreeCohort cohort;
private final Modification modification;
private final boolean returnSerialized;
private final boolean doImmediateCommit;
private final short txnClientVersion;
public ForwardedReadyTransaction(String transactionID, short txnClientVersion,
- DOMStoreThreePhaseCommitCohort cohort, Modification modification,
+ ShardDataTreeCohort cohort, Modification modification,
boolean returnSerialized, boolean doImmediateCommit) {
this.transactionID = transactionID;
this.cohort = cohort;
return transactionID;
}
- public DOMStoreThreePhaseCommitCohort getCohort() {
+ public ShardDataTreeCohort getCohort() {
return cohort;
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Local message DTO that contains information about the primary shard.
+ *
+ * @author Thomas Pantelis
+ */
+public class PrimaryShardInfo {
+ private final ActorSelection primaryShardActor;
+ private final Optional<DataTree> localShardDataTree;
+
+ public PrimaryShardInfo(@Nonnull ActorSelection primaryShardActor, @Nonnull Optional<DataTree> localShardDataTree) {
+ this.primaryShardActor = Preconditions.checkNotNull(primaryShardActor);
+ this.localShardDataTree = Preconditions.checkNotNull(localShardDataTree);
+ }
+
+ /**
+ * Returns an ActorSelection representing the primary shard actor.
+ */
+ public @Nonnull ActorSelection getPrimaryShardActor() {
+ return primaryShardActor;
+ }
+
+ /**
+ * Returns an Optional whose value contains the primary shard's DataTree if the primary shard is local
+ * to the caller. Otherwise the Optional value is absent.
+ */
+ public @Nonnull Optional<DataTree> getLocalShardDataTree() {
+ return localShardDataTree;
+ }
+}
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.reporting.MetricsReporter;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private Timeout transactionCommitOperationTimeout;
private Timeout shardInitializationTimeout;
private final Dispatchers dispatchers;
- private Cache<String, Future<ActorSelection>> primaryShardActorSelectionCache;
+ private Cache<String, Future<PrimaryShardInfo>> primaryShardInfoCache;
private volatile SchemaContext schemaContext;
private volatile boolean updated;
shardInitializationTimeout = new Timeout(datastoreContext.getShardInitializationTimeout().duration().$times(2));
- primaryShardActorSelectionCache = CacheBuilder.newBuilder()
+ primaryShardInfoCache = CacheBuilder.newBuilder()
.expireAfterWrite(datastoreContext.getShardLeaderElectionTimeout().duration().toMillis(), TimeUnit.MILLISECONDS)
.build();
}
return schemaContext;
}
- public Future<ActorSelection> findPrimaryShardAsync(final String shardName) {
- Future<ActorSelection> ret = primaryShardActorSelectionCache.getIfPresent(shardName);
+ public Future<PrimaryShardInfo> findPrimaryShardAsync(final String shardName) {
+ Future<PrimaryShardInfo> ret = primaryShardInfoCache.getIfPresent(shardName);
if(ret != null){
return ret;
}
Future<Object> future = executeOperationAsync(shardManager,
new FindPrimary(shardName, true), shardInitializationTimeout);
- return future.transform(new Mapper<Object, ActorSelection>() {
+ return future.transform(new Mapper<Object, PrimaryShardInfo>() {
@Override
- public ActorSelection checkedApply(Object response) throws Exception {
+ public PrimaryShardInfo checkedApply(Object response) throws Exception {
if(response instanceof PrimaryFound) {
PrimaryFound found = (PrimaryFound)response;
LOG.debug("Primary found {}", found.getPrimaryPath());
ActorSelection actorSelection = actorSystem.actorSelection(found.getPrimaryPath());
- primaryShardActorSelectionCache.put(shardName, Futures.successful(actorSelection));
- return actorSelection;
+ PrimaryShardInfo info = new PrimaryShardInfo(actorSelection, Optional.<DataTree>absent());
+ primaryShardInfoCache.put(shardName, Futures.successful(info));
+ return info;
} else if(response instanceof NotInitializedException) {
throw (NotInitializedException)response;
} else if(response instanceof PrimaryNotFoundException) {
public void broadcast(final Object message){
for(final String shardName : configuration.getAllShardNames()){
- Future<ActorSelection> primaryFuture = findPrimaryShardAsync(shardName);
- primaryFuture.onComplete(new OnComplete<ActorSelection>() {
+ Future<PrimaryShardInfo> primaryFuture = findPrimaryShardAsync(shardName);
+ primaryFuture.onComplete(new OnComplete<PrimaryShardInfo>() {
@Override
- public void onComplete(Throwable failure, ActorSelection primaryShard) {
+ public void onComplete(Throwable failure, PrimaryShardInfo primaryShardInfo) {
if(failure != null) {
LOG.warn("broadcast failed to send message {} to shard {}: {}",
message.getClass().getSimpleName(), shardName, failure);
} else {
- primaryShard.tell(message, ActorRef.noSender());
+ primaryShardInfo.getPrimaryShardActor().tell(message, ActorRef.noSender());
}
}
}, getClientDispatcher());
}
@VisibleForTesting
- Cache<String, Future<ActorSelection>> getPrimaryShardActorSelectionCache() {
- return primaryShardActorSelectionCache;
+ Cache<String, Future<PrimaryShardInfo>> getPrimaryShardInfoCache() {
+ return primaryShardInfoCache;
}
}
import akka.testkit.TestActorRef;
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied));
}
- protected NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
- DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
- CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =
- transaction.read(YangInstanceIdentifier.builder().build());
-
- Optional<NormalizedNode<?, ?>> optional = read.checkedGet();
-
- NormalizedNode<?, ?> normalizedNode = optional.get();
-
- transaction.close();
-
- return normalizedNode;
- }
-
- protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
+ protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
final MutableCompositeModification modification) {
return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null);
}
- protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName,
+ protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName,
final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode<?, ?> data,
final MutableCompositeModification modification,
- final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
+ final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null);
tx.getSnapshot().write(path, data);
- DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
+ ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit);
modification.addModification(new WriteModification(path, data));
return cohort;
}
- protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName,
- final DOMStoreThreePhaseCommitCohort actual) {
+ protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
+ final ShardDataTreeCohort actual) {
return createDelegatingMockCohort(cohortName, actual, null);
}
- protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName,
- final DOMStoreThreePhaseCommitCohort actual,
- final Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit) {
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName);
+ protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName,
+ final ShardDataTreeCohort actual,
+ final Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit) {
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName);
doAnswer(new Answer<ListenableFuture<Boolean>>() {
@Override
}
}).when(cohort).abort();
+ doAnswer(new Answer<DataTreeCandidateTip>() {
+ @Override
+ public DataTreeCandidateTip answer(final InvocationOnMock invocation) {
+ return actual.getCandidate();
+ }
+ }).when(cohort).getCandidate();
+
return cohort;
}
ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null);
transaction.getSnapshot().write(id, node);
- DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
+ ShardDataTreeCohort cohort = transaction.ready();
cohort.canCommit().get();
cohort.preCommit().get();
cohort.commit();
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
}
+ protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) {
+ return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
+ Optional.<DataTree>absent()));
+ }
+
protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
log.info("Created mock shard actor {}", actorRef);
doReturn(actorSystem.actorSelection(actorRef.path())).
when(mockActorContext).actorSelection(actorRef.path().toString());
- doReturn(Futures.successful(actorSystem.actorSelection(actorRef.path()))).
+ doReturn(primaryShardInfoReply(actorSystem, actorRef)).
when(mockActorContext).findPrimaryShardAsync(eq(shardName));
doReturn(false).when(mockActorContext).isPathLocal(actorRef.path().toString());
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
/**
* Unit tests for DOMConcurrentDataCommitCoordinator.
assertFailure(future, cause, mockCohort1, mockCohort2);
}
+
+ @Test
+ public void testCreateReadWriteTransaction(){
+ DOMStore domStore = mock(DOMStore.class);
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+ dataBroker.newReadWriteTransaction();
+
+ verify(domStore, never()).newReadWriteTransaction();
+ }
+
+
+ @Test
+ public void testCreateWriteOnlyTransaction(){
+ DOMStore domStore = mock(DOMStore.class);
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+ dataBroker.newWriteOnlyTransaction();
+
+ verify(domStore, never()).newWriteOnlyTransaction();
+ }
+
+ @Test
+ public void testCreateReadOnlyTransaction(){
+ DOMStore domStore = mock(DOMStore.class);
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+ dataBroker.newReadOnlyTransaction();
+
+ verify(domStore, never()).newReadOnlyTransaction();
+ }
+
+ @Test
+ public void testLazySubTransactionCreationForReadWriteTransactions(){
+ DOMStore configDomStore = mock(DOMStore.class);
+ DOMStore operationalDomStore = mock(DOMStore.class);
+ DOMStoreReadWriteTransaction storeTxn = mock(DOMStoreReadWriteTransaction.class);
+
+ doReturn(storeTxn).when(operationalDomStore).newReadWriteTransaction();
+ doReturn(storeTxn).when(configDomStore).newReadWriteTransaction();
+
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor);
+ DOMDataReadWriteTransaction dataTxn = dataBroker.newReadWriteTransaction();
+
+ dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+ dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+ dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+
+ verify(configDomStore, never()).newReadWriteTransaction();
+ verify(operationalDomStore, times(1)).newReadWriteTransaction();
+
+ dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+ verify(configDomStore, times(1)).newReadWriteTransaction();
+ verify(operationalDomStore, times(1)).newReadWriteTransaction();
+
+ }
+
+ @Test
+ public void testLazySubTransactionCreationForWriteOnlyTransactions(){
+ DOMStore configDomStore = mock(DOMStore.class);
+ DOMStore operationalDomStore = mock(DOMStore.class);
+ DOMStoreWriteTransaction storeTxn = mock(DOMStoreWriteTransaction.class);
+
+ doReturn(storeTxn).when(operationalDomStore).newWriteOnlyTransaction();
+ doReturn(storeTxn).when(configDomStore).newWriteOnlyTransaction();
+
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor);
+ DOMDataWriteTransaction dataTxn = dataBroker.newWriteOnlyTransaction();
+
+ dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+ dataTxn.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+ verify(configDomStore, never()).newWriteOnlyTransaction();
+ verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
+
+ dataTxn.put(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+ verify(configDomStore, times(1)).newWriteOnlyTransaction();
+ verify(operationalDomStore, times(1)).newWriteOnlyTransaction();
+
+ }
+
+
+ @Test
+ public void testLazySubTransactionCreationForReadOnlyTransactions(){
+ DOMStore configDomStore = mock(DOMStore.class);
+ DOMStore operationalDomStore = mock(DOMStore.class);
+ DOMStoreReadTransaction storeTxn = mock(DOMStoreReadTransaction.class);
+
+ doReturn(storeTxn).when(operationalDomStore).newReadOnlyTransaction();
+ doReturn(storeTxn).when(configDomStore).newReadOnlyTransaction();
+
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor);
+ DOMDataReadOnlyTransaction dataTxn = dataBroker.newReadOnlyTransaction();
+
+ dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+ dataTxn.read(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+
+ verify(configDomStore, never()).newReadOnlyTransaction();
+ verify(operationalDomStore, times(1)).newReadOnlyTransaction();
+
+ dataTxn.read(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build());
+
+ verify(configDomStore, times(1)).newReadOnlyTransaction();
+ verify(operationalDomStore, times(1)).newReadOnlyTransaction();
+
+ }
+
+ @Test
+ public void testSubmitWithOnlyOneSubTransaction() throws InterruptedException {
+ DOMStore configDomStore = mock(DOMStore.class);
+ DOMStore operationalDomStore = mock(DOMStore.class);
+ DOMStoreReadWriteTransaction mockStoreReadWriteTransaction = mock(DOMStoreReadWriteTransaction.class);
+ DOMStoreThreePhaseCommitCohort mockCohort = mock(DOMStoreThreePhaseCommitCohort.class);
+
+ doReturn(mockStoreReadWriteTransaction).when(operationalDomStore).newReadWriteTransaction();
+ doReturn(mockCohort).when(mockStoreReadWriteTransaction).ready();
+ doReturn(Futures.immediateFuture(false)).when(mockCohort).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(mockCohort).abort();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList();
+
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor) {
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction, Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ commitCohorts.addAll(cohorts);
+ latch.countDown();
+ return super.submit(transaction, cohorts);
+ }
+ };
+ DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
+
+ domDataReadWriteTransaction.delete(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build());
+
+ domDataReadWriteTransaction.submit();
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ assertTrue(commitCohorts.size() == 1);
+ }
+
+ @Test
+ public void testSubmitWithOnlyTwoSubTransactions() throws InterruptedException {
+ DOMStore configDomStore = mock(DOMStore.class);
+ DOMStore operationalDomStore = mock(DOMStore.class);
+ DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
+ DOMStoreReadWriteTransaction configTransaction = mock(DOMStoreReadWriteTransaction.class);
+ DOMStoreThreePhaseCommitCohort mockCohortOperational = mock(DOMStoreThreePhaseCommitCohort.class);
+ DOMStoreThreePhaseCommitCohort mockCohortConfig = mock(DOMStoreThreePhaseCommitCohort.class);
+
+ doReturn(operationalTransaction).when(operationalDomStore).newReadWriteTransaction();
+ doReturn(configTransaction).when(configDomStore).newReadWriteTransaction();
+
+ doReturn(mockCohortOperational).when(operationalTransaction).ready();
+ doReturn(Futures.immediateFuture(false)).when(mockCohortOperational).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(mockCohortOperational).abort();
+
+ doReturn(mockCohortConfig).when(configTransaction).ready();
+ doReturn(Futures.immediateFuture(false)).when(mockCohortConfig).canCommit();
+ doReturn(Futures.immediateFuture(null)).when(mockCohortConfig).abort();
+
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final List<DOMStoreThreePhaseCommitCohort> commitCohorts = new ArrayList();
+
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ operationalDomStore, LogicalDatastoreType.CONFIGURATION, configDomStore), futureExecutor) {
+ @Override
+ public CheckedFuture<Void, TransactionCommitFailedException> submit(DOMDataWriteTransaction transaction, Collection<DOMStoreThreePhaseCommitCohort> cohorts) {
+ commitCohorts.addAll(cohorts);
+ latch.countDown();
+ return super.submit(transaction, cohorts);
+ }
+ };
+ DOMDataReadWriteTransaction domDataReadWriteTransaction = dataBroker.newReadWriteTransaction();
+
+ domDataReadWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+ domDataReadWriteTransaction.merge(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+
+ domDataReadWriteTransaction.submit();
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ assertTrue(commitCohorts.size() == 2);
+ }
+
+ @Test
+ public void testCreateTransactionChain(){
+ DOMStore domStore = mock(DOMStore.class);
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+
+ dataBroker.createTransactionChain(mock(TransactionChainListener.class));
+
+ verify(domStore, times(2)).createTransactionChain();
+
+ }
+
+ @Test
+ public void testCreateTransactionOnChain(){
+ DOMStore domStore = mock(DOMStore.class);
+ ConcurrentDOMDataBroker dataBroker = new ConcurrentDOMDataBroker(ImmutableMap.of(LogicalDatastoreType.OPERATIONAL,
+ domStore, LogicalDatastoreType.CONFIGURATION, domStore), futureExecutor);
+
+ DOMStoreReadWriteTransaction operationalTransaction = mock(DOMStoreReadWriteTransaction.class);
+ DOMStoreTransactionChain mockChain = mock(DOMStoreTransactionChain.class);
+
+ doReturn(mockChain).when(domStore).createTransactionChain();
+ doReturn(operationalTransaction).when(mockChain).newWriteOnlyTransaction();
+
+ DOMTransactionChain transactionChain = dataBroker.createTransactionChain(mock(TransactionChainListener.class));
+
+ DOMDataWriteTransaction domDataWriteTransaction = transactionChain.newWriteOnlyTransaction();
+
+ verify(mockChain, never()).newWriteOnlyTransaction();
+
+ domDataWriteTransaction.put(LogicalDatastoreType.OPERATIONAL, YangInstanceIdentifier.builder().build(), mock(NormalizedNode.class));
+ }
+
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+
+public class DataTreeCandidatePayloadTest {
+ private DataTreeCandidate candidate;
+
+ private static DataTreeCandidateNode findNode(final Collection<DataTreeCandidateNode> nodes, final PathArgument arg) {
+ for (DataTreeCandidateNode node : nodes) {
+ if (arg.equals(node.getIdentifier())) {
+ return node;
+ }
+ }
+ return null;
+ }
+
+ private static void assertChildrenEquals(final Collection<DataTreeCandidateNode> expected,
+ final Collection<DataTreeCandidateNode> actual) {
+ // Make sure all expected nodes are there
+ for (DataTreeCandidateNode exp : expected) {
+ final DataTreeCandidateNode act = findNode(actual, exp.getIdentifier());
+ assertNotNull("missing expected child", act);
+ assertCandidateNodeEquals(exp, act);
+ }
+ // Make sure no nodes are present which are not in the expected set
+ for (DataTreeCandidateNode act : actual) {
+ final DataTreeCandidateNode exp = findNode(expected, act.getIdentifier());
+ assertNull("unexpected child", exp);
+ }
+ }
+
+ private static void assertCandidateEquals(final DataTreeCandidate expected, final DataTreeCandidate actual) {
+ assertEquals("root path", expected.getRootPath(), actual.getRootPath());
+
+ final DataTreeCandidateNode expRoot = expected.getRootNode();
+ final DataTreeCandidateNode actRoot = expected.getRootNode();
+ assertEquals("root type", expRoot.getModificationType(), actRoot.getModificationType());
+
+ switch (actRoot.getModificationType()) {
+ case DELETE:
+ case WRITE:
+ assertEquals("root data", expRoot.getDataAfter(), actRoot.getDataAfter());
+ break;
+ case SUBTREE_MODIFIED:
+ assertChildrenEquals(expRoot.getChildNodes(), actRoot.getChildNodes());
+ break;
+ default:
+ fail("Unexpect root type " + actRoot.getModificationType());
+ break;
+ }
+
+ assertCandidateNodeEquals(expected.getRootNode(), actual.getRootNode());
+ }
+
+ private static void assertCandidateNodeEquals(final DataTreeCandidateNode expected, final DataTreeCandidateNode actual) {
+ assertEquals("child type", expected.getModificationType(), actual.getModificationType());
+ assertEquals("child identifier", expected.getIdentifier(), actual.getIdentifier());
+
+ switch (actual.getModificationType()) {
+ case DELETE:
+ case WRITE:
+ assertEquals("child data", expected.getDataAfter(), actual.getDataAfter());
+ break;
+ case SUBTREE_MODIFIED:
+ assertChildrenEquals(expected.getChildNodes(), actual.getChildNodes());
+ break;
+ default:
+ fail("Unexpect root type " + actual.getModificationType());
+ break;
+ }
+ }
+
+ @Before
+ public void setUp() {
+ final YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+ final NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+ withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+ candidate = DataTreeCandidates.fromNormalizedNode(writePath, writeData);
+ }
+
+ @Test
+ public void testCandidateSerialization() throws IOException {
+ final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
+ assertEquals("payload size", 141, payload.size());
+ }
+
+ @Test
+ public void testCandidateSerDes() throws IOException {
+ final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
+ assertCandidateEquals(candidate, payload.getCandidate());
+ }
+
+ @Test
+ public void testPayloadSerDes() throws IOException {
+ final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
+ assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate());
+ }
+}
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.FiniteDuration;
public class ShardTest extends AbstractShardTest {
+ private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars");
@Test
public void testRegisterChangeListener() throws Exception {
}
@Test
- public void testRecovery() throws Exception {
+ public void testApplyStateWithCandidatePayload() throws Exception {
- // Set up the InMemorySnapshotStore.
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState");
+
+ NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node);
+
+ ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+ DataTreeCandidatePayload.create(candidate)));
+
+ shard.underlyingActor().onReceiveCommand(applyState);
+
+ NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+ assertEquals("Applied state", node, actual);
+
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ DataTree setupInMemorySnapshotStore() throws DataValidationFailedException {
DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
testStore.setSchemaContext(SCHEMA_CONTEXT);
InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
SerializationUtils.serializeNormalizedNode(root),
Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+ return testStore;
+ }
+
+ private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException {
+ source.validate(mod);
+ final DataTreeCandidate candidate = source.prepare(mod);
+ source.commit(candidate);
+ return DataTreeCandidatePayload.create(candidate);
+ }
+
+ @Test
+ public void testDataTreeCandidateRecovery() throws Exception {
+ // Set up the InMemorySnapshotStore.
+ final DataTree source = setupInMemorySnapshotStore();
+
+ final DataTreeModification writeMod = source.takeSnapshot().newModification();
+ writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build());
+
+ // Set up the InMemoryJournal.
+ InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
+
+ int nListEntries = 16;
+ Set<Integer> listEntryKeys = new HashSet<>();
+
+ // Add some ModificationPayload entries
+ for (int i = 1; i <= nListEntries; i++) {
+ listEntryKeys.add(Integer.valueOf(i));
+
+ YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
+ .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
+
+ final DataTreeModification mod = source.takeSnapshot().newModification();
+ mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
+
+ InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ payloadForModification(source, mod)));
+ }
+
+ InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1,
+ new ApplyJournalEntries(nListEntries));
+
+ testRecovery(listEntryKeys);
+ }
+
+ @Test
+ public void testModicationRecovery() throws Exception {
+
+ // Set up the InMemorySnapshotStore.
+ setupInMemorySnapshotStore();
// Set up the InMemoryJournal.
testRecovery(listEntryKeys);
}
- private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
+ private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
MutableCompositeModification compMod = new MutableCompositeModification();
for(Modification mod: mods) {
compMod.addModification(mod);
return new ModificationPayload(compMod);
}
- @SuppressWarnings({ "unchecked" })
@Test
public void testConcurrentThreePhaseCommits() throws Throwable {
new ShardTestKit(getSystem()) {{
String transactionID1 = "tx1";
MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
String transactionID2 = "tx2";
MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+ ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
modification2);
String transactionID3 = "tx3";
MutableCompositeModification modification3 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+ ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
}};
}
- private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
+ private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path,
NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady);
}
- private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
+ private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID,
YangInstanceIdentifier path, NormalizedNode<?, ?> data, boolean ready, boolean doCommitOnReady) {
BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID);
batched.addModification(new WriteModification(path, data));
final String transactionID = "tx";
FiniteDuration duration = duration("5 seconds");
- final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+ final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
@Override
- public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+ public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
if(mockCohort.get() == null) {
mockCohort.set(createDelegatingMockCohort("cohort", actual));
}
final String transactionID = "tx";
FiniteDuration duration = duration("5 seconds");
- final AtomicReference<DOMStoreThreePhaseCommitCohort> mockCohort = new AtomicReference<>();
+ final AtomicReference<ShardDataTreeCohort> mockCohort = new AtomicReference<>();
ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() {
@Override
- public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) {
+ public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) {
if(mockCohort.get() == null) {
mockCohort.set(createDelegatingMockCohort("cohort", actual));
}
}
@SuppressWarnings("unchecked")
- private void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
+ private static void verifyOuterListEntry(final TestActorRef<Shard> shard, Object expIDValue) throws Exception {
NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
final AtomicBoolean overrideLeaderCalls = new AtomicBoolean();
new ShardTestKit(getSystem()) {{
Creator<Shard> creator = new Creator<Shard>() {
+ private static final long serialVersionUID = 1L;
+
@Override
public Shard create() throws Exception {
return new Shard(shardID, Collections.<String,String>emptyMap(),
String transactionID = "tx1";
MutableCompositeModification modification = new MutableCompositeModification();
NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+ ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
TestModel.TEST_PATH, containerNode, modification);
FiniteDuration duration = duration("5 seconds");
String transactionID = "tx";
MutableCompositeModification modification = new MutableCompositeModification();
NormalizedNode<?, ?> containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore,
+ ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore,
TestModel.TEST_PATH, containerNode, modification);
FiniteDuration duration = duration("5 seconds");
}};
}
+ private static DataTreeCandidateTip mockCandidate(final String name) {
+ DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
+ DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
+ doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType();
+ doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter();
+ doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
+ doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
+ return mockCandidate;
+ }
+
+ private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) {
+ DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name);
+ DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node");
+ doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType();
+ doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath();
+ doReturn(mockCandidateNode).when(mockCandidate).getRootNode();
+ return mockCandidate;
+ }
+
@Test
public void testCommitWhenTransactionHasNoModifications(){
// Note that persistence is enabled which would normally result in the entry getting written to the journal
String transactionID = "tx1";
MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+ doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate();
FiniteDuration duration = duration("5 seconds");
String transactionID = "tx1";
MutableCompositeModification modification = new MutableCompositeModification();
modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build()));
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit();
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit();
+ doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate();
FiniteDuration duration = duration("5 seconds");
String transactionID1 = "tx1";
MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit();
+ doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate();
String transactionID2 = "tx2";
MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
FiniteDuration duration = duration("5 seconds");
String transactionID1 = "tx1";
MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit();
String transactionID2 = "tx2";
MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
FiniteDuration duration = duration("5 seconds");
String transactionID1 = "tx1";
MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
// Simulate the ForwardedReadyTransaction messages that would be sent
String transactionID1 = "tx1";
MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
// Simulate the ForwardedReadyTransaction messages that would be sent
String transactionID1 = "tx1";
MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit();
// Simulate the ForwardedReadyTransaction messages that would be sent
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+ DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+ DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+ doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+ doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(candidate).when(cohort).getCandidate();
shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
cohort, modification, true, true), getRef());
String transactionID = "tx1";
MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit();
// Simulate the ForwardedReadyTransaction messages that would be sent
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort).preCommit();
doReturn(Futures.immediateFuture(null)).when(cohort).commit();
+ DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class);
+ DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class);
+ doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType();
+ doReturn(candidateRoot).when(candidate).getRootNode();
+ doReturn(candidate).when(cohort).getCandidate();
shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
cohort, modification, true, true), getRef());
ShardDataTree dataStore = shard.underlyingActor().getDataStore();
final String transactionID = "tx1";
- Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>> preCommit =
- new Function<DOMStoreThreePhaseCommitCohort,ListenableFuture<Void>>() {
+ Function<ShardDataTreeCohort, ListenableFuture<Void>> preCommit =
+ new Function<ShardDataTreeCohort, ListenableFuture<Void>>() {
@Override
- public ListenableFuture<Void> apply(final DOMStoreThreePhaseCommitCohort cohort) {
+ public ListenableFuture<Void> apply(final ShardDataTreeCohort cohort) {
ListenableFuture<Void> preCommitFuture = cohort.preCommit();
// Simulate an AbortTransaction message occurring during replication, after
};
MutableCompositeModification modification = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
+ ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME),
modification, preCommit);
String transactionID1 = "tx1";
MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
MutableCompositeModification modification2 = new MutableCompositeModification();
YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
+ ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore,
listNodePath,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2),
modification2);
String transactionID1 = "tx1";
MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
String transactionID2 = "tx2";
MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+ ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
modification2);
String transactionID3 = "tx3";
MutableCompositeModification modification3 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+ ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3);
// Ready the Tx's
String transactionID1 = "tx1";
MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1");
+ ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit();
doReturn(Futures.immediateFuture(null)).when(cohort1).abort();
String transactionID2 = "tx2";
MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2");
+ ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2");
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit();
FiniteDuration duration = duration("5 seconds");
/**
* This test simply verifies that the applySnapShot logic will work
* @throws ReadFailedException
+ * @throws DataValidationFailedException
*/
@Test
- public void testInMemoryDataStoreRestore() throws ReadFailedException {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor());
-
- store.onGlobalContextUpdated(SCHEMA_CONTEXT);
+ public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException {
+ DataTree store = InMemoryDataTreeFactory.getInstance().create();
+ store.setSchemaContext(SCHEMA_CONTEXT);
- DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction();
+ DataTreeModification putTransaction = store.takeSnapshot().newModification();
putTransaction.write(TestModel.TEST_PATH,
ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- commitTransaction(putTransaction);
+ commitTransaction(store, putTransaction);
- NormalizedNode<?, ?> expected = readStore(store);
+ NormalizedNode<?, ?> expected = readStore(store, YangInstanceIdentifier.builder().build());
- DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction();
+ DataTreeModification writeTransaction = store.takeSnapshot().newModification();
writeTransaction.delete(YangInstanceIdentifier.builder().build());
writeTransaction.write(YangInstanceIdentifier.builder().build(), expected);
- commitTransaction(writeTransaction);
+ commitTransaction(store, writeTransaction);
- NormalizedNode<?, ?> actual = readStore(store);
+ NormalizedNode<?, ?> actual = readStore(store, YangInstanceIdentifier.builder().build());
assertEquals(expected, actual);
}
shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}
- private void commitTransaction(final DOMStoreWriteTransaction transaction) {
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
- ListenableFuture<Void> future =
- commitCohort.preCommit();
- try {
- future.get();
- future = commitCohort.commit();
- future.get();
- } catch (InterruptedException | ExecutionException e) {
- }
+ private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException {
+ modification.ready();
+ store.validate(modification);
+ store.commit(store.prepare(modification));
}
}
if (exToThrow instanceof PrimaryNotFoundException) {
doReturn(Futures.failed(exToThrow)).when(mockActorContext).findPrimaryShardAsync(anyString());
} else {
- doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
+ doReturn(primaryShardInfoReply(getSystem(), actorRef)).
when(mockActorContext).findPrimaryShardAsync(anyString());
}
doReturn(getSystem().actorSelection(actorRef.path())).when(mockActorContext).
actorSelection(actorRef.path().toString());
- doReturn(Futures.successful(getSystem().actorSelection(actorRef.path()))).
+ doReturn(primaryShardInfoReply(getSystem(), actorRef)).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
doReturn(Futures.successful(new Object())).when(mockActorContext).executeOperationAsync(
when(mockActorContext).actorSelection(shardActorRef.path().toString());
if(shardFound) {
- doReturn(Futures.successful(actorSystem.actorSelection(shardActorRef.path()))).
+ doReturn(primaryShardInfoReply(actorSystem, shardActorRef)).
when(mockActorContext).findPrimaryShardAsync(eq(DefaultShardStrategy.DEFAULT_SHARD));
} else {
doReturn(Futures.failed(new PrimaryNotFoundException("test")))
doReturn(getSystem().actorSelection(shardActorRef.path())).
when(mockActorContext).actorSelection(shardActorRef.path().toString());
- doReturn(Futures.successful(getSystem().actorSelection(shardActorRef.path()))).
+ doReturn(primaryShardInfoReply(getSystem(), shardActorRef)).
when(mockActorContext).findPrimaryShardAsync(eq(shardName));
doReturn(true).when(mockActorContext).isPathLocal(shardActorRef.path().toString());
import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.ShardDataTree;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
String transactionID1 = "tx1";
MutableCompositeModification modification1 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+ ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
String transactionID2 = "tx2";
MutableCompositeModification modification2 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+ ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
modification2);
String transactionID3 = "tx3";
MutableCompositeModification modification3 = new MutableCompositeModification();
- DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+ ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
package org.opendaylight.controller.cluster.datastore.modification;
import static org.junit.Assert.assertEquals;
-import org.apache.commons.lang.SerializationUtils;
+import org.apache.commons.lang3.SerializationUtils;
import org.junit.Test;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
assertEquals("getPath", writePath, write.getPath());
assertEquals("getData", writeData, write.getData());
- ModificationPayload cloned =
- (ModificationPayload) SerializationUtils.clone(payload);
+ ModificationPayload cloned = SerializationUtils.clone(payload);
deserialized = (MutableCompositeModification) payload.getModification();
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+ final String expPrimaryPath = "akka://test-system/find-primary-shard";
ActorContext actorContext =
new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
mock(Configuration.class), dataStoreContext) {
@Override
protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
- return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
+ return Futures.successful((Object) new PrimaryFound(expPrimaryPath));
}
};
- Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
- ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+ PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
assertNotNull(actual);
+ assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
+ assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+ expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
- Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
- ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+ PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
- assertEquals(cachedSelection, actual);
+ assertEquals(cachedInfo, actual);
// Wait for 200 Milliseconds. The cached entry should have been removed.
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
- cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+ cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
};
- Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
try {
Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
}
- Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
}
};
- Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+ Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
try {
Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
}
- Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+ Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
assertNull(cached);
}
*/
package org.opendaylight.controller.md.cluster.datastore.model;
+import com.google.common.io.Resources;
+import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
-import java.util.Set;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
public class TestModel {
- public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
- "test");
-
- public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
- "junk");
-
-
- public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
- public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
- public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
- public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
- public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
- public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
- public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
- private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
-
- public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
- public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
- public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
- node(OUTER_LIST_QNAME).build();
- public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
- node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
- public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
- public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
-
-
- public static final InputStream getDatastoreTestInputStream() {
- return getInputStream(DATASTORE_TEST_YANG);
- }
-
- private static InputStream getInputStream(final String resourceName) {
- return TestModel.class.getResourceAsStream(DATASTORE_TEST_YANG);
- }
-
- public static SchemaContext createTestContext() {
- YangParserImpl parser = new YangParserImpl();
- Set<Module> modules = parser.parseYangModelsFromStreams(Collections.singletonList(getDatastoreTestInputStream()));
- return parser.resolveSchemaContext(modules);
- }
+ public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
+ "test");
+
+ public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
+ "junk");
+
+
+ public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list");
+ public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list");
+ public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice");
+ public static final QName ID_QNAME = QName.create(TEST_QNAME, "id");
+ public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name");
+ public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc");
+ public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value");
+ private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
+
+ public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+ public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
+ public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+ node(OUTER_LIST_QNAME).build();
+ public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
+ node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build();
+ public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two");
+ public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three");
+
+
+ public static final InputStream getDatastoreTestInputStream() {
+ return TestModel.class.getResourceAsStream(DATASTORE_TEST_YANG);
+ }
+
+ public static SchemaContext createTestContext() {
+ YangParserImpl parser = new YangParserImpl();
+ try {
+ return parser.parseSources(Collections.singleton(Resources.asByteSource(TestModel.class.getResource(DATASTORE_TEST_YANG))));
+ } catch (IOException | YangSyntaxErrorException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertNotNull;
+import static junit.framework.Assert.assertTrue;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
+import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ForwardingExecutorService;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitDeadlockException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.TestModel;
+import org.opendaylight.controller.sal.core.spi.data.DOMStore;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.util.concurrent.DeadlockDetectingListeningExecutorService;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class DOMDataTreeListenerTest {
+
+ private SchemaContext schemaContext;
+ private AbstractDOMDataBroker domBroker;
+ private ListeningExecutorService executor;
+ private ExecutorService futureExecutor;
+ private CommitExecutorService commitExecutor;
+
+ private static final DataContainerChild<?, ?> OUTER_LIST = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+ .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1))
+ .build();
+
+ private static final DataContainerChild<?, ?> OUTER_LIST_2 = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+ .withChild(ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2))
+ .build();
+
+ private static final NormalizedNode<?, ?> TEST_CONTAINER = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+ .withChild(OUTER_LIST)
+ .build();
+
+ private static final NormalizedNode<?, ?> TEST_CONTAINER_2 = Builders.containerBuilder()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME))
+ .withChild(OUTER_LIST_2)
+ .build();
+
+ private static DOMDataTreeIdentifier ROOT_DATA_TREE_ID = new DOMDataTreeIdentifier(
+ LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+
+ private static DOMDataTreeIdentifier OUTER_LIST_DATA_TREE_ID = new DOMDataTreeIdentifier(
+ LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH);
+
+ @Before
+ public void setupStore() {
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
+ MoreExecutors.newDirectExecutorService());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
+ MoreExecutors.newDirectExecutorService());
+ schemaContext = TestModel.createTestContext();
+
+ operStore.onGlobalContextUpdated(schemaContext);
+ configStore.onGlobalContextUpdated(schemaContext);
+
+ ImmutableMap<LogicalDatastoreType, DOMStore> stores = ImmutableMap.<LogicalDatastoreType, DOMStore>builder() //
+ .put(CONFIGURATION, configStore) //
+ .put(OPERATIONAL, operStore) //
+ .build();
+
+ commitExecutor = new CommitExecutorService(Executors.newSingleThreadExecutor());
+ futureExecutor = SpecialExecutors.newBlockingBoundedCachedThreadPool(1, 5, "FCB");
+ executor = new DeadlockDetectingListeningExecutorService(commitExecutor,
+ TransactionCommitDeadlockException.DEADLOCK_EXCEPTION_SUPPLIER, futureExecutor);
+ domBroker = new SerializedDOMDataBroker(stores, executor);
+ }
+
+ @After
+ public void tearDown() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+
+ if (futureExecutor != null) {
+ futureExecutor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void writeContainerEmptyTreeTest() throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+ assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+ dataTreeChangeService);
+
+ final TestDataTreeListener listener = new TestDataTreeListener(latch);
+ final ListenerRegistration<TestDataTreeListener> listenerReg =
+ dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener);
+
+ final DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+ writeTx.submit();
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertEquals(1, listener.getReceivedChanges().size());
+ final Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+ assertEquals(1, changes.size());
+
+ DataTreeCandidate candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+ checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot);
+ listenerReg.close();
+ }
+
+ @Test
+ public void replaceContainerContainerInTreeTest() throws InterruptedException, TransactionCommitFailedException {
+ CountDownLatch latch = new CountDownLatch(2);
+
+ DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+ assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+ dataTreeChangeService);
+
+ DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+ writeTx.submit().checkedGet();
+
+ final TestDataTreeListener listener = new TestDataTreeListener(latch);
+ final ListenerRegistration<TestDataTreeListener> listenerReg =
+ dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener);
+ writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER_2);
+ writeTx.submit();
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertEquals(2, listener.getReceivedChanges().size());
+ Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+ assertEquals(1, changes.size());
+
+ DataTreeCandidate candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+ checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot);
+
+ changes = listener.getReceivedChanges().get(1);
+ assertEquals(1, changes.size());
+
+ candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ candidateRoot = candidate.getRootNode();
+ checkChange(TEST_CONTAINER, TEST_CONTAINER_2, ModificationType.WRITE, candidateRoot);
+ listenerReg.close();
+ }
+
+ @Test
+ public void deleteContainerContainerInTreeTest() throws InterruptedException, TransactionCommitFailedException {
+ CountDownLatch latch = new CountDownLatch(2);
+
+ DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+ assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+ dataTreeChangeService);
+
+ DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+ writeTx.submit().checkedGet();
+
+ final TestDataTreeListener listener = new TestDataTreeListener(latch);
+ final ListenerRegistration<TestDataTreeListener> listenerReg =
+ dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener);
+
+ writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.delete(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
+ writeTx.submit();
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertEquals(2, listener.getReceivedChanges().size());
+ Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+ assertEquals(1, changes.size());
+
+ DataTreeCandidate candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+ checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot);
+
+ changes = listener.getReceivedChanges().get(1);
+ assertEquals(1, changes.size());
+
+ candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ candidateRoot = candidate.getRootNode();
+ checkChange(TEST_CONTAINER, null, ModificationType.DELETE, candidateRoot);
+ listenerReg.close();
+ }
+
+ @Test
+ public void replaceChildListContainerInTreeTest() throws InterruptedException, TransactionCommitFailedException {
+ CountDownLatch latch = new CountDownLatch(2);
+
+ DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+ assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+ dataTreeChangeService);
+
+ DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+ writeTx.submit().checkedGet();
+
+ final TestDataTreeListener listener = new TestDataTreeListener(latch);
+ final ListenerRegistration<TestDataTreeListener> listenerReg =
+ dataTreeChangeService.registerDataTreeChangeListener(ROOT_DATA_TREE_ID, listener);
+
+ writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH, OUTER_LIST_2);
+ writeTx.submit();
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertEquals(2, listener.getReceivedChanges().size());
+ Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+ assertEquals(1, changes.size());
+
+ DataTreeCandidate candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+ checkChange(null, TEST_CONTAINER, ModificationType.WRITE, candidateRoot);
+
+ changes = listener.getReceivedChanges().get(1);
+ assertEquals(1, changes.size());
+
+ candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ candidateRoot = candidate.getRootNode();
+ checkChange(TEST_CONTAINER, TEST_CONTAINER_2, ModificationType.SUBTREE_MODIFIED, candidateRoot);
+ final DataTreeCandidateNode modifiedChild = candidateRoot.getModifiedChild(
+ new YangInstanceIdentifier.NodeIdentifier(TestModel.OUTER_LIST_QNAME));
+ assertNotNull(modifiedChild);
+ checkChange(OUTER_LIST, OUTER_LIST_2, ModificationType.WRITE, modifiedChild);
+ listenerReg.close();
+ }
+
+ @Test
+ public void rootModificationChildListenerTest() throws InterruptedException, TransactionCommitFailedException {
+ CountDownLatch latch = new CountDownLatch(2);
+
+ DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+ assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+ dataTreeChangeService);
+
+ DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+ writeTx.submit().checkedGet();
+
+ final TestDataTreeListener listener = new TestDataTreeListener(latch);
+ final ListenerRegistration<TestDataTreeListener> listenerReg =
+ dataTreeChangeService.registerDataTreeChangeListener(OUTER_LIST_DATA_TREE_ID, listener);
+
+ writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER_2);
+ writeTx.submit().checkedGet();
+
+ latch.await(1, TimeUnit.SECONDS);
+
+ assertEquals(2, listener.getReceivedChanges().size());
+ Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+ assertEquals(1, changes.size());
+
+ DataTreeCandidate candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+ checkChange(null, OUTER_LIST, ModificationType.WRITE, candidateRoot);
+
+ changes = listener.getReceivedChanges().get(1);
+ assertEquals(1, changes.size());
+
+ candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ candidateRoot = candidate.getRootNode();
+ checkChange(OUTER_LIST, OUTER_LIST_2, ModificationType.WRITE, candidateRoot);
+ listenerReg.close();
+ }
+
+ @Test
+ public void listEntryChangeNonRootRegistrationTest() throws InterruptedException, TransactionCommitFailedException {
+ CountDownLatch latch = new CountDownLatch(2);
+
+ DOMDataTreeChangeService dataTreeChangeService = getDOMDataTreeChangeService();
+ assertNotNull("DOMDataTreeChangeService not found, cannot continue with test!",
+ dataTreeChangeService);
+
+ DOMDataWriteTransaction writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH, TEST_CONTAINER);
+ writeTx.submit().checkedGet();
+
+ final TestDataTreeListener listener = new TestDataTreeListener(latch);
+ final ListenerRegistration<TestDataTreeListener> listenerReg =
+ dataTreeChangeService.registerDataTreeChangeListener(OUTER_LIST_DATA_TREE_ID, listener);
+
+ final YangInstanceIdentifier.NodeIdentifierWithPredicates outerListEntryId1 =
+ new YangInstanceIdentifier.NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1);
+ final YangInstanceIdentifier.NodeIdentifierWithPredicates outerListEntryId2 =
+ new YangInstanceIdentifier.NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2);
+ final YangInstanceIdentifier.NodeIdentifierWithPredicates outerListEntryId3 =
+ new YangInstanceIdentifier.NodeIdentifierWithPredicates(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 3);
+
+ final MapEntryNode outerListEntry1 = ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1);
+ final MapEntryNode outerListEntry2 = ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2);
+ final MapEntryNode outerListEntry3 = ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 3);
+
+ final MapNode listAfter = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
+ .withChild(outerListEntry2)
+ .withChild(outerListEntry3)
+ .build();
+
+ writeTx = domBroker.newWriteOnlyTransaction();
+ writeTx.delete(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH.node(outerListEntryId1));
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH.node(outerListEntryId2),
+ outerListEntry2);
+ writeTx.put(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_LIST_PATH.node(outerListEntryId3),
+ outerListEntry3);
+ writeTx.submit();
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertEquals(2, listener.getReceivedChanges().size());
+ Collection<DataTreeCandidate> changes = listener.getReceivedChanges().get(0);
+ assertEquals(1, changes.size());
+
+ DataTreeCandidate candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ DataTreeCandidateNode candidateRoot = candidate.getRootNode();
+ checkChange(null, OUTER_LIST, ModificationType.WRITE, candidateRoot);
+
+ changes = listener.getReceivedChanges().get(1);
+ assertEquals(1, changes.size());
+
+ candidate = changes.iterator().next();
+ assertNotNull(candidate);
+ candidateRoot = candidate.getRootNode();
+ checkChange(OUTER_LIST, listAfter, ModificationType.SUBTREE_MODIFIED, candidateRoot);
+ final DataTreeCandidateNode entry1Canditate = candidateRoot.getModifiedChild(outerListEntryId1);
+ checkChange(outerListEntry1, null, ModificationType.DELETE, entry1Canditate);
+ final DataTreeCandidateNode entry2Canditate = candidateRoot.getModifiedChild(outerListEntryId2);
+ checkChange(null, outerListEntry2, ModificationType.WRITE, entry2Canditate);
+ final DataTreeCandidateNode entry3Canditate = candidateRoot.getModifiedChild(outerListEntryId3);
+ checkChange(null, outerListEntry3, ModificationType.WRITE, entry3Canditate);
+ listenerReg.close();
+ }
+
+ private static void checkChange(NormalizedNode<?, ?> expectedBefore,
+ NormalizedNode<?, ?> expectedAfter,
+ ModificationType expectedMod,
+ DataTreeCandidateNode candidateNode) {
+ if (expectedBefore != null) {
+ assertTrue(candidateNode.getDataBefore().isPresent());
+ assertEquals(expectedBefore, candidateNode.getDataBefore().get());
+ } else {
+ assertFalse(candidateNode.getDataBefore().isPresent());
+ }
+
+ if (expectedAfter != null) {
+ assertTrue(candidateNode.getDataAfter().isPresent());
+ assertEquals(expectedAfter, candidateNode.getDataAfter().get());
+ } else {
+ assertFalse(candidateNode.getDataAfter().isPresent());
+ }
+
+ assertEquals(expectedMod, candidateNode.getModificationType());
+ }
+
+ private DOMDataTreeChangeService getDOMDataTreeChangeService() {
+ final DOMDataBrokerExtension extension = domBroker.getSupportedExtensions()
+ .get(DOMDataTreeChangeService.class);
+ if (extension == null) {
+ return null;
+ }
+ DOMDataTreeChangeService dataTreeChangeService = null;
+ if (extension instanceof DOMDataTreeChangeService) {
+ dataTreeChangeService = (DOMDataTreeChangeService) extension;
+ }
+ return dataTreeChangeService;
+ }
+
+
+ static class CommitExecutorService extends ForwardingExecutorService {
+
+ ExecutorService delegate;
+
+ public CommitExecutorService(final ExecutorService delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ protected ExecutorService delegate() {
+ return delegate;
+ }
+ }
+
+ static class TestDataTreeListener implements DOMDataTreeChangeListener {
+
+ private List<Collection<DataTreeCandidate>> receivedChanges = new ArrayList<>();
+ private CountDownLatch latch;
+
+ public TestDataTreeListener(final CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes) {
+ receivedChanges.add(changes);
+ latch.countDown();
+ }
+
+ public List<Collection<DataTreeCandidate>> getReceivedChanges() {
+ return receivedChanges;
+ }
+ }
+}
* @return The context in which this transaction was allocated, or null
* if the context was not recorded.
*/
- @Nullable public final Throwable getDebugContext() {
+ @Nullable
+ public final Throwable getDebugContext() {
return debugContext;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract implementation of the {@link DOMStoreTransactionChain} interface relying on {@link DataTreeSnapshot} supplier
+ * and backend commit coordinator.
+ *
+ * @param <T> transaction identifier type
+ */
+@Beta
+public abstract class AbstractSnapshotBackedTransactionChain<T> extends TransactionReadyPrototype<T> implements DOMStoreTransactionChain {
+ private static abstract class State {
+ /**
+ * Allocate a new snapshot.
+ *
+ * @return A new snapshot
+ */
+ protected abstract DataTreeSnapshot getSnapshot();
+ }
+
+ private static final class Idle extends State {
+ private final AbstractSnapshotBackedTransactionChain<?> chain;
+
+ Idle(final AbstractSnapshotBackedTransactionChain<?> chain) {
+ this.chain = Preconditions.checkNotNull(chain);
+ }
+
+ @Override
+ protected DataTreeSnapshot getSnapshot() {
+ return chain.takeSnapshot();
+ }
+ }
+
+ /**
+ * We have a transaction out there.
+ */
+ private static final class Allocated extends State {
+ private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
+ private final DOMStoreWriteTransaction transaction;
+ private volatile DataTreeSnapshot snapshot;
+
+ Allocated(final DOMStoreWriteTransaction transaction) {
+ this.transaction = Preconditions.checkNotNull(transaction);
+ }
+
+ public DOMStoreWriteTransaction getTransaction() {
+ return transaction;
+ }
+
+ @Override
+ protected DataTreeSnapshot getSnapshot() {
+ final DataTreeSnapshot ret = snapshot;
+ Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
+ return ret;
+ }
+
+ void setSnapshot(final DataTreeSnapshot snapshot) {
+ final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
+ Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
+ }
+ }
+
+ /**
+ * Chain is logically shut down, no further allocation allowed.
+ */
+ private static final class Shutdown extends State {
+ private final String message;
+
+ Shutdown(final String message) {
+ this.message = Preconditions.checkNotNull(message);
+ }
+
+ @Override
+ protected DataTreeSnapshot getSnapshot() {
+ throw new IllegalStateException(message);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static final AtomicReferenceFieldUpdater<AbstractSnapshotBackedTransactionChain, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractSnapshotBackedTransactionChain.class, State.class, "state");
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotBackedTransactionChain.class);
+ private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
+ private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+ private final Idle idleState;
+ private volatile State state;
+
+ protected AbstractSnapshotBackedTransactionChain() {
+ idleState = new Idle(this);
+ state = idleState;
+ }
+
+ private Entry<State, DataTreeSnapshot> getSnapshot() {
+ final State localState = state;
+ return new SimpleEntry<>(localState, localState.getSnapshot());
+ }
+
+ private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
+ final State state = new Allocated(transaction);
+ return STATE_UPDATER.compareAndSet(this, expected, state);
+ }
+
+ @Override
+ public final DOMStoreReadTransaction newReadOnlyTransaction() {
+ final Entry<State, DataTreeSnapshot> entry = getSnapshot();
+ return SnapshotBackedTransactions.newReadTransaction(nextTransactionIdentifier(), getDebugTransactions(), entry.getValue());
+ }
+
+ @Override
+ public final DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ Entry<State, DataTreeSnapshot> entry;
+ DOMStoreReadWriteTransaction ret;
+
+ do {
+ entry = getSnapshot();
+ ret = new SnapshotBackedReadWriteTransaction<T>(nextTransactionIdentifier(),
+ getDebugTransactions(), entry.getValue(), this);
+ } while (!recordTransaction(entry.getKey(), ret));
+
+ return ret;
+ }
+
+ @Override
+ public final DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ Entry<State, DataTreeSnapshot> entry;
+ DOMStoreWriteTransaction ret;
+
+ do {
+ entry = getSnapshot();
+ ret = new SnapshotBackedWriteTransaction<T>(nextTransactionIdentifier(),
+ getDebugTransactions(), entry.getValue(), this);
+ } while (!recordTransaction(entry.getKey(), ret));
+
+ return ret;
+ }
+
+ @Override
+ protected final void transactionAborted(final SnapshotBackedWriteTransaction<T> tx) {
+ final State localState = state;
+ if (localState instanceof Allocated) {
+ final Allocated allocated = (Allocated)localState;
+ if (allocated.getTransaction().equals(tx)) {
+ final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
+ if (!success) {
+ LOG.warn("Transaction {} aborted, but chain {} state already transitioned from {} to {}, very strange",
+ tx, this, localState, state);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected final DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<T> tx, final DataTreeModification tree) {
+ final State localState = state;
+
+ if (localState instanceof Allocated) {
+ final Allocated allocated = (Allocated)localState;
+ final DOMStoreWriteTransaction transaction = allocated.getTransaction();
+ Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
+ allocated.setSnapshot(tree);
+ } else {
+ LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
+ }
+
+ return createCohort(tx, tree);
+ }
+
+ @Override
+ public final void close() {
+ final State localState = state;
+
+ do {
+ Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
+
+ if (FAILED.equals(localState)) {
+ LOG.debug("Ignoring user close in failed state");
+ return;
+ }
+ } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
+ }
+
+ /**
+ * Notify the base logic that a previously-submitted transaction has been committed successfully.
+ *
+ * @param transaction Transaction which completed successfully.
+ */
+ protected final void onTransactionCommited(final SnapshotBackedWriteTransaction<T> transaction) {
+ // If the committed transaction was the one we allocated last,
+ // we clear it and the ready snapshot, so the next transaction
+ // allocated refers to the data tree directly.
+ final State localState = state;
+
+ if (!(localState instanceof Allocated)) {
+ // This can legally happen if the chain is shut down before the transaction was committed
+ // by the backend.
+ LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
+ return;
+ }
+
+ final Allocated allocated = (Allocated)localState;
+ final DOMStoreWriteTransaction tx = allocated.getTransaction();
+ if (!tx.equals(transaction)) {
+ LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
+ return;
+ }
+
+ if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
+ LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
+ }
+ }
+
+ /**
+ * Notify the base logic that a previously-submitted transaction has failed.
+ *
+ * @param transaction Transaction which failed.
+ * @param cause Failure cause
+ */
+ protected final void onTransactionFailed(final SnapshotBackedWriteTransaction<T> transaction, final Throwable cause) {
+ LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, cause);
+ state = FAILED;
+ }
+
+ /**
+ * Return the next transaction identifier.
+ *
+ * @return transaction identifier.
+ */
+ protected abstract T nextTransactionIdentifier();
+
+ /**
+ * Inquire as to whether transactions should record their allocation context.
+ *
+ * @return True if allocation context should be recorded.
+ */
+ protected abstract boolean getDebugTransactions();
+
+ /**
+ * Take a fresh {@link DataTreeSnapshot} from the backend.
+ *
+ * @return A new snapshot.
+ */
+ protected abstract DataTreeSnapshot takeSnapshot();
+
+ /**
+ * Create a cohort for driving the transaction through the commit process.
+ *
+ * @param transaction Transaction handle
+ * @param modification {@link DataTreeModification} which needs to be applied to the backend
+ * @return A {@link DOMStoreThreePhaseCommitCohort} cohort.
+ */
+ protected abstract DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<T> transaction, final DataTreeModification modification);
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.annotations.Beta;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
* Implementation of read-only transaction backed by {@link DataTreeSnapshot}
* which delegates most of its calls to similar methods provided by underlying snapshot.
*
+ * <T> identifier type
*/
-final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction<Object>
- implements DOMStoreReadTransaction {
-
+@Beta
+public final class SnapshotBackedReadTransaction<T> extends AbstractDOMStoreTransaction<T> implements DOMStoreReadTransaction {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class);
private volatile DataTreeSnapshot stableSnapshot;
- public SnapshotBackedReadTransaction(final Object identifier, final boolean debug, final DataTreeSnapshot snapshot) {
+ /**
+ * Creates a new read-only transaction.
+ *
+ * @param identifier Transaction Identifier
+ * @param debug Enable transaction debugging
+ * @param snapshot Snapshot which will be modified.
+ */
+ SnapshotBackedReadTransaction(final T identifier, final boolean debug, final DataTreeSnapshot snapshot) {
super(identifier, debug);
this.stableSnapshot = Preconditions.checkNotNull(snapshot);
LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot);
checkNotNull(path, "Path must not be null.");
try {
- return Futures.immediateCheckedFuture(
- read(path).checkedGet().isPresent());
+ return Futures.immediateCheckedFuture(read(path).checkedGet().isPresent());
} catch (ReadFailedException e) {
return Futures.immediateFailedCheckedFuture(e);
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.annotations.Beta;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
* Implementation of Read-Write transaction which is backed by {@link DataTreeSnapshot}
* and executed according to {@link TransactionReadyPrototype}.
*
+ * @param <T> identifier type
*/
-final class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements DOMStoreReadWriteTransaction {
+@Beta
+public final class SnapshotBackedReadWriteTransaction<T> extends SnapshotBackedWriteTransaction<T> implements DOMStoreReadWriteTransaction {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadWriteTransaction.class);
- /**
- * Creates new read-write transaction.
- *
- * @param identifier transaction Identifier
- * @param snapshot Snapshot which will be modified.
- * @param readyImpl Implementation of ready method.
- */
- protected SnapshotBackedReadWriteTransaction(final Object identifier, final boolean debug,
- final DataTreeSnapshot snapshot, final TransactionReadyPrototype store) {
- super(identifier, debug, snapshot, store);
+ SnapshotBackedReadWriteTransaction(final T identifier, final boolean debug,
+ final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+ super(identifier, debug, snapshot, readyImpl);
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+/**
+ * Public utility class for instantiating snapshot-backed transactions.
+ */
+@Beta
+public final class SnapshotBackedTransactions {
+ private SnapshotBackedTransactions() {
+ throw new UnsupportedOperationException("Utility class");
+ }
+
+ /**
+ * Creates a new read-only transaction.
+ *
+ * @param identifier Transaction Identifier
+ * @param debug Enable transaction debugging
+ * @param snapshot Snapshot which will be modified.
+ */
+ public static <T> SnapshotBackedReadTransaction<T> newReadTransaction(final T identifier, final boolean debug, final DataTreeSnapshot snapshot) {
+ return new SnapshotBackedReadTransaction<T>(identifier, debug, snapshot);
+ }
+
+ /**
+ * Creates a new read-write transaction.
+ *
+ * @param identifier transaction Identifier
+ * @param debug Enable transaction debugging
+ * @param snapshot Snapshot which will be modified.
+ * @param readyImpl Implementation of ready method.
+ */
+ public static <T> SnapshotBackedReadWriteTransaction<T> newReadWriteTransaction(final T identifier, final boolean debug,
+ final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+ return new SnapshotBackedReadWriteTransaction<T>(identifier, debug, snapshot, readyImpl);
+ }
+
+ /**
+ * Creates a new write-only transaction.
+ *
+ * @param identifier transaction Identifier
+ * @param debug Enable transaction debugging
+ * @param snapshot Snapshot which will be modified.
+ * @param readyImpl Implementation of ready method.
+ */
+ public static <T> SnapshotBackedWriteTransaction<T> newWriteTransaction(final T identifier, final boolean debug,
+ final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+ return new SnapshotBackedWriteTransaction<T>(identifier, debug, snapshot, readyImpl);
+ }
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.Beta;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
* Implementation of Write transaction which is backed by
* {@link DataTreeSnapshot} and executed according to
- * {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype}.
+ * {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype}.
*
+ * @param <T> Identifier type
*/
-class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object> implements DOMStoreWriteTransaction {
+@Beta
+public class SnapshotBackedWriteTransaction<T> extends AbstractDOMStoreTransaction<T> implements DOMStoreWriteTransaction {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedWriteTransaction.class);
+ @SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, TransactionReadyPrototype> READY_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, TransactionReadyPrototype.class, "readyImpl");
+ @SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, DataTreeModification> TREE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, DataTreeModification.class, "mutableTree");
// non-null when not ready
- private volatile TransactionReadyPrototype readyImpl;
+ private volatile TransactionReadyPrototype<T> readyImpl;
// non-null when not committed/closed
private volatile DataTreeModification mutableTree;
- /**
- * Creates new write-only transaction.
- *
- * @param identifier
- * transaction Identifier
- * @param snapshot
- * Snapshot which will be modified.
- * @param readyImpl
- * Implementation of ready method.
- */
- public SnapshotBackedWriteTransaction(final Object identifier, final boolean debug,
- final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) {
+ SnapshotBackedWriteTransaction(final T identifier, final boolean debug,
+ final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
super(identifier, debug);
this.readyImpl = Preconditions.checkNotNull(readyImpl, "readyImpl must not be null.");
mutableTree = snapshot.newModification();
* @param path Path to read
* @return null if the the transaction has been closed;
*/
- protected final Optional<NormalizedNode<?, ?>> readSnapshotNode(final YangInstanceIdentifier path) {
+ final Optional<NormalizedNode<?, ?>> readSnapshotNode(final YangInstanceIdentifier path) {
return readyImpl == null ? null : mutableTree.readNode(path);
}
@Override
public DOMStoreThreePhaseCommitCohort ready() {
- final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null);
+ @SuppressWarnings("unchecked")
+ final TransactionReadyPrototype<T> wasReady = READY_UPDATER.getAndSet(this, null);
checkState(wasReady != null, "Transaction %s is no longer open", getIdentifier());
LOG.debug("Store transaction: {} : Ready", getIdentifier());
@Override
public void close() {
- final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null);
+ @SuppressWarnings("unchecked")
+ final TransactionReadyPrototype<T> wasReady = READY_UPDATER.getAndSet(this, null);
if (wasReady != null) {
LOG.debug("Store transaction: {} : Closed", getIdentifier());
TREE_UPDATER.lazySet(this, null);
/**
* Prototype implementation of
- * {@link #ready(org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction)}
+ * {@link #ready(org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction)}
*
* This class is intended to be implemented by Transaction factories
- * responsible for allocation of {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction} and
+ * responsible for allocation of {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction} and
* providing underlying logic for applying implementation.
*
+ * @param <T> identifier type
*/
- abstract static class TransactionReadyPrototype {
+ public abstract static class TransactionReadyPrototype<T> {
/**
* Called when a transaction is closed without being readied. This is not invoked for
* transactions which are ready.
*
* @param tx Transaction which got aborted.
*/
- protected abstract void transactionAborted(final SnapshotBackedWriteTransaction tx);
+ protected abstract void transactionAborted(final SnapshotBackedWriteTransaction<T> tx);
/**
* Returns a commit coordinator associated with supplied transactions.
* Modified data tree which has been constructed.
* @return DOMStoreThreePhaseCommitCohort associated with transaction
*/
- protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction tx, DataTreeModification tree);
+ protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction<T> tx, DataTreeModification tree);
}
}
\ No newline at end of file
package org.opendaylight.controller.md.sal.dom.store.impl;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-final class ChainedTransactionCommitImpl extends ForwardingDOMStoreThreePhaseCommitCohort {
- private final SnapshotBackedWriteTransaction transaction;
- private final DOMStoreThreePhaseCommitCohort delegate;
+final class ChainedTransactionCommitImpl extends InMemoryDOMStoreThreePhaseCommitCohort {
private final DOMStoreTransactionChainImpl txChain;
- ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
- final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
- this.transaction = Preconditions.checkNotNull(transaction);
- this.delegate = Preconditions.checkNotNull(delegate);
+ ChainedTransactionCommitImpl(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction<String> transaction,
+ final DataTreeModification modification, final DOMStoreTransactionChainImpl txChain) {
+ super(store, transaction, modification);
this.txChain = Preconditions.checkNotNull(txChain);
}
- @Override
- protected DOMStoreThreePhaseCommitCohort delegate() {
- return delegate;
- }
-
@Override
public ListenableFuture<Void> commit() {
- ListenableFuture<Void> commitFuture = super.commit();
- Futures.addCallback(commitFuture, new FutureCallback<Void>() {
- @Override
- public void onFailure(final Throwable t) {
- txChain.onTransactionFailed(transaction, t);
- }
-
- @Override
- public void onSuccess(final Void result) {
- txChain.onTransactionCommited(transaction);
- }
- });
- return commitFuture;
+ ListenableFuture<Void> ret = super.commit();
+ txChain.transactionCommited(getTransaction());
+ return ret;
}
}
\ No newline at end of file
package org.opendaylight.controller.md.sal.dom.store.impl;
import com.google.common.base.Preconditions;
-import java.util.AbstractMap.SimpleEntry;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-final class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
- private static abstract class State {
- /**
- * Allocate a new snapshot.
- *
- * @return A new snapshot
- */
- protected abstract DataTreeSnapshot getSnapshot();
- }
-
- private static final class Idle extends State {
- private final InMemoryDOMDataStore store;
-
- Idle(final InMemoryDOMDataStore store) {
- this.store = Preconditions.checkNotNull(store);
- }
-
- @Override
- protected DataTreeSnapshot getSnapshot() {
- return store.takeSnapshot();
- }
- }
-
- /**
- * We have a transaction out there.
- */
- private static final class Allocated extends State {
- private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
- private final DOMStoreWriteTransaction transaction;
- private volatile DataTreeSnapshot snapshot;
-
- Allocated(final DOMStoreWriteTransaction transaction) {
- this.transaction = Preconditions.checkNotNull(transaction);
- }
-
- public DOMStoreWriteTransaction getTransaction() {
- return transaction;
- }
-
- @Override
- protected DataTreeSnapshot getSnapshot() {
- final DataTreeSnapshot ret = snapshot;
- Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
- return ret;
- }
-
- void setSnapshot(final DataTreeSnapshot snapshot) {
- final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
- Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
- }
- }
-
- /**
- * Chain is logically shut down, no further allocation allowed.
- */
- private static final class Shutdown extends State {
- private final String message;
-
- Shutdown(final String message) {
- this.message = Preconditions.checkNotNull(message);
- }
-
- @Override
- protected DataTreeSnapshot getSnapshot() {
- throw new IllegalStateException(message);
- }
- }
-
- private static final AtomicReferenceFieldUpdater<DOMStoreTransactionChainImpl, State> STATE_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(DOMStoreTransactionChainImpl.class, State.class, "state");
- private static final Logger LOG = LoggerFactory.getLogger(DOMStoreTransactionChainImpl.class);
- private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
- private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+final class DOMStoreTransactionChainImpl extends AbstractSnapshotBackedTransactionChain<String> {
private final InMemoryDOMDataStore store;
- private final Idle idleState;
- private volatile State state;
DOMStoreTransactionChainImpl(final InMemoryDOMDataStore store) {
this.store = Preconditions.checkNotNull(store);
- idleState = new Idle(store);
- state = idleState;
- }
-
- private Entry<State, DataTreeSnapshot> getSnapshot() {
- final State localState = state;
- return new SimpleEntry<>(localState, localState.getSnapshot());
- }
-
- private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
- final State state = new Allocated(transaction);
- return STATE_UPDATER.compareAndSet(this, expected, state);
}
@Override
- public DOMStoreReadTransaction newReadOnlyTransaction() {
- final Entry<State, DataTreeSnapshot> entry = getSnapshot();
- return new SnapshotBackedReadTransaction(store.nextIdentifier(), store.getDebugTransactions(), entry.getValue());
+ protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification modification) {
+ return new ChainedTransactionCommitImpl(store, tx, modification, this);
}
@Override
- public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- Entry<State, DataTreeSnapshot> entry;
- DOMStoreReadWriteTransaction ret;
-
- do {
- entry = getSnapshot();
- ret = new SnapshotBackedReadWriteTransaction(store.nextIdentifier(),
- store.getDebugTransactions(), entry.getValue(), this);
- } while (!recordTransaction(entry.getKey(), ret));
-
- return ret;
- }
-
- @Override
- public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- Entry<State, DataTreeSnapshot> entry;
- DOMStoreWriteTransaction ret;
-
- do {
- entry = getSnapshot();
- ret = new SnapshotBackedWriteTransaction(store.nextIdentifier(),
- store.getDebugTransactions(), entry.getValue(), this);
- } while (!recordTransaction(entry.getKey(), ret));
-
- return ret;
+ protected DataTreeSnapshot takeSnapshot() {
+ return store.takeSnapshot();
}
@Override
- protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
- final State localState = state;
- if (localState instanceof Allocated) {
- final Allocated allocated = (Allocated)localState;
- if (allocated.getTransaction().equals(tx)) {
- final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
- if (!success) {
- LOG.info("State already transitioned from {} to {}", localState, state);
- }
- }
- }
+ protected String nextTransactionIdentifier() {
+ return store.nextIdentifier();
}
@Override
- protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
- final State localState = state;
-
- if (localState instanceof Allocated) {
- final Allocated allocated = (Allocated)localState;
- final DOMStoreWriteTransaction transaction = allocated.getTransaction();
- Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
- allocated.setSnapshot(tree);
- } else {
- LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
- }
-
- return new ChainedTransactionCommitImpl(tx, store.transactionReady(tx, tree), this);
+ protected boolean getDebugTransactions() {
+ return store.getDebugTransactions();
}
- @Override
- public void close() {
- final State localState = state;
-
- do {
- Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
-
- if (FAILED.equals(localState)) {
- LOG.debug("Ignoring user close in failed state");
- return;
- }
- } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
- }
-
- void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, final Throwable t) {
- LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, t);
- state = FAILED;
- }
-
- void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
- // If the committed transaction was the one we allocated last,
- // we clear it and the ready snapshot, so the next transaction
- // allocated refers to the data tree directly.
- final State localState = state;
-
- if (!(localState instanceof Allocated)) {
- LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
- return;
- }
-
- final Allocated allocated = (Allocated)localState;
- final DOMStoreWriteTransaction tx = allocated.getTransaction();
- if (!tx.equals(transaction)) {
- LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
- return;
- }
-
- if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
- LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
- }
+ void transactionCommited(final SnapshotBackedWriteTransaction<String> transaction) {
+ super.onTransactionCommited(transaction);
}
-}
\ No newline at end of file
+}
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
-import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
*
* Implementation of {@link DOMStore} which uses {@link DataTree} and other
* classes such as {@link SnapshotBackedWriteTransaction}.
- * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
+ * {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
* to implement {@link DOMStore} contract.
*
*/
-public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
+public class InMemoryDOMDataStore extends TransactionReadyPrototype<String> implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
- private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
- private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
private static final Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
new Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>() {
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new SnapshotBackedReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot());
+ return SnapshotBackedTransactions.newReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot());
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new SnapshotBackedReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
+ return SnapshotBackedTransactions.newReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new SnapshotBackedWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
+ return SnapshotBackedTransactions.newWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
}
@Override
}
@Override
- protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+ protected void transactionAborted(final SnapshotBackedWriteTransaction<String> tx) {
LOG.debug("Tx: {} is closed.", tx.getIdentifier());
}
@Override
- protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
- LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), tree);
- return new ThreePhaseCommitImpl(tx, tree);
+ protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification modification) {
+ LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), modification);
+ return new InMemoryDOMStoreThreePhaseCommitCohort(this, tx, modification);
}
- Object nextIdentifier() {
+ String nextIdentifier() {
return name + "-" + txCounter.getAndIncrement();
}
- private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
- final Throwable ctx = transaction.getDebugContext();
- if (ctx != null) {
- LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
- }
+ void validate(final DataTreeModification modification) throws DataValidationFailedException {
+ dataTree.validate(modification);
}
- private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
- private final SnapshotBackedWriteTransaction transaction;
- private final DataTreeModification modification;
-
- private ResolveDataChangeEventsTask listenerResolver;
- private DataTreeCandidate candidate;
-
- public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction, final DataTreeModification modification) {
- this.transaction = writeTransaction;
- this.modification = modification;
- }
-
- @Override
- public ListenableFuture<Boolean> canCommit() {
- try {
- dataTree.validate(modification);
- LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
- return CAN_COMMIT_FUTURE;
- } catch (ConflictingModificationAppliedException e) {
- LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
- e.getPath());
- warnDebugContext(transaction);
- return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
- } catch (DataValidationFailedException e) {
- LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
- e.getPath(), e);
- warnDebugContext(transaction);
-
- // For debugging purposes, allow dumping of the modification. Coupled with the above
- // precondition log, it should allow us to understand what went on.
- LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, dataTree);
-
- return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
- } catch (Exception e) {
- LOG.warn("Unexpected failure in validation phase", e);
- return Futures.immediateFailedFuture(e);
- }
- }
-
- @Override
- public ListenableFuture<Void> preCommit() {
- try {
- candidate = dataTree.prepare(modification);
- listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
- return SUCCESSFUL_FUTURE;
- } catch (Exception e) {
- LOG.warn("Unexpected failure in pre-commit phase", e);
- return Futures.immediateFailedFuture(e);
- }
- }
-
- @Override
- public ListenableFuture<Void> abort() {
- candidate = null;
- return SUCCESSFUL_FUTURE;
- }
-
- @Override
- public ListenableFuture<Void> commit() {
- checkState(candidate != null, "Proposed subtree must be computed");
-
- /*
- * The commit has to occur atomically with regard to listener
- * registrations.
- */
- synchronized (InMemoryDOMDataStore.this) {
- dataTree.commit(candidate);
- changePublisher.publishChange(candidate);
- listenerResolver.resolve(dataChangeListenerNotificationManager);
- }
+ DataTreeCandidate prepare(final DataTreeModification modification) {
+ return dataTree.prepare(modification);
+ }
- return SUCCESSFUL_FUTURE;
- }
+ synchronized void commit(final DataTreeCandidate candidate) {
+ dataTree.commit(candidate);
+ changePublisher.publishChange(candidate);
+ ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(dataChangeListenerNotificationManager);
}
}
--- /dev/null
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemoryDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMStoreThreePhaseCommitCohort.class);
+ private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
+ private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+ private final SnapshotBackedWriteTransaction<String> transaction;
+ private final DataTreeModification modification;
+ private final InMemoryDOMDataStore store;
+ private DataTreeCandidate candidate;
+
+ public InMemoryDOMStoreThreePhaseCommitCohort(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction<String> writeTransaction, final DataTreeModification modification) {
+ this.transaction = Preconditions.checkNotNull(writeTransaction);
+ this.modification = Preconditions.checkNotNull(modification);
+ this.store = Preconditions.checkNotNull(store);
+ }
+
+ private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
+ final Throwable ctx = transaction.getDebugContext();
+ if (ctx != null) {
+ LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
+ }
+ }
+
+ @Override
+ public final ListenableFuture<Boolean> canCommit() {
+ try {
+ store.validate(modification);
+ LOG.debug("Store Transaction: {} can be committed", getTransaction().getIdentifier());
+ return CAN_COMMIT_FUTURE;
+ } catch (ConflictingModificationAppliedException e) {
+ LOG.warn("Store Tx: {} Conflicting modification for {}.", getTransaction().getIdentifier(),
+ e.getPath());
+ warnDebugContext(getTransaction());
+ return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
+ } catch (DataValidationFailedException e) {
+ LOG.warn("Store Tx: {} Data Precondition failed for {}.", getTransaction().getIdentifier(),
+ e.getPath(), e);
+ warnDebugContext(getTransaction());
+
+ // For debugging purposes, allow dumping of the modification. Coupled with the above
+ // precondition log, it should allow us to understand what went on.
+ LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, store);
+
+ return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in validation phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public final ListenableFuture<Void> preCommit() {
+ try {
+ candidate = store.prepare(modification);
+ return SUCCESSFUL_FUTURE;
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in pre-commit phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public final ListenableFuture<Void> abort() {
+ candidate = null;
+ return SUCCESSFUL_FUTURE;
+ }
+
+ protected final SnapshotBackedWriteTransaction<String> getTransaction() {
+ return transaction;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ checkState(candidate != null, "Proposed subtree must be computed");
+
+ /*
+ * The commit has to occur atomically with regard to listener
+ * registrations.
+ */
+ store.commit(candidate);
+ return SUCCESSFUL_FUTURE;
+ }
+}
\ No newline at end of file
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
public class InMemoryDataStoreTest {
private SchemaContext schemaContext;
Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockSnapshot )
.readNode( Mockito.any( YangInstanceIdentifier.class ) );
- DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction("1", true, mockSnapshot);
+ DOMStoreReadTransaction readTx = SnapshotBackedTransactions.newReadTransaction("1", true, mockSnapshot);
doReadAndThrowEx( readTx );
}
Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockModification )
.readNode( Mockito.any( YangInstanceIdentifier.class ) );
Mockito.doReturn( mockModification ).when( mockSnapshot ).newModification();
- TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class );
- DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction("1", false, mockSnapshot, mockReady);
+ @SuppressWarnings("unchecked")
+ TransactionReadyPrototype<String> mockReady = Mockito.mock( TransactionReadyPrototype.class );
+ DOMStoreReadTransaction readTx = SnapshotBackedTransactions.newReadWriteTransaction("1", false, mockSnapshot, mockReady);
doReadAndThrowEx( readTx );
}
- private void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable {
-
+ private static void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable {
try {
readTx.read(TestModel.TEST_PATH).get();
} catch( ExecutionException e ) {
* Create rpc implementation capable of handling RPC for monitoring and notifications even before the schemas of remote device are downloaded
*/
static NetconfDeviceRpc getRpcForInitialization(final NetconfDeviceCommunicator listener) {
- return new NetconfDeviceRpc(INIT_SCHEMA_CTX, listener, new NetconfMessageTransformer(INIT_SCHEMA_CTX));
+ return new NetconfDeviceRpc(INIT_SCHEMA_CTX, listener, new NetconfMessageTransformer(INIT_SCHEMA_CTX, false));
}
@VisibleForTesting
void handleSalInitializationSuccess(final SchemaContext result, final NetconfSessionPreferences remoteSessionCapabilities, final DOMRpcService deviceRpc) {
- messageTransformer = new NetconfMessageTransformer(result);
+ messageTransformer = new NetconfMessageTransformer(result, true);
updateTransformer(messageTransformer);
// salFacade.onDeviceConnected has to be called before the notification handler is initialized
}
private NetconfDeviceRpc getDeviceSpecificRpc(final SchemaContext result) {
- return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result));
+ return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
}
private Collection<SourceIdentifier> stripMissingSource(final Collection<SourceIdentifier> requiredSources, final SourceIdentifier sIdToRemove) {
private final Multimap<QName, NotificationDefinition> mappedNotifications;
private final DomToNormalizedNodeParserFactory parserFactory;
- public NetconfMessageTransformer(final SchemaContext schemaContext) {
+ public NetconfMessageTransformer(final SchemaContext schemaContext, final boolean strictParsing) {
this.counter = new MessageCounter();
this.schemaContext = schemaContext;
- parserFactory = DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, schemaContext);
+ parserFactory = DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, schemaContext, strictParsing);
mappedRpcs = Maps.uniqueIndex(schemaContext.getOperations(), QNAME_FUNCTION);
mappedNotifications = Multimaps.index(schemaContext.getNotifications(), QNAME_NOREV_FUNCTION);
final DataSchemaNode schemasNode = ((ContainerSchemaNode) NetconfDevice.INIT_SCHEMA_CTX.getDataChildByName("netconf-state")).getDataChildByName("schemas");
final Document schemasXml = XmlUtil.readXmlToDocument(getClass().getResourceAsStream("/netconf-state.schemas.payload.xml"));
- final ToNormalizedNodeParser<Element, ContainerNode, ContainerSchemaNode> containerNodeParser = DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, NetconfDevice.INIT_SCHEMA_CTX).getContainerNodeParser();
+ final ToNormalizedNodeParser<Element, ContainerNode, ContainerSchemaNode> containerNodeParser = DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, NetconfDevice.INIT_SCHEMA_CTX, false).getContainerNodeParser();
final ContainerNode compositeNodeSchemas = containerNodeParser.parse(Collections.singleton(schemasXml.getDocumentElement()), (ContainerSchemaNode) schemasNode);
final NetconfStateSchemas schemas = NetconfStateSchemas.create(new RemoteDeviceId("device", new InetSocketAddress(99)), compositeNodeSchemas);
public void setup() throws Exception {
final SchemaContext schemaContext = getNotificationSchemaContext(getClass());
- messageTransformer = new NetconfMessageTransformer(schemaContext);
+ messageTransformer = new NetconfMessageTransformer(schemaContext, true);
final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
cfgCtx = parser.resolveSchemaContext(Sets.union(configModules, notifModules));
assertNotNull(cfgCtx);
- messageTransformer = new NetconfMessageTransformer(cfgCtx);
+ messageTransformer = new NetconfMessageTransformer(cfgCtx, true);
}
private LeafNode<Object> buildLeaf(final QName running, final Object value) {
}
private NetconfMessageTransformer getTransformer(final SchemaContext schema) {
- return new NetconfMessageTransformer(schema);
+ return new NetconfMessageTransformer(schema, true);
}
@Test
import org.opendaylight.controller.sal.restconf.impl.RestconfDocumentedException;
import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorTag;
import org.opendaylight.controller.sal.restconf.impl.RestconfError.ErrorType;
+import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
+import org.opendaylight.yangtools.yang.data.api.schema.ChoiceNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.stream.NormalizedNodeStreamWriter;
final JsonReader reader = new JsonReader(new InputStreamReader(entityStream));
jsonParser.parse(reader);
- final NormalizedNode<?, ?> partialResult = resultHolder.getResult();
+ NormalizedNode<?, ?> partialResult = resultHolder.getResult();
final NormalizedNode<?, ?> result;
- if(partialResult instanceof MapNode) {
+
+ // unwrap result from augmentation and choice nodes on PUT
+ if (!isPost()) {
+ while (partialResult instanceof AugmentationNode || partialResult instanceof ChoiceNode) {
+ final Object childNode = ((DataContainerNode) partialResult).getValue().iterator().next();
+ partialResult = (NormalizedNode<?, ?>) childNode;
+ }
+ }
+
+ if (partialResult instanceof MapNode) {
result = Iterables.getOnlyElement(((MapNode) partialResult).getValue());
} else {
result = partialResult;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.impl.codec.xml.XmlUtils;
import org.opendaylight.yangtools.yang.data.impl.schema.transform.dom.parser.DomToNormalizedNodeParserFactory;
+import org.opendaylight.yangtools.yang.model.api.AugmentationSchema;
+import org.opendaylight.yangtools.yang.model.api.AugmentationTarget;
import org.opendaylight.yangtools.yang.model.api.ChoiceCaseNode;
import org.opendaylight.yangtools.yang.model.api.ChoiceSchemaNode;
import org.opendaylight.yangtools.yang.model.api.ContainerSchemaNode;
final String docRootElm = doc.getDocumentElement().getLocalName();
final String schemaNodeName = pathContext.getSchemaNode().getQName().getLocalName();
+ // FIXME the factory instance should be cached if the schema context is the same
+ final DomToNormalizedNodeParserFactory parserFactory =
+ DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, pathContext.getSchemaContext());
+
if (!schemaNodeName.equalsIgnoreCase(docRootElm)) {
final DataSchemaNode foundSchemaNode = findSchemaNodeOrParentChoiceByName(schemaNode, docRootElm);
if (foundSchemaNode != null) {
+ if (schemaNode instanceof AugmentationTarget) {
+ final AugmentationSchema augmentSchemaNode = findCorrespondingAugment(schemaNode, foundSchemaNode);
+ if (augmentSchemaNode != null) {
+ return parserFactory.getAugmentationNodeParser().parse(elements, augmentSchemaNode);
+ }
+ }
schemaNode = foundSchemaNode;
}
}
- // FIXME the factory instance should be cached if the schema context is the same
- final DomToNormalizedNodeParserFactory parserFactory =
- DomToNormalizedNodeParserFactory.getInstance(XmlUtils.DEFAULT_XML_CODEC_PROVIDER, pathContext.getSchemaContext());
-
NormalizedNode<?, ?> parsed = null;
+
if(schemaNode instanceof ContainerSchemaNode) {
return parserFactory.getContainerNodeParser().parse(Collections.singletonList(doc.getDocumentElement()), (ContainerSchemaNode) schemaNode);
} else if(schemaNode instanceof ListSchemaNode) {
final ChoiceSchemaNode casted = (ChoiceSchemaNode) schemaNode;
return parserFactory.getChoiceNodeParser().parse(elements, casted);
}
-
// FIXME : add another DataSchemaNode extensions e.g. LeafSchemaNode
return parsed;
}
return null;
}
+
+ private static AugmentationSchema findCorrespondingAugment(final DataSchemaNode parent, final DataSchemaNode child) {
+ if (parent instanceof AugmentationTarget && !((parent instanceof ChoiceCaseNode) || (parent instanceof ChoiceSchemaNode))) {
+ for (AugmentationSchema augmentation : ((AugmentationTarget) parent).getAvailableAugmentations()) {
+ DataSchemaNode childInAugmentation = augmentation.getDataChildByName(child.getQName());
+ if (childInAugmentation != null) {
+ return augmentation;
+ }
+ }
+ }
+ return null;
+ }
}
throw new RestconfDocumentedException("Input is required.", ErrorType.PROTOCOL, ErrorTag.MALFORMED_MESSAGE);
}
- final URI payloadNS = payload.getData().getNodeType().getNamespace();
- if (payloadNS == null) {
- throw new RestconfDocumentedException(
- "Data has bad format. Root element node must have namespace (XML format) or module name(JSON format)",
- ErrorType.PROTOCOL, ErrorTag.UNKNOWN_NAMESPACE);
- }
+ // FIXME: move this to parsing stage (we can have augmentation nodes here which do not have namespace)
+// final URI payloadNS = payload.getData().getNodeType().getNamespace();
+// if (payloadNS == null) {
+// throw new RestconfDocumentedException(
+// "Data has bad format. Root element node must have namespace (XML format) or module name(JSON format)",
+// ErrorType.PROTOCOL, ErrorTag.UNKNOWN_NAMESPACE);
+// }
final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint();
final InstanceIdentifierContext<?> iiWithData = payload.getInstanceIdentifierContext();