package org.opendaylight.controller.config.yang.md.sal.binding.impl;
-import java.util.Collection;
-import java.util.Collections;
import org.opendaylight.controller.md.sal.binding.impl.BindingToNormalizedNodeCodec;
import org.opendaylight.controller.md.sal.binding.impl.ForwardedBindingDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.Provider;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
public class BindingAsyncDataBrokerImplModule extends
- org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractBindingAsyncDataBrokerImplModule implements
- Provider {
+ org.opendaylight.controller.config.yang.md.sal.binding.impl.AbstractBindingAsyncDataBrokerImplModule {
public BindingAsyncDataBrokerImplModule(final org.opendaylight.controller.config.api.ModuleIdentifier identifier,
final org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
@Override
public java.lang.AutoCloseable createInstance() {
- Broker domBroker = getDomAsyncBrokerDependency();
- BindingToNormalizedNodeCodec mappingService = getBindingMappingServiceDependency();
-
- // FIXME: Switch this to DOM Broker registration which would not require
- // BundleContext when API are updated.
- ProviderSession session = domBroker.registerProvider(this, null);
- DOMDataBroker domDataBroker = session.getService(DOMDataBroker.class);
- SchemaService schemaService = session.getService(SchemaService.class);
+ final BindingToNormalizedNodeCodec mappingService = getBindingMappingServiceDependency();
+ final DOMDataBroker domDataBroker = getDomAsyncBrokerDependency();
+ final SchemaService schemaService = getSchemaServiceDependency();
return new ForwardedBindingDataBroker(domDataBroker, mappingService, schemaService);
}
-
-
-
-
- @Override
- public Collection<ProviderFunctionality> getProviderFunctionality() {
- return Collections.emptySet();
- }
-
- @Override
- public void onSessionInitiated(final ProviderSession arg0) {
- // intentional NOOP
- }
-
}
case binding-forwarded-data-broker {
when "/config:modules/config:module/config:type = 'binding-forwarded-data-broker'";
container binding-forwarded-data-broker {
- uses dom-forwarding-component;
+ container dom-async-broker {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity dom:dom-async-data-broker;
+ }
+ }
+ }
+
+ container binding-mapping-service {
+ uses config:service-ref {
+ refine type {
+ mandatory true;
+ config:required-identity binding-dom-mapping-service;
+ }
+ }
+ }
+
+ container schema-service {
+ uses config:service-ref {
+ refine type {
+ mandatory false;
+ config:required-identity dom:schema-service;
+ }
+ }
+ }
}
}
}
}
public DOMStore createConfigurationDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
public DOMStore createOperationalDatastore() {
- InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
schemaService.registerSchemaContextListener(store);
return store;
}
package org.opendaylight.controller.sal.binding.test.util;
import static com.google.common.base.Preconditions.checkState;
-
import com.google.common.annotations.Beta;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.ImmutableClassToInstanceMap;
public void startNewDomDataBroker() {
checkState(executor != null, "Executor needs to be set");
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", executor,
- MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", executor,
- MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
newDatastores = ImmutableMap.<LogicalDatastoreType, DOMStore>builder()
.put(LogicalDatastoreType.OPERATIONAL, operStore)
.put(LogicalDatastoreType.CONFIGURATION, configStore)
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.dom.broker.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.util.DurationStatisticsTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of blocking three-phase commit-coordination tasks without
+ * support of cancellation.
+ */
+final class CommitCoordinationTask implements Callable<Void> {
+ private static enum Phase {
+ canCommit,
+ preCommit,
+ doCommit,
+ };
+
+ private static final Logger LOG = LoggerFactory.getLogger(CommitCoordinationTask.class);
+ private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
+ private final DurationStatisticsTracker commitStatTracker;
+ private final DOMDataWriteTransaction tx;
+ private final int cohortSize;
+
+ public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
+ final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
+ final DurationStatisticsTracker commitStatTracker) {
+ this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
+ this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
+ this.commitStatTracker = commitStatTracker;
+ this.cohortSize = Iterables.size(cohorts);
+ }
+
+ @Override
+ public Void call() throws TransactionCommitFailedException {
+ final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
+
+ Phase phase = Phase.canCommit;
+
+ try {
+ LOG.debug("Transaction {}: canCommit Started", tx.getIdentifier());
+ canCommitBlocking();
+
+ phase = Phase.preCommit;
+ LOG.debug("Transaction {}: preCommit Started", tx.getIdentifier());
+ preCommitBlocking();
+
+ phase = Phase.doCommit;
+ LOG.debug("Transaction {}: doCommit Started", tx.getIdentifier());
+ commitBlocking();
+
+ LOG.debug("Transaction {}: doCommit completed", tx.getIdentifier());
+ return null;
+ } catch (TransactionCommitFailedException e) {
+ LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
+ abortBlocking(e);
+ throw e;
+ } finally {
+ if (commitStatTracker != null) {
+ commitStatTracker.addDuration(System.nanoTime() - startTime);
+ }
+ }
+ }
+
+ /**
+ *
+ * Invokes canCommit on underlying cohorts and blocks till
+ * all results are returned.
+ *
+ * Valid state transition is from SUBMITTED to CAN_COMMIT,
+ * if currentPhase is not SUBMITTED throws IllegalStateException.
+ *
+ * @throws TransactionCommitFailedException
+ * If one of cohorts failed can Commit
+ *
+ */
+ private void canCommitBlocking() throws TransactionCommitFailedException {
+ for (ListenableFuture<?> canCommit : canCommitAll()) {
+ try {
+ final Boolean result = (Boolean)canCommit.get();
+ if (result == null || !result) {
+ throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
+ }
+ }
+ }
+
+ /**
+ *
+ * Invokes canCommit on underlying cohorts and returns composite future
+ * which will contains {@link Boolean#TRUE} only and only if
+ * all cohorts returned true.
+ *
+ * Valid state transition is from SUBMITTED to CAN_COMMIT,
+ * if currentPhase is not SUBMITTED throws IllegalStateException.
+ *
+ * @return List of all cohorts futures from can commit phase.
+ *
+ */
+ private ListenableFuture<?>[] canCommitAll() {
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+ int i = 0;
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[i++] = cohort.canCommit();
+ }
+ return ops;
+ }
+
+ /**
+ *
+ * Invokes preCommit on underlying cohorts and blocks till
+ * all results are returned.
+ *
+ * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+ * state is not CAN_COMMIT
+ * throws IllegalStateException.
+ *
+ * @throws TransactionCommitFailedException
+ * If one of cohorts failed preCommit
+ *
+ */
+ private void preCommitBlocking() throws TransactionCommitFailedException {
+ final ListenableFuture<?>[] preCommitFutures = preCommitAll();
+ try {
+ for(ListenableFuture<?> future : preCommitFutures) {
+ future.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
+ }
+ }
+
+ /**
+ *
+ * Invokes preCommit on underlying cohorts and returns future
+ * which will complete once all preCommit on cohorts completed or
+ * failed.
+ *
+ *
+ * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
+ * state is not CAN_COMMIT
+ * throws IllegalStateException.
+ *
+ * @return List of all cohorts futures from can commit phase.
+ *
+ */
+ private ListenableFuture<?>[] preCommitAll() {
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+ int i = 0;
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[i++] = cohort.preCommit();
+ }
+ return ops;
+ }
+
+ /**
+ *
+ * Invokes commit on underlying cohorts and blocks till
+ * all results are returned.
+ *
+ * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+ * IllegalStateException.
+ *
+ * @throws TransactionCommitFailedException
+ * If one of cohorts failed preCommit
+ *
+ */
+ private void commitBlocking() throws TransactionCommitFailedException {
+ final ListenableFuture<?>[] commitFutures = commitAll();
+ try {
+ for(ListenableFuture<?> future : commitFutures) {
+ future.get();
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
+ }
+ }
+
+ /**
+ *
+ * Invokes commit on underlying cohorts and returns future which
+ * completes
+ * once all commits on cohorts are completed.
+ *
+ * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
+ * IllegalStateException
+ *
+ * @return List of all cohorts futures from can commit phase.
+ */
+ private ListenableFuture<?>[] commitAll() {
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+ int i = 0;
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[i++] = cohort.commit();
+ }
+ return ops;
+ }
+
+ /**
+ * Aborts transaction.
+ *
+ * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
+ * cohorts, blocks
+ * for all results. If any of the abort failed throws
+ * IllegalStateException,
+ * which will contains originalCause as suppressed Exception.
+ *
+ * If aborts we're successful throws supplied exception
+ *
+ * @param originalCause
+ * Exception which should be used to fail transaction for
+ * consumers of transaction
+ * future and listeners of transaction failure.
+ * @param phase phase in which the problem ensued
+ * @throws TransactionCommitFailedException
+ * on invocation of this method.
+ * originalCa
+ * @throws IllegalStateException
+ * if abort failed.
+ */
+ private void abortBlocking(final TransactionCommitFailedException originalCause) throws TransactionCommitFailedException {
+ Exception cause = originalCause;
+ try {
+ abortAsyncAll().get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
+ cause = new IllegalStateException("Abort failed.", e);
+ cause.addSuppressed(e);
+ }
+ Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
+ }
+
+ /**
+ * Invokes abort on underlying cohorts and returns future which
+ * completes once all abort on cohorts are completed.
+ *
+ * @return Future which will complete once all cohorts completed
+ * abort.
+ */
+ private ListenableFuture<Void> abortAsyncAll() {
+
+ final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
+ int i = 0;
+ for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
+ ops[i++] = cohort.abort();
+ }
+
+ /*
+ * We are returning all futures as list, not only succeeded ones in
+ * order to fail composite future if any of them failed.
+ * See Futures.allAsList for this description.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
+ return compositeResult;
+ }
+}
package org.opendaylight.controller.md.sal.dom.broker.impl;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
return MappingCheckedFuture.create(commitFuture,
TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER);
}
-
- /**
- *
- * Phase of 3PC commit
- *
- * Represents phase of 3PC Commit
- *
- *
- */
- private static enum CommitPhase {
- /**
- *
- * Commit Coordination Task is submitted for executing
- *
- */
- SUBMITTED,
- /**
- * Commit Coordination Task is in can commit phase of 3PC
- *
- */
- CAN_COMMIT,
- /**
- * Commit Coordination Task is in pre-commit phase of 3PC
- *
- */
- PRE_COMMIT,
- /**
- * Commit Coordination Task is in commit phase of 3PC
- *
- */
- COMMIT,
- /**
- * Commit Coordination Task is in abort phase of 3PC
- *
- */
- ABORT
- }
-
- /**
- * Implementation of blocking three-phase commit-coordination tasks without
- * support of cancellation.
- */
- private static final class CommitCoordinationTask implements Callable<Void> {
- private static final AtomicReferenceFieldUpdater<CommitCoordinationTask, CommitPhase> PHASE_UPDATER =
- AtomicReferenceFieldUpdater.newUpdater(CommitCoordinationTask.class, CommitPhase.class, "currentPhase");
- private final DOMDataWriteTransaction tx;
- private final Iterable<DOMStoreThreePhaseCommitCohort> cohorts;
- private final DurationStatisticsTracker commitStatTracker;
- private final int cohortSize;
- private volatile CommitPhase currentPhase = CommitPhase.SUBMITTED;
-
- public CommitCoordinationTask(final DOMDataWriteTransaction transaction,
- final Iterable<DOMStoreThreePhaseCommitCohort> cohorts,
- final DurationStatisticsTracker commitStatsTracker) {
- this.tx = Preconditions.checkNotNull(transaction, "transaction must not be null");
- this.cohorts = Preconditions.checkNotNull(cohorts, "cohorts must not be null");
- this.commitStatTracker = commitStatsTracker;
- this.cohortSize = Iterables.size(cohorts);
- }
-
- @Override
- public Void call() throws TransactionCommitFailedException {
- final long startTime = commitStatTracker != null ? System.nanoTime() : 0;
-
- try {
- canCommitBlocking();
- preCommitBlocking();
- commitBlocking();
- return null;
- } catch (TransactionCommitFailedException e) {
- final CommitPhase phase = currentPhase;
- LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, e);
- abortBlocking(e, phase);
- throw e;
- } finally {
- if (commitStatTracker != null) {
- commitStatTracker.addDuration(System.nanoTime() - startTime);
- }
- }
- }
-
- /**
- *
- * Invokes canCommit on underlying cohorts and blocks till
- * all results are returned.
- *
- * Valid state transition is from SUBMITTED to CAN_COMMIT,
- * if currentPhase is not SUBMITTED throws IllegalStateException.
- *
- * @throws TransactionCommitFailedException
- * If one of cohorts failed can Commit
- *
- */
- private void canCommitBlocking() throws TransactionCommitFailedException {
- for (ListenableFuture<?> canCommit : canCommitAll()) {
- try {
- final Boolean result = (Boolean)canCommit.get();
- if (result == null || !result) {
- throw new TransactionCommitFailedException("Can Commit failed, no detailed cause available.");
- }
- } catch (InterruptedException | ExecutionException e) {
- throw TransactionCommitFailedExceptionMapper.CAN_COMMIT_ERROR_MAPPER.apply(e);
- }
- }
- }
-
- /**
- *
- * Invokes canCommit on underlying cohorts and returns composite future
- * which will contains {@link Boolean#TRUE} only and only if
- * all cohorts returned true.
- *
- * Valid state transition is from SUBMITTED to CAN_COMMIT,
- * if currentPhase is not SUBMITTED throws IllegalStateException.
- *
- * @return List of all cohorts futures from can commit phase.
- *
- */
- private ListenableFuture<?>[] canCommitAll() {
- changeStateFrom(CommitPhase.SUBMITTED, CommitPhase.CAN_COMMIT);
-
- final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
- int i = 0;
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[i++] = cohort.canCommit();
- }
- return ops;
- }
-
- /**
- *
- * Invokes preCommit on underlying cohorts and blocks till
- * all results are returned.
- *
- * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
- * state is not CAN_COMMIT
- * throws IllegalStateException.
- *
- * @throws TransactionCommitFailedException
- * If one of cohorts failed preCommit
- *
- */
- private void preCommitBlocking() throws TransactionCommitFailedException {
- final ListenableFuture<?>[] preCommitFutures = preCommitAll();
- try {
- for(ListenableFuture<?> future : preCommitFutures) {
- future.get();
- }
- } catch (InterruptedException | ExecutionException e) {
- throw TransactionCommitFailedExceptionMapper.PRE_COMMIT_MAPPER.apply(e);
- }
- }
-
- /**
- *
- * Invokes preCommit on underlying cohorts and returns future
- * which will complete once all preCommit on cohorts completed or
- * failed.
- *
- *
- * Valid state transition is from CAN_COMMIT to PRE_COMMIT, if current
- * state is not CAN_COMMIT
- * throws IllegalStateException.
- *
- * @return List of all cohorts futures from can commit phase.
- *
- */
- private ListenableFuture<?>[] preCommitAll() {
- changeStateFrom(CommitPhase.CAN_COMMIT, CommitPhase.PRE_COMMIT);
-
- final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
- int i = 0;
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[i++] = cohort.preCommit();
- }
- return ops;
- }
-
- /**
- *
- * Invokes commit on underlying cohorts and blocks till
- * all results are returned.
- *
- * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
- * IllegalStateException.
- *
- * @throws TransactionCommitFailedException
- * If one of cohorts failed preCommit
- *
- */
- private void commitBlocking() throws TransactionCommitFailedException {
- final ListenableFuture<?>[] commitFutures = commitAll();
- try {
- for(ListenableFuture<?> future : commitFutures) {
- future.get();
- }
- } catch (InterruptedException | ExecutionException e) {
- throw TransactionCommitFailedExceptionMapper.COMMIT_ERROR_MAPPER.apply(e);
- }
- }
-
- /**
- *
- * Invokes commit on underlying cohorts and returns future which
- * completes
- * once all commits on cohorts are completed.
- *
- * Valid state transition is from PRE_COMMIT to COMMIT, if not throws
- * IllegalStateException
- *
- * @return List of all cohorts futures from can commit phase.
- *
- */
- private ListenableFuture<?>[] commitAll() {
- changeStateFrom(CommitPhase.PRE_COMMIT, CommitPhase.COMMIT);
-
- final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
- int i = 0;
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[i++] = cohort.commit();
- }
- return ops;
- }
-
- /**
- * Aborts transaction.
- *
- * Invokes {@link DOMStoreThreePhaseCommitCohort#abort()} on all
- * cohorts, blocks
- * for all results. If any of the abort failed throws
- * IllegalStateException,
- * which will contains originalCause as suppressed Exception.
- *
- * If aborts we're successful throws supplied exception
- *
- * @param originalCause
- * Exception which should be used to fail transaction for
- * consumers of transaction
- * future and listeners of transaction failure.
- * @param phase phase in which the problem ensued
- * @throws TransactionCommitFailedException
- * on invocation of this method.
- * originalCa
- * @throws IllegalStateException
- * if abort failed.
- */
- private void abortBlocking(final TransactionCommitFailedException originalCause, final CommitPhase phase)
- throws TransactionCommitFailedException {
- LOG.warn("Tx: {} Error during phase {}, starting Abort", tx.getIdentifier(), phase, originalCause);
- Exception cause = originalCause;
- try {
- abortAsyncAll(phase).get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Tx: {} Error during Abort.", tx.getIdentifier(), e);
- cause = new IllegalStateException("Abort failed.", e);
- cause.addSuppressed(e);
- }
- Throwables.propagateIfPossible(cause, TransactionCommitFailedException.class);
- }
-
- /**
- * Invokes abort on underlying cohorts and returns future which
- * completes once all abort on cohorts are completed.
- *
- * @param phase phase in which the problem ensued
- * @return Future which will complete once all cohorts completed
- * abort.
- */
- private ListenableFuture<Void> abortAsyncAll(final CommitPhase phase) {
- changeStateFrom(phase, CommitPhase.ABORT);
-
- final ListenableFuture<?>[] ops = new ListenableFuture<?>[cohortSize];
- int i = 0;
- for (DOMStoreThreePhaseCommitCohort cohort : cohorts) {
- ops[i++] = cohort.abort();
- }
-
- /*
- * We are returning all futures as list, not only succeeded ones in
- * order to fail composite future if any of them failed.
- * See Futures.allAsList for this description.
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- ListenableFuture<Void> compositeResult = (ListenableFuture) Futures.allAsList(ops);
- return compositeResult;
- }
-
- /**
- * Change phase / state of transaction from expected value to new value
- *
- * This method checks state and updates state to new state of
- * of this task if current state equals expected state.
- * If expected state and current state are different raises
- * IllegalStateException
- * which means there is probably bug in implementation of commit
- * coordination.
- *
- * If transition is successful, it logs transition on DEBUG level.
- *
- * @param currentExpected
- * Required phase for change of state
- * @param newState
- * New Phase which will be entered by transaction.
- * @throws IllegalStateException
- * If currentState of task does not match expected state
- */
- private void changeStateFrom(final CommitPhase currentExpected, final CommitPhase newState) {
- final boolean success = PHASE_UPDATER.compareAndSet(this, currentExpected, newState);
- Preconditions.checkState(success, "Invalid state transition: Tx: %s expected: %s current: %s target: %s",
- tx.getIdentifier(), currentExpected, currentPhase, newState);
-
- LOG.debug("Transaction {}: Phase {} Started", tx.getIdentifier(), newState);
- };
- }
-
}
import static org.junit.Assert.assertTrue;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
-
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
public class DOMBrokerPerformanceTest {
private static final Logger log = LoggerFactory.getLogger(DOMBrokerPerformanceTest.class);
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
public void setupStore() {
InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ MoreExecutors.sameThreadExecutor());
InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
import static org.junit.Assert.fail;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.CONFIGURATION;
import static org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType.OPERATIONAL;
-
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
public class DOMTransactionChainTest {
private SchemaContext schemaContext;
@Before
public void setupStore() {
- InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
- InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG",
- MoreExecutors.sameThreadExecutor(), MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore operStore = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
+ InMemoryDOMDataStore configStore = new InMemoryDOMDataStore("CFG", MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
operStore.onGlobalContextUpdated(schemaContext);
getMaxDataChangeExecutorQueueSize(), getMaxDataChangeListenerQueueSize(),
getMaxDataStoreExecutorQueueSize()));
- InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryConfigDataStore",
- dataStore.getDataChangeListenerNotificationManager(), dataStore.getDomStoreExecutor());
-
+ InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryConfigDataStore", dataStore);
dataStore.setCloseable(statsBean);
return dataStore;
getMaxDataStoreExecutorQueueSize()));
- InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryOperationalDataStore",
- dataStore.getDataChangeListenerNotificationManager(), dataStore.getDomStoreExecutor());
+ InMemoryDataStoreStats statsBean = new InMemoryDataStoreStats("InMemoryOperationalDataStore", dataStore);
dataStore.setCloseable(statsBean);
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class InMemoryDOMDataStore extends TransactionReadyPrototype implements DOMStore, Identifiable<String>, SchemaContextListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
private static final ListenableFuture<Void> SUCCESSFUL_FUTURE = Futures.immediateFuture(null);
+ private static final ListenableFuture<Boolean> CAN_COMMIT_FUTURE = Futures.immediateFuture(Boolean.TRUE);
private static final Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
new Invoker<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent>() {
private final QueuedNotificationManager<DataChangeListenerRegistration<?>, DOMImmutableDataChangeEvent> dataChangeListenerNotificationManager;
private final ExecutorService dataChangeListenerExecutor;
- private final ListeningExecutorService commitExecutor;
private final boolean debugTransactions;
private final String name;
private volatile AutoCloseable closeable;
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
- final ExecutorService dataChangeListenerExecutor) {
- this(name, commitExecutor, dataChangeListenerExecutor,
- InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
+ public InMemoryDOMDataStore(final String name, final ExecutorService dataChangeListenerExecutor) {
+ this(name, dataChangeListenerExecutor, InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE, false);
}
- public InMemoryDOMDataStore(final String name, final ListeningExecutorService commitExecutor,
- final ExecutorService dataChangeListenerExecutor, final int maxDataChangeListenerQueueSize,
- final boolean debugTransactions) {
+ public InMemoryDOMDataStore(final String name, final ExecutorService dataChangeListenerExecutor,
+ final int maxDataChangeListenerQueueSize, final boolean debugTransactions) {
this.name = Preconditions.checkNotNull(name);
- this.commitExecutor = Preconditions.checkNotNull(commitExecutor);
this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
this.debugTransactions = debugTransactions;
return dataChangeListenerNotificationManager;
}
- public ExecutorService getDomStoreExecutor() {
- return commitExecutor;
- }
-
@Override
public final String getIdentifier() {
return name;
@Override
public void close() {
- ExecutorServiceUtil.tryGracefulShutdown(commitExecutor, 30, TimeUnit.SECONDS);
ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
if(closeable != null) {
@Override
public ListenableFuture<Boolean> canCommit() {
- return commitExecutor.submit(new Callable<Boolean>() {
- @Override
- public Boolean call() throws TransactionCommitFailedException {
- try {
- dataTree.validate(modification);
- LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
- return true;
- } catch (ConflictingModificationAppliedException e) {
- LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
- e.getPath());
- transaction.warnDebugContext(LOG);
- throw new OptimisticLockFailedException("Optimistic lock failed.",e);
- } catch (DataValidationFailedException e) {
- LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
- e.getPath(), e);
- transaction.warnDebugContext(LOG);
- throw new TransactionCommitFailedException("Data did not pass validation.",e);
- }
- }
- });
+ try {
+ dataTree.validate(modification);
+ LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
+ return CAN_COMMIT_FUTURE;
+ } catch (ConflictingModificationAppliedException e) {
+ LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
+ e.getPath());
+ transaction.warnDebugContext(LOG);
+ return Futures.immediateFailedFuture(new OptimisticLockFailedException("Optimistic lock failed.", e));
+ } catch (DataValidationFailedException e) {
+ LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
+ e.getPath(), e);
+ transaction.warnDebugContext(LOG);
+ return Futures.immediateFailedFuture(new TransactionCommitFailedException("Data did not pass validation.", e));
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in validation phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
}
@Override
public ListenableFuture<Void> preCommit() {
- return commitExecutor.submit(new Callable<Void>() {
- @Override
- public Void call() {
- candidate = dataTree.prepare(modification);
- listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
- return null;
- }
- });
+ try {
+ candidate = dataTree.prepare(modification);
+ listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
+ return SUCCESSFUL_FUTURE;
+ } catch (Exception e) {
+ LOG.warn("Unexpected failure in pre-commit phase", e);
+ return Futures.immediateFailedFuture(e);
+ }
}
@Override
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.opendaylight.controller.sal.core.api.model.SchemaService;
ExecutorService dataChangeListenerExecutor = SpecialExecutors.newBlockingBoundedFastThreadPool(
dclExecutorMaxPoolSize, dclExecutorMaxQueueSize, name + "-DCL" );
- final ListeningExecutorService commitExecutor = MoreExecutors.sameThreadExecutor();
- final InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name,
- commitExecutor, dataChangeListenerExecutor,
+ final InMemoryDOMDataStore dataStore = new InMemoryDOMDataStore(name, dataChangeListenerExecutor,
actualProperties.getMaxDataChangeListenerQueueSize(), debugTransactions);
if (schemaService != null) {
package org.opendaylight.controller.md.sal.dom.store.impl.jmx;
-import java.util.concurrent.ExecutorService;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.md.sal.common.util.jmx.QueuedNotificationManagerMXBeanImpl;
import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
/**
public class InMemoryDataStoreStats implements AutoCloseable {
private final AbstractMXBean notificationExecutorStatsBean;
- private final AbstractMXBean dataStoreExecutorStatsBean;
private final QueuedNotificationManagerMXBeanImpl notificationManagerStatsBean;
- public InMemoryDataStoreStats(final String mBeanType, final QueuedNotificationManager<?, ?> manager,
- final ExecutorService dataStoreExecutor) {
+ public InMemoryDataStoreStats(final String mBeanType, final QueuedNotificationManager<?, ?> manager) {
notificationManagerStatsBean = new QueuedNotificationManagerMXBeanImpl(manager,
"notification-manager", mBeanType, null);
if (notificationExecutorStatsBean != null) {
notificationExecutorStatsBean.registerMBean();
}
+ }
- dataStoreExecutorStatsBean = ThreadExecutorStatsMXBeanImpl.create(dataStoreExecutor,
- "data-store-executor", mBeanType, null);
- if (dataStoreExecutorStatsBean != null) {
- dataStoreExecutorStatsBean.registerMBean();
- }
+ public InMemoryDataStoreStats(final String name, final InMemoryDOMDataStore dataStore) {
+ this(name, dataStore.getDataChangeListenerNotificationManager());
}
@Override
notificationExecutorStatsBean.unregisterMBean();
}
- if(dataStoreExecutorStatsBean != null) {
- dataStoreExecutorStatsBean.unregisterMBean();
- }
-
if(notificationManagerStatsBean != null) {
notificationManagerStatsBean.unregisterMBean();
}
*/
package org.opendaylight.controller.md.sal.dom.store.impl;
-import com.google.common.util.concurrent.MoreExecutors;
-
import java.util.Collection;
import java.util.Map;
-
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
dclExecutorService = new TestDCLExecutorService(
SpecialExecutors.newBlockingBoundedFastThreadPool(1, 10, "DCL" ));
- datastore = new InMemoryDOMDataStore("TEST",
- MoreExecutors.sameThreadExecutor(), dclExecutorService );
+ datastore = new InMemoryDOMDataStore("TEST", dclExecutorService);
datastore.onGlobalContextUpdated(schemaContext);
}
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-
import java.util.concurrent.ExecutionException;
-
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@Before
public void setupStore() {
- domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
- MoreExecutors.sameThreadExecutor());
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
schemaContext = TestModel.createTestContext();
domStore.onGlobalContextUpdated(schemaContext);
}
package org.opendaylight.controller.md.sal.dom.store.impl;
import static org.junit.Assert.assertNotNull;
-
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
-
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.MoreExecutors;
-
public class SchemaUpdateForTransactionTest {
private static final YangInstanceIdentifier TOP_PATH = YangInstanceIdentifier.of(Top.QNAME);
@Before
public void setupStore() {
- domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor(),
- MoreExecutors.sameThreadExecutor());
+ domStore = new InMemoryDOMDataStore("TEST", MoreExecutors.sameThreadExecutor());
loadSchemas(RockTheHouseInput.class);
}