import org.opendaylight.controller.config.api.DependencyResolver;
import org.opendaylight.controller.config.api.ModuleIdentifier;
-import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
import org.opendaylight.controller.sal.core.api.Broker;
public class BindingNotificationAdapterModule extends AbstractBindingNotificationAdapterModule {
- public BindingNotificationAdapterModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver) {
+ public BindingNotificationAdapterModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public BindingNotificationAdapterModule(ModuleIdentifier identifier, DependencyResolver dependencyResolver, org.opendaylight.controller.config.yang.md.sal.binding.impl.BindingNotificationAdapterModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public BindingNotificationAdapterModule(final ModuleIdentifier identifier, final DependencyResolver dependencyResolver, final org.opendaylight.controller.config.yang.md.sal.binding.impl.BindingNotificationAdapterModule oldModule, final java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
final BindingToNormalizedNodeCodec codec = getBindingMappingServiceDependency();
final Broker.ProviderSession session = getDomAsyncBrokerDependency().registerProvider(new DummyDOMProvider());
final DOMNotificationService notifService = session.getService(DOMNotificationService.class);
- return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), notifService, SingletonHolder.INVOKER_FACTORY);
+ return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), notifService);
}
}
*/
package org.opendaylight.controller.config.yang.md.sal.binding.impl;
-import com.google.common.util.concurrent.ListeningExecutorService;
+import org.opendaylight.controller.md.sal.binding.compat.HydrogenNotificationBrokerImpl;
+
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceAdapter;
import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
-import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl;
/**
*
public final class NotificationBrokerImplModule extends
org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractNotificationBrokerImplModule {
- public NotificationBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
- org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ public NotificationBrokerImplModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+ final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
super(identifier, dependencyResolver);
}
- public NotificationBrokerImplModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
- org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
- NotificationBrokerImplModule oldModule, java.lang.AutoCloseable oldInstance) {
+ public NotificationBrokerImplModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+ final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
+ final NotificationBrokerImplModule oldModule, final java.lang.AutoCloseable oldInstance) {
super(identifier, dependencyResolver, oldModule, oldInstance);
}
@Override
public java.lang.AutoCloseable createInstance() {
+ final NotificationPublishService notificationPublishService = getNotificationPublishAdapterDependency();
+ final NotificationService notificationService = getNotificationAdapterDependency();
+
+ if(notificationPublishService != null & notificationService != null) {
+ return new HeliumNotificationProviderServiceAdapter(notificationPublishService, notificationService);
+ }
+
/*
* FIXME: Switch to new broker (which has different threading model)
* once this change is communicated with downstream users or
* we will have adapter implementation which will honor Helium
* threading model for notifications.
*/
- ListeningExecutorService listeningExecutor = SingletonHolder.getDefaultNotificationExecutor();
- NotificationBrokerImpl broker = new NotificationBrokerImpl(listeningExecutor);
- return broker;
+
+ return new HydrogenNotificationBrokerImpl(SingletonHolder.getDefaultNotificationExecutor());
}
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.yangtools.yang.binding.Notification;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
+public class HydrogenNotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(HydrogenNotificationBrokerImpl.class);
private final ListenerRegistry<NotificationInterestListener> interestListeners =
ListenerRegistry.create();
private final AtomicReference<ListenerMapGeneration> listeners = new AtomicReference<>(new ListenerMapGeneration());
private final ExecutorService executor;
- public NotificationBrokerImpl(final ExecutorService executor) {
+ public HydrogenNotificationBrokerImpl(final ExecutorService executor) {
this.executor = Preconditions.checkNotNull(executor);
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import java.util.Arrays;
import java.util.Collection;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.sal.binding.impl;
+package org.opendaylight.controller.md.sal.binding.compat;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.slf4j.Logger;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMAdapterBuilder.Factory;
import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMService;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
-import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory;
import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
private final BindingNormalizedNodeSerializer codec;
private final DOMNotificationService domNotifService;
- public BindingDOMNotificationServiceAdapter(final BindingNormalizedNodeSerializer codec, final DOMNotificationService domNotifService, final NotificationInvokerFactory notificationInvokerFactory) {
+ public BindingDOMNotificationServiceAdapter(final BindingNormalizedNodeSerializer codec, final DOMNotificationService domNotifService) {
this.codec = codec;
this.domNotifService = domNotifService;
}
protected NotificationService createInstance(final BindingToNormalizedNodeCodec codec,
final ClassToInstanceMap<DOMService> delegates) {
final DOMNotificationService domNotification = delegates.getInstance(DOMNotificationService.class);
- final NotificationInvokerFactory invokerFactory = SingletonHolder.INVOKER_FACTORY;
- return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), domNotification, invokerFactory);
+ return new BindingDOMNotificationServiceAdapter(codec.getCodecRegistry(), domNotification);
}
@Override
}
}
+ augment "/config:modules/config:module/config:configuration" {
+ case binding-notification-broker {
+ when "/config:modules/config:module/config:type = 'binding-notification-broker'";
+ container notification-adapter {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity binding-new-notification-service;
+ }
+ }
+ }
+
+ container notification-publish-adapter {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity binding-new-notification-publish-service;
+ }
+ }
+ }
+ }
+ }
+
augment "/config:modules/config:module/config:state" {
case binding-notification-broker {
when "/config:modules/config:module/config:type = 'binding-notification-broker'";
import org.opendaylight.controller.md.sal.dom.broker.impl.DOMNotificationRouter;
import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.sal.binding.codegen.impl.SingletonHolder;
import org.opendaylight.controller.sal.binding.test.util.MockSchemaService;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
}
public NotificationService createNotificationService() {
- return new BindingDOMNotificationServiceAdapter(bindingToNormalized.getCodecRegistry(), domNotificationRouter,
- SingletonHolder.INVOKER_FACTORY);
+ return new BindingDOMNotificationServiceAdapter(bindingToNormalized.getCodecRegistry(), domNotificationRouter);
}
public NotificationPublishService createNotificationPublishService() {
import javassist.ClassPool;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.MountPointService;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
+import org.opendaylight.controller.md.sal.binding.api.NotificationService;
+import org.opendaylight.controller.md.sal.binding.compat.HeliumNotificationProviderServiceAdapter;
import org.opendaylight.controller.md.sal.binding.compat.HeliumRpcProviderRegistry;
import org.opendaylight.controller.md.sal.binding.compat.HydrogenDataBrokerAdapter;
import org.opendaylight.controller.md.sal.binding.compat.HydrogenMountProvisionServiceAdapter;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMDataBrokerAdapter;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMMountPointServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationPublishServiceAdapter;
+import org.opendaylight.controller.md.sal.binding.impl.BindingDOMNotificationServiceAdapter;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMRpcProviderServiceAdapter;
import org.opendaylight.controller.md.sal.binding.impl.BindingDOMRpcServiceAdapter;
import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMMountPointService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationPublishService;
+import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMNotificationRouter;
import org.opendaylight.controller.md.sal.dom.broker.impl.DOMRpcRouter;
import org.opendaylight.controller.md.sal.dom.broker.impl.SerializedDOMDataBroker;
import org.opendaylight.controller.md.sal.dom.broker.impl.mount.DOMMountPointServiceImpl;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.controller.sal.binding.api.data.DataProviderService;
import org.opendaylight.controller.sal.binding.api.mount.MountProviderService;
-import org.opendaylight.controller.sal.binding.impl.NotificationBrokerImpl;
import org.opendaylight.controller.sal.binding.impl.RootBindingAwareBroker;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.BrokerService;
private RootBindingAwareBroker baBrokerImpl;
- private NotificationBrokerImpl baNotifyImpl;
+ private HeliumNotificationProviderServiceAdapter baNotifyImpl;
private BrokerImpl biBrokerImpl;
private BindingDOMRpcProviderServiceAdapter baProviderRpc;
private DOMRpcRouter domRouter;
+ private NotificationPublishService publishService;
+
+ private NotificationService listenService;
+
+ private DOMNotificationPublishService domPublishService;
+
+ private DOMNotificationService domListenService;
+
public DOMDataBroker getDomAsyncDataBroker() {
public void startBindingNotificationBroker() {
checkState(executor != null);
- baNotifyImpl = new NotificationBrokerImpl(executor);
+ final DOMNotificationRouter router = DOMNotificationRouter.create(16);
+ domPublishService = router;
+ domListenService = router;
+ publishService = new BindingDOMNotificationPublishServiceAdapter(codec, domPublishService);
+ listenService = new BindingDOMNotificationServiceAdapter(codec, domListenService);
+ baNotifyImpl = new HeliumNotificationProviderServiceAdapter(publishService,listenService);
}
* @return The context in which this transaction was allocated, or null
* if the context was not recorded.
*/
- @Nullable public final Throwable getDebugContext() {
+ @Nullable
+ public final Throwable getDebugContext() {
return debugContext;
}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract implementation of the {@link DOMStoreTransactionChain} interface relying on {@link DataTreeSnapshot} supplier
+ * and backend commit coordinator.
+ *
+ * @param <T> transaction identifier type
+ */
+@Beta
+public abstract class AbstractSnapshotBackedTransactionChain<T> extends TransactionReadyPrototype<T> implements DOMStoreTransactionChain {
+ private static abstract class State {
+ /**
+ * Allocate a new snapshot.
+ *
+ * @return A new snapshot
+ */
+ protected abstract DataTreeSnapshot getSnapshot();
+ }
+
+ private static final class Idle extends State {
+ private final AbstractSnapshotBackedTransactionChain<?> chain;
+
+ Idle(final AbstractSnapshotBackedTransactionChain<?> chain) {
+ this.chain = Preconditions.checkNotNull(chain);
+ }
+
+ @Override
+ protected DataTreeSnapshot getSnapshot() {
+ return chain.takeSnapshot();
+ }
+ }
+
+ /**
+ * We have a transaction out there.
+ */
+ private static final class Allocated extends State {
+ private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
+ private final DOMStoreWriteTransaction transaction;
+ private volatile DataTreeSnapshot snapshot;
+
+ Allocated(final DOMStoreWriteTransaction transaction) {
+ this.transaction = Preconditions.checkNotNull(transaction);
+ }
+
+ public DOMStoreWriteTransaction getTransaction() {
+ return transaction;
+ }
+
+ @Override
+ protected DataTreeSnapshot getSnapshot() {
+ final DataTreeSnapshot ret = snapshot;
+ Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
+ return ret;
+ }
+
+ void setSnapshot(final DataTreeSnapshot snapshot) {
+ final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
+ Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
+ }
+ }
+
+ /**
+ * Chain is logically shut down, no further allocation allowed.
+ */
+ private static final class Shutdown extends State {
+ private final String message;
+
+ Shutdown(final String message) {
+ this.message = Preconditions.checkNotNull(message);
+ }
+
+ @Override
+ protected DataTreeSnapshot getSnapshot() {
+ throw new IllegalStateException(message);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private static final AtomicReferenceFieldUpdater<AbstractSnapshotBackedTransactionChain, State> STATE_UPDATER =
+ AtomicReferenceFieldUpdater.newUpdater(AbstractSnapshotBackedTransactionChain.class, State.class, "state");
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotBackedTransactionChain.class);
+ private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
+ private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+ private final Idle idleState;
+ private volatile State state;
+
+ protected AbstractSnapshotBackedTransactionChain() {
+ idleState = new Idle(this);
+ state = idleState;
+ }
+
+ private Entry<State, DataTreeSnapshot> getSnapshot() {
+ final State localState = state;
+ return new SimpleEntry<>(localState, localState.getSnapshot());
+ }
+
+ private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
+ final State state = new Allocated(transaction);
+ return STATE_UPDATER.compareAndSet(this, expected, state);
+ }
+
+ @Override
+ public final DOMStoreReadTransaction newReadOnlyTransaction() {
+ final Entry<State, DataTreeSnapshot> entry = getSnapshot();
+ return SnapshotBackedTransactions.newReadTransaction(nextTransactionIdentifier(), getDebugTransactions(), entry.getValue());
+ }
+
+ @Override
+ public final DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ Entry<State, DataTreeSnapshot> entry;
+ DOMStoreReadWriteTransaction ret;
+
+ do {
+ entry = getSnapshot();
+ ret = new SnapshotBackedReadWriteTransaction<T>(nextTransactionIdentifier(),
+ getDebugTransactions(), entry.getValue(), this);
+ } while (!recordTransaction(entry.getKey(), ret));
+
+ return ret;
+ }
+
+ @Override
+ public final DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ Entry<State, DataTreeSnapshot> entry;
+ DOMStoreWriteTransaction ret;
+
+ do {
+ entry = getSnapshot();
+ ret = new SnapshotBackedWriteTransaction<T>(nextTransactionIdentifier(),
+ getDebugTransactions(), entry.getValue(), this);
+ } while (!recordTransaction(entry.getKey(), ret));
+
+ return ret;
+ }
+
+ @Override
+ protected final void transactionAborted(final SnapshotBackedWriteTransaction<T> tx) {
+ final State localState = state;
+ if (localState instanceof Allocated) {
+ final Allocated allocated = (Allocated)localState;
+ if (allocated.getTransaction().equals(tx)) {
+ final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
+ if (!success) {
+ LOG.warn("Transaction {} aborted, but chain {} state already transitioned from {} to {}, very strange",
+ tx, this, localState, state);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected final DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<T> tx, final DataTreeModification tree) {
+ final State localState = state;
+
+ if (localState instanceof Allocated) {
+ final Allocated allocated = (Allocated)localState;
+ final DOMStoreWriteTransaction transaction = allocated.getTransaction();
+ Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
+ allocated.setSnapshot(tree);
+ } else {
+ LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
+ }
+
+ return createCohort(tx, tree);
+ }
+
+ @Override
+ public final void close() {
+ final State localState = state;
+
+ do {
+ Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
+
+ if (FAILED.equals(localState)) {
+ LOG.debug("Ignoring user close in failed state");
+ return;
+ }
+ } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
+ }
+
+ /**
+ * Notify the base logic that a previously-submitted transaction has been committed successfully.
+ *
+ * @param transaction Transaction which completed successfully.
+ */
+ protected final void onTransactionCommited(final SnapshotBackedWriteTransaction<T> transaction) {
+ // If the committed transaction was the one we allocated last,
+ // we clear it and the ready snapshot, so the next transaction
+ // allocated refers to the data tree directly.
+ final State localState = state;
+
+ if (!(localState instanceof Allocated)) {
+ // This can legally happen if the chain is shut down before the transaction was committed
+ // by the backend.
+ LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
+ return;
+ }
+
+ final Allocated allocated = (Allocated)localState;
+ final DOMStoreWriteTransaction tx = allocated.getTransaction();
+ if (!tx.equals(transaction)) {
+ LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
+ return;
+ }
+
+ if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
+ LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
+ }
+ }
+
+ /**
+ * Notify the base logic that a previously-submitted transaction has failed.
+ *
+ * @param transaction Transaction which failed.
+ * @param cause Failure cause
+ */
+ protected final void onTransactionFailed(final SnapshotBackedWriteTransaction<T> transaction, final Throwable cause) {
+ LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, cause);
+ state = FAILED;
+ }
+
+ /**
+ * Return the next transaction identifier.
+ *
+ * @return transaction identifier.
+ */
+ protected abstract T nextTransactionIdentifier();
+
+ /**
+ * Inquire as to whether transactions should record their allocation context.
+ *
+ * @return True if allocation context should be recorded.
+ */
+ protected abstract boolean getDebugTransactions();
+
+ /**
+ * Take a fresh {@link DataTreeSnapshot} from the backend.
+ *
+ * @return A new snapshot.
+ */
+ protected abstract DataTreeSnapshot takeSnapshot();
+
+ /**
+ * Create a cohort for driving the transaction through the commit process.
+ *
+ * @param transaction Transaction handle
+ * @param modification {@link DataTreeModification} which needs to be applied to the backend
+ * @return A {@link DOMStoreThreePhaseCommitCohort} cohort.
+ */
+ protected abstract DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<T> transaction, final DataTreeModification modification);
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.annotations.Beta;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
* Implementation of read-only transaction backed by {@link DataTreeSnapshot}
* which delegates most of its calls to similar methods provided by underlying snapshot.
*
+ * <T> identifier type
*/
-final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction<Object>
- implements DOMStoreReadTransaction {
-
+@Beta
+public final class SnapshotBackedReadTransaction<T> extends AbstractDOMStoreTransaction<T> implements DOMStoreReadTransaction {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadTransaction.class);
private volatile DataTreeSnapshot stableSnapshot;
- public SnapshotBackedReadTransaction(final Object identifier, final boolean debug, final DataTreeSnapshot snapshot) {
+ /**
+ * Creates a new read-only transaction.
+ *
+ * @param identifier Transaction Identifier
+ * @param debug Enable transaction debugging
+ * @param snapshot Snapshot which will be modified.
+ */
+ SnapshotBackedReadTransaction(final T identifier, final boolean debug, final DataTreeSnapshot snapshot) {
super(identifier, debug);
this.stableSnapshot = Preconditions.checkNotNull(snapshot);
LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot);
checkNotNull(path, "Path must not be null.");
try {
- return Futures.immediateCheckedFuture(
- read(path).checkedGet().isPresent());
+ return Futures.immediateCheckedFuture(read(path).checkedGet().isPresent());
} catch (ReadFailedException e) {
return Futures.immediateFailedCheckedFuture(e);
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.annotations.Beta;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
* Implementation of Read-Write transaction which is backed by {@link DataTreeSnapshot}
* and executed according to {@link TransactionReadyPrototype}.
*
+ * @param <T> identifier type
*/
-final class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements DOMStoreReadWriteTransaction {
+@Beta
+public final class SnapshotBackedReadWriteTransaction<T> extends SnapshotBackedWriteTransaction<T> implements DOMStoreReadWriteTransaction {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedReadWriteTransaction.class);
- /**
- * Creates new read-write transaction.
- *
- * @param identifier transaction Identifier
- * @param snapshot Snapshot which will be modified.
- * @param readyImpl Implementation of ready method.
- */
- protected SnapshotBackedReadWriteTransaction(final Object identifier, final boolean debug,
- final DataTreeSnapshot snapshot, final TransactionReadyPrototype store) {
- super(identifier, debug, snapshot, store);
+ SnapshotBackedReadWriteTransaction(final T identifier, final boolean debug,
+ final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+ super(identifier, debug, snapshot, readyImpl);
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.spi.data;
+
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+/**
+ * Public utility class for instantiating snapshot-backed transactions.
+ */
+@Beta
+public final class SnapshotBackedTransactions {
+ private SnapshotBackedTransactions() {
+ throw new UnsupportedOperationException("Utility class");
+ }
+
+ /**
+ * Creates a new read-only transaction.
+ *
+ * @param identifier Transaction Identifier
+ * @param debug Enable transaction debugging
+ * @param snapshot Snapshot which will be modified.
+ */
+ public static <T> SnapshotBackedReadTransaction<T> newReadTransaction(final T identifier, final boolean debug, final DataTreeSnapshot snapshot) {
+ return new SnapshotBackedReadTransaction<T>(identifier, debug, snapshot);
+ }
+
+ /**
+ * Creates a new read-write transaction.
+ *
+ * @param identifier transaction Identifier
+ * @param debug Enable transaction debugging
+ * @param snapshot Snapshot which will be modified.
+ * @param readyImpl Implementation of ready method.
+ */
+ public static <T> SnapshotBackedReadWriteTransaction<T> newReadWriteTransaction(final T identifier, final boolean debug,
+ final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+ return new SnapshotBackedReadWriteTransaction<T>(identifier, debug, snapshot, readyImpl);
+ }
+
+ /**
+ * Creates a new write-only transaction.
+ *
+ * @param identifier transaction Identifier
+ * @param debug Enable transaction debugging
+ * @param snapshot Snapshot which will be modified.
+ * @param readyImpl Implementation of ready method.
+ */
+ public static <T> SnapshotBackedWriteTransaction<T> newWriteTransaction(final T identifier, final boolean debug,
+ final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
+ return new SnapshotBackedWriteTransaction<T>(identifier, debug, snapshot, readyImpl);
+ }
+}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-package org.opendaylight.controller.md.sal.dom.store.impl;
+package org.opendaylight.controller.sal.core.spi.data;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.annotations.Beta;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
/**
* Implementation of Write transaction which is backed by
* {@link DataTreeSnapshot} and executed according to
- * {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype}.
+ * {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype}.
*
+ * @param <T> Identifier type
*/
-class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction<Object> implements DOMStoreWriteTransaction {
+@Beta
+public class SnapshotBackedWriteTransaction<T> extends AbstractDOMStoreTransaction<T> implements DOMStoreWriteTransaction {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotBackedWriteTransaction.class);
+ @SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, TransactionReadyPrototype> READY_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, TransactionReadyPrototype.class, "readyImpl");
+ @SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<SnapshotBackedWriteTransaction, DataTreeModification> TREE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(SnapshotBackedWriteTransaction.class, DataTreeModification.class, "mutableTree");
// non-null when not ready
- private volatile TransactionReadyPrototype readyImpl;
+ private volatile TransactionReadyPrototype<T> readyImpl;
// non-null when not committed/closed
private volatile DataTreeModification mutableTree;
- /**
- * Creates new write-only transaction.
- *
- * @param identifier
- * transaction Identifier
- * @param snapshot
- * Snapshot which will be modified.
- * @param readyImpl
- * Implementation of ready method.
- */
- public SnapshotBackedWriteTransaction(final Object identifier, final boolean debug,
- final DataTreeSnapshot snapshot, final TransactionReadyPrototype readyImpl) {
+ SnapshotBackedWriteTransaction(final T identifier, final boolean debug,
+ final DataTreeSnapshot snapshot, final TransactionReadyPrototype<T> readyImpl) {
super(identifier, debug);
this.readyImpl = Preconditions.checkNotNull(readyImpl, "readyImpl must not be null.");
mutableTree = snapshot.newModification();
* @param path Path to read
* @return null if the the transaction has been closed;
*/
- protected final Optional<NormalizedNode<?, ?>> readSnapshotNode(final YangInstanceIdentifier path) {
+ final Optional<NormalizedNode<?, ?>> readSnapshotNode(final YangInstanceIdentifier path) {
return readyImpl == null ? null : mutableTree.readNode(path);
}
@Override
public DOMStoreThreePhaseCommitCohort ready() {
- final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null);
+ @SuppressWarnings("unchecked")
+ final TransactionReadyPrototype<T> wasReady = READY_UPDATER.getAndSet(this, null);
checkState(wasReady != null, "Transaction %s is no longer open", getIdentifier());
LOG.debug("Store transaction: {} : Ready", getIdentifier());
@Override
public void close() {
- final TransactionReadyPrototype wasReady = READY_UPDATER.getAndSet(this, null);
+ @SuppressWarnings("unchecked")
+ final TransactionReadyPrototype<T> wasReady = READY_UPDATER.getAndSet(this, null);
if (wasReady != null) {
LOG.debug("Store transaction: {} : Closed", getIdentifier());
TREE_UPDATER.lazySet(this, null);
/**
* Prototype implementation of
- * {@link #ready(org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction)}
+ * {@link #ready(org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction)}
*
* This class is intended to be implemented by Transaction factories
- * responsible for allocation of {@link org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction} and
+ * responsible for allocation of {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction} and
* providing underlying logic for applying implementation.
*
+ * @param <T> identifier type
*/
- abstract static class TransactionReadyPrototype {
+ public abstract static class TransactionReadyPrototype<T> {
/**
* Called when a transaction is closed without being readied. This is not invoked for
* transactions which are ready.
*
* @param tx Transaction which got aborted.
*/
- protected abstract void transactionAborted(final SnapshotBackedWriteTransaction tx);
+ protected abstract void transactionAborted(final SnapshotBackedWriteTransaction<T> tx);
/**
* Returns a commit coordinator associated with supplied transactions.
* Modified data tree which has been constructed.
* @return DOMStoreThreePhaseCommitCohort associated with transaction
*/
- protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction tx, DataTreeModification tree);
+ protected abstract DOMStoreThreePhaseCommitCohort transactionReady(SnapshotBackedWriteTransaction<T> tx, DataTreeModification tree);
}
}
\ No newline at end of file
package org.opendaylight.controller.md.sal.dom.store.impl;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-final class ChainedTransactionCommitImpl extends ForwardingDOMStoreThreePhaseCommitCohort {
- private final SnapshotBackedWriteTransaction transaction;
- private final DOMStoreThreePhaseCommitCohort delegate;
+final class ChainedTransactionCommitImpl extends InMemoryDOMStoreThreePhaseCommitCohort {
private final DOMStoreTransactionChainImpl txChain;
- ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
- final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
- this.transaction = Preconditions.checkNotNull(transaction);
- this.delegate = Preconditions.checkNotNull(delegate);
+ ChainedTransactionCommitImpl(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction<String> transaction,
+ final DataTreeModification modification, final DOMStoreTransactionChainImpl txChain) {
+ super(store, transaction, modification);
this.txChain = Preconditions.checkNotNull(txChain);
}
- @Override
- protected DOMStoreThreePhaseCommitCohort delegate() {
- return delegate;
- }
-
@Override
public ListenableFuture<Void> commit() {
- ListenableFuture<Void> commitFuture = super.commit();
- Futures.addCallback(commitFuture, new FutureCallback<Void>() {
- @Override
- public void onFailure(final Throwable t) {
- txChain.onTransactionFailed(transaction, t);
- }
-
- @Override
- public void onSuccess(final Void result) {
- txChain.onTransactionCommited(transaction);
- }
- });
- return commitFuture;
+ ListenableFuture<Void> ret = super.commit();
+ txChain.transactionCommited(getTransaction());
+ return ret;
}
}
\ No newline at end of file
package org.opendaylight.controller.md.sal.dom.store.impl;
import com.google.common.base.Preconditions;
-import java.util.AbstractMap.SimpleEntry;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.AbstractSnapshotBackedTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-final class DOMStoreTransactionChainImpl extends TransactionReadyPrototype implements DOMStoreTransactionChain {
- private static abstract class State {
- /**
- * Allocate a new snapshot.
- *
- * @return A new snapshot
- */
- protected abstract DataTreeSnapshot getSnapshot();
- }
-
- private static final class Idle extends State {
- private final InMemoryDOMDataStore store;
-
- Idle(final InMemoryDOMDataStore store) {
- this.store = Preconditions.checkNotNull(store);
- }
-
- @Override
- protected DataTreeSnapshot getSnapshot() {
- return store.takeSnapshot();
- }
- }
-
- /**
- * We have a transaction out there.
- */
- private static final class Allocated extends State {
- private static final AtomicReferenceFieldUpdater<Allocated, DataTreeSnapshot> SNAPSHOT_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(Allocated.class, DataTreeSnapshot.class, "snapshot");
- private final DOMStoreWriteTransaction transaction;
- private volatile DataTreeSnapshot snapshot;
-
- Allocated(final DOMStoreWriteTransaction transaction) {
- this.transaction = Preconditions.checkNotNull(transaction);
- }
-
- public DOMStoreWriteTransaction getTransaction() {
- return transaction;
- }
-
- @Override
- protected DataTreeSnapshot getSnapshot() {
- final DataTreeSnapshot ret = snapshot;
- Preconditions.checkState(ret != null, "Previous transaction %s is not ready yet", transaction.getIdentifier());
- return ret;
- }
-
- void setSnapshot(final DataTreeSnapshot snapshot) {
- final boolean success = SNAPSHOT_UPDATER.compareAndSet(this, null, snapshot);
- Preconditions.checkState(success, "Transaction %s has already been marked as ready", transaction.getIdentifier());
- }
- }
-
- /**
- * Chain is logically shut down, no further allocation allowed.
- */
- private static final class Shutdown extends State {
- private final String message;
-
- Shutdown(final String message) {
- this.message = Preconditions.checkNotNull(message);
- }
-
- @Override
- protected DataTreeSnapshot getSnapshot() {
- throw new IllegalStateException(message);
- }
- }
-
- private static final AtomicReferenceFieldUpdater<DOMStoreTransactionChainImpl, State> STATE_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(DOMStoreTransactionChainImpl.class, State.class, "state");
- private static final Logger LOG = LoggerFactory.getLogger(DOMStoreTransactionChainImpl.class);
- private static final Shutdown CLOSED = new Shutdown("Transaction chain is closed");
- private static final Shutdown FAILED = new Shutdown("Transaction chain has failed");
+final class DOMStoreTransactionChainImpl extends AbstractSnapshotBackedTransactionChain<String> {
private final InMemoryDOMDataStore store;
- private final Idle idleState;
- private volatile State state;
DOMStoreTransactionChainImpl(final InMemoryDOMDataStore store) {
this.store = Preconditions.checkNotNull(store);
- idleState = new Idle(store);
- state = idleState;
- }
-
- private Entry<State, DataTreeSnapshot> getSnapshot() {
- final State localState = state;
- return new SimpleEntry<>(localState, localState.getSnapshot());
- }
-
- private boolean recordTransaction(final State expected, final DOMStoreWriteTransaction transaction) {
- final State state = new Allocated(transaction);
- return STATE_UPDATER.compareAndSet(this, expected, state);
}
@Override
- public DOMStoreReadTransaction newReadOnlyTransaction() {
- final Entry<State, DataTreeSnapshot> entry = getSnapshot();
- return new SnapshotBackedReadTransaction(store.nextIdentifier(), store.getDebugTransactions(), entry.getValue());
+ protected DOMStoreThreePhaseCommitCohort createCohort(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification modification) {
+ return new ChainedTransactionCommitImpl(store, tx, modification, this);
}
@Override
- public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- Entry<State, DataTreeSnapshot> entry;
- DOMStoreReadWriteTransaction ret;
-
- do {
- entry = getSnapshot();
- ret = new SnapshotBackedReadWriteTransaction(store.nextIdentifier(),
- store.getDebugTransactions(), entry.getValue(), this);
- } while (!recordTransaction(entry.getKey(), ret));
-
- return ret;
- }
-
- @Override
- public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- Entry<State, DataTreeSnapshot> entry;
- DOMStoreWriteTransaction ret;
-
- do {
- entry = getSnapshot();
- ret = new SnapshotBackedWriteTransaction(store.nextIdentifier(),
- store.getDebugTransactions(), entry.getValue(), this);
- } while (!recordTransaction(entry.getKey(), ret));
-
- return ret;
+ protected DataTreeSnapshot takeSnapshot() {
+ return store.takeSnapshot();
}
@Override
- protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
- final State localState = state;
- if (localState instanceof Allocated) {
- final Allocated allocated = (Allocated)localState;
- if (allocated.getTransaction().equals(tx)) {
- final boolean success = STATE_UPDATER.compareAndSet(this, localState, idleState);
- if (!success) {
- LOG.info("State already transitioned from {} to {}", localState, state);
- }
- }
- }
+ protected String nextTransactionIdentifier() {
+ return store.nextIdentifier();
}
@Override
- protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
- final State localState = state;
-
- if (localState instanceof Allocated) {
- final Allocated allocated = (Allocated)localState;
- final DOMStoreWriteTransaction transaction = allocated.getTransaction();
- Preconditions.checkState(tx.equals(transaction), "Mis-ordered ready transaction %s last allocated was %s", tx, transaction);
- allocated.setSnapshot(tree);
- } else {
- LOG.debug("Ignoring transaction {} readiness due to state {}", tx, localState);
- }
-
- return new ChainedTransactionCommitImpl(tx, store.transactionReady(tx, tree), this);
+ protected boolean getDebugTransactions() {
+ return store.getDebugTransactions();
}
- @Override
- public void close() {
- final State localState = state;
-
- do {
- Preconditions.checkState(!CLOSED.equals(localState), "Transaction chain {} has been closed", this);
-
- if (FAILED.equals(localState)) {
- LOG.debug("Ignoring user close in failed state");
- return;
- }
- } while (!STATE_UPDATER.compareAndSet(this, localState, CLOSED));
- }
-
- void onTransactionFailed(final SnapshotBackedWriteTransaction transaction, final Throwable t) {
- LOG.debug("Transaction chain {} failed on transaction {}", this, transaction, t);
- state = FAILED;
- }
-
- void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
- // If the committed transaction was the one we allocated last,
- // we clear it and the ready snapshot, so the next transaction
- // allocated refers to the data tree directly.
- final State localState = state;
-
- if (!(localState instanceof Allocated)) {
- LOG.debug("Ignoring successful transaction {} in state {}", transaction, localState);
- return;
- }
-
- final Allocated allocated = (Allocated)localState;
- final DOMStoreWriteTransaction tx = allocated.getTransaction();
- if (!tx.equals(transaction)) {
- LOG.debug("Ignoring non-latest successful transaction {} in state {}", transaction, allocated);
- return;
- }
-
- if (!STATE_UPDATER.compareAndSet(this, localState, idleState)) {
- LOG.debug("Transaction chain {} has already transitioned from {} to {}, not making it idle", this, localState, state);
- }
+ void transactionCommited(final SnapshotBackedWriteTransaction<String> transaction) {
+ super.onTransactionCommited(transaction);
}
-}
\ No newline at end of file
+}
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
-import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
-import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
-import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager.Invoker;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
*
* Implementation of {@link DOMStore} which uses {@link DataTree} and other
* classes such as {@link SnapshotBackedWriteTransaction}.
- * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
+ * {@link org.opendaylight.controller.sal.core.spi.data.SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
* to implement {@link DOMStore} contract.
*
*/
-public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
+public class InMemoryDOMDataStore extends TransactionReadyPrototype<String> implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable, DOMStoreTreeChangePublisher {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
- private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
- private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
private static final Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
new Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>() {
@Override
public DOMStoreReadTransaction newReadOnlyTransaction() {
- return new SnapshotBackedReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot());
+ return SnapshotBackedTransactions.newReadTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot());
}
@Override
public DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return new SnapshotBackedReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
+ return SnapshotBackedTransactions.newReadWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
}
@Override
public DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return new SnapshotBackedWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
+ return SnapshotBackedTransactions.newWriteTransaction(nextIdentifier(), debugTransactions, dataTree.takeSnapshot(), this);
}
@Override
}
@Override
- protected void transactionAborted(final SnapshotBackedWriteTransaction tx) {
+ protected void transactionAborted(final SnapshotBackedWriteTransaction<String> tx) {
LOG.debug("Tx: {} is closed.", tx.getIdentifier());
}
@Override
- protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction tx, final DataTreeModification tree) {
- LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), tree);
- return new ThreePhaseCommitImpl(tx, tree);
+ protected DOMStoreThreePhaseCommitCohort transactionReady(final SnapshotBackedWriteTransaction<String> tx, final DataTreeModification modification) {
+ LOG.debug("Tx: {} is submitted. Modifications: {}", tx.getIdentifier(), modification);
+ return new InMemoryDOMStoreThreePhaseCommitCohort(this, tx, modification);
}
- Object nextIdentifier() {
+ String nextIdentifier() {
return name + "-" + txCounter.getAndIncrement();
}
- private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
- final Throwable ctx = transaction.getDebugContext();
- if (ctx != null) {
- LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
- }
+ void validate(final DataTreeModification modification) throws DataValidationFailedException {
+ dataTree.validate(modification);
}
- private final class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
- private final SnapshotBackedWriteTransaction transaction;
- private final DataTreeModification modification;
-
- private ResolveDataChangeEventsTask listenerResolver;
- private DataTreeCandidate candidate;
-
- public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction, final DataTreeModification modification) {
- this.transaction = writeTransaction;
- this.modification = modification;
- }
-
- @Override
- public ListenableFuture<Boolean> canCommit() {
- try {
- dataTree.validate(modification);
- LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
- return CAN_COMMIT_FUTURE;
- } catch (ConflictingModificationAppliedException e) {
- LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
- e.getPath());
- warnDebugContext(transaction);
- return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
- } catch (DataValidationFailedException e) {
- LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
- e.getPath(), e);
- warnDebugContext(transaction);
-
- // For debugging purposes, allow dumping of the modification. Coupled with the above
- // precondition log, it should allow us to understand what went on.
- LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, dataTree);
-
- return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
- } catch (Exception e) {
- LOG.warn("Unexpected failure in validation phase", e);
- return Futures.immediateFailedFuture(e);
- }
- }
-
- @Override
- public ListenableFuture<Void> preCommit() {
- try {
- candidate = dataTree.prepare(modification);
- listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
- return SUCCESSFUL_FUTURE;
- } catch (Exception e) {
- LOG.warn("Unexpected failure in pre-commit phase", e);
- return Futures.immediateFailedFuture(e);
- }
- }
-
- @Override
- public ListenableFuture<Void> abort() {
- candidate = null;
- return SUCCESSFUL_FUTURE;
- }
-
- @Override
- public ListenableFuture<Void> commit() {
- checkState(candidate != null, "Proposed subtree must be computed");
-
- /*
- * The commit has to occur atomically with regard to listener
- * registrations.
- */
- synchronized (InMemoryDOMDataStore.this) {
- dataTree.commit(candidate);
- changePublisher.publishChange(candidate);
- listenerResolver.resolve(dataChangeListenerNotificationManager);
- }
+ DataTreeCandidate prepare(final DataTreeModification modification) {
+ return dataTree.prepare(modification);
+ }
- return SUCCESSFUL_FUTURE;
- }
+ synchronized void commit(final DataTreeCandidate candidate) {
+ dataTree.commit(candidate);
+ changePublisher.publishChange(candidate);
+ ResolveDataChangeEventsTask.create(candidate, listenerTree).resolve(dataChangeListenerNotificationManager);
}
}
--- /dev/null
+package org.opendaylight.controller.md.sal.dom.store.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class InMemoryDOMStoreThreePhaseCommitCohort implements DOMStoreThreePhaseCommitCohort {
+ private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMStoreThreePhaseCommitCohort.class);
+ private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
+ private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
+ private final SnapshotBackedWriteTransaction<String> transaction;
+ private final DataTreeModification modification;
+ private final InMemoryDOMDataStore store;
+ private DataTreeCandidate candidate;
+
+ public InMemoryDOMStoreThreePhaseCommitCohort(final InMemoryDOMDataStore store, final SnapshotBackedWriteTransaction<String> writeTransaction, final DataTreeModification modification) {
+ this.transaction = Preconditions.checkNotNull(writeTransaction);
+ this.modification = Preconditions.checkNotNull(modification);
+ this.store = Preconditions.checkNotNull(store);
+ }
+
+ private static void warnDebugContext(final AbstractDOMStoreTransaction<?> transaction) {
+ final Throwable ctx = transaction.getDebugContext();
+ if (ctx != null) {
+ LOG.warn("Transaction {} has been allocated in the following context", transaction.getIdentifier(), ctx);
+ }
+ }
+
+ @Override
+ public final ListenableFuture<Boolean> canCommit() {
+ try {
+ store.validate(modification);
+ LOG.debug("Store Transaction: {} can be committed", getTransaction().getIdentifier());
+ return CAN_COMMIT_FUTURE;
+ } catch (ConflictingModificationAppliedException e) {
+ LOG.warn("Store Tx: {} Conflicting modification for {}.", getTransaction().getIdentifier(),
+ e.getPath());
+ warnDebugContext(getTransaction());
+ return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
+ } catch (DataValidationFailedException e) {
+ LOG.warn("Store Tx: {} Data Precondition failed for {}.", getTransaction().getIdentifier(),
+ e.getPath(), e);
+ warnDebugContext(getTransaction());
+
+ // For debugging purposes, allow dumping of the modification. Coupled with the above
+ // precondition log, it should allow us to understand what went on.
+ LOG.trace("Store Tx: {} modifications: {} tree: {}", modification, store);
+
+ return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in validation phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public final ListenableFuture<Void> preCommit() {
+ try {
+ candidate = store.prepare(modification);
+ return SUCCESSFUL_FUTURE;
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in pre-commit phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
+ }
+
+ @Override
+ public final ListenableFuture<Void> abort() {
+ candidate = null;
+ return SUCCESSFUL_FUTURE;
+ }
+
+ protected final SnapshotBackedWriteTransaction<String> getTransaction() {
+ return transaction;
+ }
+
+ @Override
+ public ListenableFuture<Void> commit() {
+ checkState(candidate != null, "Proposed subtree must be computed");
+
+ /*
+ * The commit has to occur atomically with regard to listener
+ * registrations.
+ */
+ store.commit(candidate);
+ return SUCCESSFUL_FUTURE;
+ }
+}
\ No newline at end of file
import org.junit.Test;
import org.mockito.Mockito;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedTransactions;
+import org.opendaylight.controller.sal.core.spi.data.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-
public class InMemoryDataStoreTest {
private SchemaContext schemaContext;
Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockSnapshot )
.readNode( Mockito.any( YangInstanceIdentifier.class ) );
- DOMStoreReadTransaction readTx = new SnapshotBackedReadTransaction("1", true, mockSnapshot);
+ DOMStoreReadTransaction readTx = SnapshotBackedTransactions.newReadTransaction("1", true, mockSnapshot);
doReadAndThrowEx( readTx );
}
Mockito.doThrow( new RuntimeException( "mock ex" ) ).when( mockModification )
.readNode( Mockito.any( YangInstanceIdentifier.class ) );
Mockito.doReturn( mockModification ).when( mockSnapshot ).newModification();
- TransactionReadyPrototype mockReady = Mockito.mock( TransactionReadyPrototype.class );
- DOMStoreReadTransaction readTx = new SnapshotBackedReadWriteTransaction("1", false, mockSnapshot, mockReady);
+ @SuppressWarnings("unchecked")
+ TransactionReadyPrototype<String> mockReady = Mockito.mock( TransactionReadyPrototype.class );
+ DOMStoreReadTransaction readTx = SnapshotBackedTransactions.newReadWriteTransaction("1", false, mockSnapshot, mockReady);
doReadAndThrowEx( readTx );
}
- private void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable {
-
+ private static void doReadAndThrowEx( final DOMStoreReadTransaction readTx ) throws Throwable {
try {
readTx.read(TestModel.TEST_PATH).get();
} catch( ExecutionException e ) {