--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.ClassToInstanceMap;
+import com.google.common.collect.ImmutableMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.CommitStatsMXBeanImpl;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
+import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.yangtools.util.DurationStatisticsTracker;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Beta
+@Component(immediate = true, configurationPid = "org.opendaylight.controller.cluster.datastore.broker",
+ property = "type=default")
+@Designate(ocd = OSGiDOMDataBroker.Config.class)
+public final class OSGiDOMDataBroker implements DOMDataBroker {
+ @ObjectClassDefinition
+ public @interface Config {
+ @AttributeDefinition(name = "max-data-broker-future-callback-queue-size")
+ int callbackQueueSize() default 1000;
+ @AttributeDefinition(name = "max-data-broker-future-callback-pool-size")
+ int callbackPoolSize() default 20;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(OSGiDOMDataBroker.class);
+
+ @Reference(target = "(type=distributed-config)")
+ DOMStore configDatastore = null;
+ @Reference(target = "(type=distributed-operational)")
+ DOMStore operDatastore = null;
+
+ private ExecutorService executorService;
+ private ConcurrentDOMDataBroker delegate;
+ private CommitStatsMXBeanImpl commitStats;
+ private ThreadExecutorStatsMXBeanImpl threadStats;
+
+ @Override
+ public DOMDataTreeReadTransaction newReadOnlyTransaction() {
+ return delegate.newReadOnlyTransaction();
+ }
+
+ @Override
+ public DOMDataTreeWriteTransaction newWriteOnlyTransaction() {
+ return delegate.newWriteOnlyTransaction();
+ }
+
+ @Override
+ public DOMDataTreeReadWriteTransaction newReadWriteTransaction() {
+ return delegate.newReadWriteTransaction();
+ }
+
+ @Override
+ public ClassToInstanceMap<DOMDataBrokerExtension> getExtensions() {
+ return delegate.getExtensions();
+ }
+
+ @Override
+ public DOMTransactionChain createTransactionChain(final DOMTransactionChainListener listener) {
+ return delegate.createTransactionChain(listener);
+ }
+
+ @Override
+ public DOMTransactionChain createMergingTransactionChain(final DOMTransactionChainListener listener) {
+ return delegate.createMergingTransactionChain(listener);
+ }
+
+ @Activate
+ void activate(final Config config) {
+ LOG.info("DOM Data Broker starting");
+ final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
+
+ executorService = SpecialExecutors.newBlockingBoundedCachedThreadPool(config.callbackPoolSize(),
+ config.callbackQueueSize(), "CommitFutures", ConcurrentDOMDataBroker.class);
+ delegate = new ConcurrentDOMDataBroker(ImmutableMap.of(
+ LogicalDatastoreType.CONFIGURATION, configDatastore, LogicalDatastoreType.OPERATIONAL, operDatastore),
+ executorService, commitStatsTracker);
+
+ commitStats = new CommitStatsMXBeanImpl(commitStatsTracker, "DOMDataBroker");
+ commitStats.register();
+ threadStats = ThreadExecutorStatsMXBeanImpl.create(executorService, "CommitFutureExecutorStats",
+ "DOMDataBroker");
+
+ LOG.info("DOM Data Broker started");
+ }
+
+ @Deactivate
+ void deactivate() {
+ LOG.info("DOM Data Broker stopping");
+ commitStats.unregister();
+ if (threadStats != null) {
+ threadStats.unregister();
+ }
+
+ delegate.close();
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ LOG.warn("Future executor failed to finish in time, giving up", e);
+ }
+ LOG.info("DOM Data Broker stopped");
+ }
+}
final Duration toWait = initialSettleTime();
try {
- if (toWait.isFinite()) {
- if (!waitTillReadyCountDownLatch.await(toWait.toNanos(), TimeUnit.NANOSECONDS)) {
- LOG.error("Shard leaders failed to settle in {}, giving up", toWait);
- return;
- }
- } else {
- waitTillReadyCountDownLatch.await();
+ if (!awaitReadiness(toWait)) {
+ LOG.error("Shard leaders failed to settle in {}, giving up", toWait);
+ return;
}
} catch (InterruptedException e) {
LOG.error("Interrupted while waiting for shards to settle", e);
LOG.debug("Data store {} is now ready", identifier);
}
+ @Beta
+ public boolean awaitReadiness() throws InterruptedException {
+ return awaitReadiness(initialSettleTime());
+ }
+
+ @Beta
+ public boolean awaitReadiness(final Duration toWait) throws InterruptedException {
+ if (toWait.isFinite()) {
+ return waitTillReadyCountDownLatch.await(toWait.toNanos(), TimeUnit.NANOSECONDS);
+ }
+
+ waitTillReadyCountDownLatch.await();
+ return true;
+ }
+
@Beta
public void awaitReadiness(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
if (!waitTillReadyCountDownLatch.await(timeout, unit)) {
return waitTillReadyCountDownLatch;
}
+ @Override
@SuppressWarnings("unchecked")
public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
final YangInstanceIdentifier shardLookup, final YangInstanceIdentifier insideShard,
return (ListenerRegistration<L>) listenerRegistrationProxy;
}
+ @Override
@SuppressWarnings("unchecked")
public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
- final YangInstanceIdentifier internalPath,
- final DOMDataTreeChangeListener delegate) {
+ final YangInstanceIdentifier internalPath, final DOMDataTreeChangeListener delegate) {
requireNonNull(delegate, "delegate should not be null");
LOG.debug("Registering a listener for the configuration shard: {}", internalPath);
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.Beta;
+import java.util.Map;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
+import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Beta
+public abstract class AbstractOSGiDOMStore
+ implements DistributedDataStoreInterface, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry {
+ @Component(immediate = true, service = { DOMStore.class, DistributedDataStoreInterface.class },
+ configurationPid = "org.opendaylight.controller.cluster.datastore",
+ property = "type=distributed-config")
+ public static final class Configuration extends AbstractOSGiDOMStore {
+ @Reference
+ DOMSchemaService schemaService = null;
+ @Reference
+ ActorSystemProvider actorSystemProvider = null;
+ @Reference
+ DatastoreContextIntrospectorFactory introspectorFactory = null;
+ @Reference
+ DatastoreSnapshotRestore snapshotRestore = null;
+
+ public Configuration() {
+ super(LogicalDatastoreType.CONFIGURATION);
+ }
+
+ @Activate
+ void activate(final Map<String, Object> properties) throws InterruptedException {
+ start(schemaService, actorSystemProvider, introspectorFactory, snapshotRestore, null);
+ }
+
+ @Modified
+ void modified(final Map<String, Object> properties) {
+ update(properties);
+ }
+
+ @Deactivate
+ void deactivate() {
+ stop();
+ }
+ }
+
+ @Component(immediate = true, service = { DOMStore.class, DistributedDataStoreInterface.class },
+ configurationPid = "org.opendaylight.controller.cluster.datastore",
+ property = "type=distributed-operational")
+ public static final class Operational extends AbstractOSGiDOMStore {
+ @Reference
+ DOMSchemaService schemaService = null;
+ @Reference
+ ActorSystemProvider actorSystemProvider = null;
+ @Reference
+ DatastoreContextIntrospectorFactory introspectorFactory = null;
+ @Reference
+ DatastoreSnapshotRestore snapshotRestore = null;
+ @Reference
+ ModuleShardConfigProvider configProvider = null;
+
+ public Operational() {
+ super(LogicalDatastoreType.OPERATIONAL);
+ }
+
+ @Activate
+ void activate(final Map<String, Object> properties) throws InterruptedException {
+ start(schemaService, actorSystemProvider, introspectorFactory, snapshotRestore,
+ new ConfigurationImpl(configProvider));
+ }
+
+ @Modified
+ void modified(final Map<String, Object> properties) {
+ update(properties);
+ }
+
+ @Deactivate
+ void deactivate() {
+ stop();
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractOSGiDOMStore.class);
+
+ private final LogicalDatastoreType datastoreType;
+
+ private ListenerRegistration<?> schemaRegistration;
+ private DatastoreContextIntrospector introspector;
+ private AbstractDataStore datastore;
+
+ AbstractOSGiDOMStore(final LogicalDatastoreType datastoreType) {
+ this.datastoreType = requireNonNull(datastoreType);
+ }
+
+ @Override
+ public final ActorUtils getActorUtils() {
+ return datastore.getActorUtils();
+ }
+
+ @Override
+ public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
+ final YangInstanceIdentifier internalPath, final DOMDataTreeChangeListener delegate) {
+ return datastore.registerShardConfigListener(internalPath, delegate);
+ }
+
+ @Override
+ public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
+ final YangInstanceIdentifier shardLookup, final YangInstanceIdentifier insideShard,
+ final DOMDataTreeChangeListener delegate) {
+ return datastore.registerProxyListener(shardLookup, insideShard, delegate);
+ }
+
+ @Override
+ public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+ final YangInstanceIdentifier treeId, final L listener) {
+ return datastore.registerTreeChangeListener(treeId, listener);
+ }
+
+ @Override
+ public final <T extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<T> registerCommitCohort(
+ final DOMDataTreeIdentifier path, final T cohort) {
+ return datastore.registerCommitCohort(path, cohort);
+ }
+
+ @Override
+ public final DOMStoreTransactionChain createTransactionChain() {
+ return datastore.createTransactionChain();
+ }
+
+ @Override
+ public final DOMStoreReadTransaction newReadOnlyTransaction() {
+ return datastore.newReadOnlyTransaction();
+ }
+
+ @Override
+ public final DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ return datastore.newWriteOnlyTransaction();
+ }
+
+ @Override
+ public final DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ return datastore.newReadWriteTransaction();
+ }
+
+ final void start(final DOMSchemaService schemaService, final ActorSystemProvider actorSystemProvider,
+ final DatastoreContextIntrospectorFactory introspectorFactory,
+ final DatastoreSnapshotRestore snapshotRestore,
+ final org.opendaylight.controller.cluster.datastore.config.Configuration config)
+ throws InterruptedException {
+ LOG.info("Distributed Datastore type {} starting", datastoreType);
+ introspector = introspectorFactory.newInstance(datastoreType);
+
+ datastore = DistributedDataStoreFactory.createInstance(actorSystemProvider, introspector.getContext(),
+ introspector, snapshotRestore, config);
+ schemaRegistration = schemaService.registerSchemaContextListener(datastore);
+
+ datastore.awaitReadiness();
+ LOG.info("Distributed Datastore type {} started", datastoreType);
+ }
+
+ final void update(final Map<String, Object> properties) {
+ LOG.debug("Overlaying settings: {}", properties);
+ if (introspector.update(properties)) {
+ datastore.onDatastoreContextUpdated(introspector.newContextFactory());
+ }
+ }
+
+ final void stop() {
+ LOG.info("Distributed Datastore type {} stopping", datastoreType);
+ schemaRegistration.close();
+ datastore.close();
+ LOG.info("Distributed Datastore type {} stopped", datastoreType);
+ }
+}
final ActorSystemProvider actorSystemProvider, final DatastoreContextIntrospector introspector,
final DatastoreContextPropertiesUpdater updater, final Configuration orgConfig) {
+ final AbstractDataStore dataStore = createInstance(actorSystemProvider, initialDatastoreContext,
+ introspector, datastoreSnapshotRestore, orgConfig);
+
+ updater.setListener(dataStore);
+
+ schemaService.registerSchemaContextListener(dataStore);
+
+ dataStore.setCloseable(updater);
+ dataStore.waitTillReady();
+
+ return dataStore;
+ }
+
+ public static AbstractDataStore createInstance(final ActorSystemProvider actorSystemProvider,
+ final DatastoreContext initialDatastoreContext, final DatastoreContextIntrospector introspector,
+ final DatastoreSnapshotRestore datastoreSnapshotRestore, final Configuration orgConfig) {
+
final String datastoreName = initialDatastoreContext.getDataStoreName();
LOG.info("Create data store instance of type : {}", datastoreName);
final ActorSystem actorSystem = actorSystemProvider.getActorSystem();
final DatastoreSnapshot restoreFromSnapshot = datastoreSnapshotRestore.getAndRemove(datastoreName).orElse(null);
- Configuration config;
+ final Configuration config;
if (orgConfig == null) {
config = new ConfigurationImpl(DEFAULT_MODULE_SHARDS_PATH, DEFAULT_MODULES_PATH);
} else {
restoreFromSnapshot);
LOG.info("Data store {} is using ask-based protocol", datastoreName);
}
- updater.setListener(dataStore);
-
- schemaService.registerSchemaContextListener(dataStore);
-
- dataStore.setCloseable(updater);
- dataStore.waitTillReady();
return dataStore;
}
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.annotations.Beta;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
/**
- * The public interface exposed vi a DistributedDataStore via the OSGi registry.
+ * The public interface exposed by an AbstractDataStore via the OSGi registry.
*
* @author Thomas Pantelis
*/
public interface DistributedDataStoreInterface extends DOMStore {
ActorUtils getActorUtils();
+
+ @Beta
+ <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
+ YangInstanceIdentifier internalPath, DOMDataTreeChangeListener delegate);
+
+ @Beta
+ <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
+ YangInstanceIdentifier shardLookup, YangInstanceIdentifier insideShard,
+ DOMDataTreeChangeListener delegate);
}
import java.util.stream.Collectors;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
- private final AbstractDataStore distributedDataStore;
+ private final DistributedDataStoreInterface distributedDataStore;
private final YangInstanceIdentifier shardPath;
private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
private final DataTree dataTree;
public DistributedShardChangePublisher(final DataStoreClient client,
- final AbstractDataStore distributedDataStore,
+ final DistributedDataStoreInterface distributedDataStore,
final DOMDataTreeIdentifier prefix,
final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
this.distributedDataStore = distributedDataStore;
import java.util.Map;
import org.checkerframework.checker.lock.qual.GuardedBy;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
private final DistributedShardChangePublisher publisher;
- DistributedShardFrontend(final AbstractDataStore distributedDataStore,
+ DistributedShardFrontend(final DistributedDataStoreInterface distributedDataStore,
final DataStoreClient client,
final DOMDataTreeIdentifier shardRoot) {
this.client = requireNonNull(client);
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
private final ShardedDOMDataTree shardedDOMDataTree;
private final ActorSystem actorSystem;
- private final AbstractDataStore distributedOperDatastore;
- private final AbstractDataStore distributedConfigDatastore;
+ private final DistributedDataStoreInterface distributedOperDatastore;
+ private final DistributedDataStoreInterface distributedConfigDatastore;
private final ActorRef shardedDataTreeActor;
private final MemberName memberName;
private final PrefixedShardConfigUpdateHandler updateHandler;
public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
- final AbstractDataStore distributedOperDatastore,
- final AbstractDataStore distributedConfigDatastore) {
+ final DistributedDataStoreInterface distributedOperDatastore,
+ final DistributedDataStoreInterface distributedConfigDatastore) {
this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
this.distributedOperDatastore = requireNonNull(distributedOperDatastore);
this.distributedConfigDatastore = requireNonNull(distributedConfigDatastore);
createPrefixConfigShard(distributedOperDatastore);
}
- private static void createPrefixConfigShard(final AbstractDataStore dataStore) {
+ private static void createPrefixConfigShard(final DistributedDataStoreInterface dataStore) {
Configuration configuration = dataStore.getActorUtils().getConfiguration();
Collection<MemberName> memberNames = configuration.getUniqueMemberNamesForAllShards();
CreateShard createShardMessage =
final Future<Object> ask =
Patterns.ask(shardedDataTreeActor, new StartConfigShardLookup(type), SHARD_FUTURE_TIMEOUT);
- ask.onComplete(new OnComplete<Object>() {
+ ask.onComplete(new OnComplete<>() {
@Override
public void onComplete(final Throwable throwable, final Object result) {
if (throwable != null) {
private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
LOG.debug("{}: Creating CDS shard for prefix: {}", memberName, prefix);
final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
- final AbstractDataStore distributedDataStore =
- prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
+ final DistributedDataStoreInterface distributedDataStore =
+ prefix.getDatastoreType().equals(LogicalDatastoreType.CONFIGURATION)
? distributedConfigDatastore : distributedOperDatastore;
try (DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.collect.ClassToInstanceMap;
+import java.util.Collection;
+import java.util.concurrent.CompletionStage;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeServiceExtension;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(immediate = true, property = "type=default")
+public final class OSGiDistributedShardedDOMDataTree
+ implements DOMDataTreeService, DOMDataTreeShardingService, DistributedShardFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(OSGiDistributedShardedDOMDataTree.class);
+
+ @Reference
+ ActorSystemProvider actorSystemProvider = null;
+ @Reference(target = "(type=distributed-config)")
+ DistributedDataStoreInterface configDatastore = null;
+ @Reference(target = "(type=distributed-operational)")
+ DistributedDataStoreInterface operDatastore = null;
+
+ private DistributedShardedDOMDataTree delegate;
+
+ @Override
+ public DOMDataTreeProducer createProducer(final Collection<DOMDataTreeIdentifier> subtrees) {
+ return delegate.createProducer(subtrees);
+ }
+
+ @Override
+ public ClassToInstanceMap<DOMDataTreeServiceExtension> getExtensions() {
+ return delegate.getExtensions();
+ }
+
+ @Override
+ public CompletionStage<DistributedShardRegistration> createDistributedShard(final DOMDataTreeIdentifier prefix,
+ final Collection<MemberName> replicaMembers) throws DOMDataTreeShardingConflictException {
+ return delegate.createDistributedShard(prefix, replicaMembers);
+ }
+
+ @Override
+ public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
+ final DOMDataTreeIdentifier prefix, final T shard, final DOMDataTreeProducer producer)
+ throws DOMDataTreeShardingConflictException {
+ return delegate.registerDataTreeShard(prefix, shard, producer);
+ }
+
+ @Override
+ public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(final T listener,
+ final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges,
+ final Collection<DOMDataTreeProducer> producers) throws DOMDataTreeLoopException {
+ return delegate.registerListener(listener, subtrees, allowRxMerges, producers);
+ }
+
+ @Activate
+ void activate() {
+ LOG.info("Distributed DOM Data Tree Service starting");
+ delegate = new DistributedShardedDOMDataTree(actorSystemProvider, operDatastore, configDatastore);
+ delegate.init();
+ LOG.info("Distributed DOM Data Tree Service started");
+ }
+
+ @Deactivate
+ void deactivate() {
+ LOG.info("Distributed DOM Data Tree Service stopping");
+ // TODO: this needs a shutdown hook, I think
+ delegate = null;
+ LOG.info("Distributed DOM Data Tree Service stopped");
+ }
+}
import java.util.List;
import java.util.stream.Collectors;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
this.memberName = requireNonNull(memberName);
}
- public void initListener(final AbstractDataStore dataStore, final LogicalDatastoreType type) {
+ public void initListener(final DistributedDataStoreInterface dataStore, final LogicalDatastoreType type) {
registrations.put(type, dataStore.registerShardConfigListener(
ClusterUtils.SHARD_LIST_PATH, new ShardConfigHandler(memberName, type, handlingActor)));
}
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
// for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
private final ActorUtils actorUtils;
private final ShardingServiceAddressResolver resolver;
- private final AbstractDataStore distributedConfigDatastore;
- private final AbstractDataStore distributedOperDatastore;
+ private final DistributedDataStoreInterface distributedConfigDatastore;
+ private final DistributedDataStoreInterface distributedOperDatastore;
private final int lookupTaskMaxRetries;
private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
- ask.onComplete(new OnComplete<Object>() {
+ ask.onComplete(new OnComplete<>() {
@Override
public void onComplete(final Throwable throwable, final Object findLeaderReply) {
if (throwable != null) {
public void run() {
final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
- ask.onComplete(new OnComplete<Object>() {
+ ask.onComplete(new OnComplete<>() {
@Override
public void onComplete(final Throwable throwable, final Object findLeaderReply) {
if (throwable != null) {
public static class ShardedDataTreeActorCreator {
private DistributedShardedDOMDataTree shardingService;
- private AbstractDataStore distributedConfigDatastore;
- private AbstractDataStore distributedOperDatastore;
+ private DistributedDataStoreInterface distributedConfigDatastore;
+ private DistributedDataStoreInterface distributedOperDatastore;
private ActorSystem actorSystem;
private ClusterWrapper cluster;
private int maxRetries;
return cluster;
}
- public AbstractDataStore getDistributedConfigDatastore() {
+ public DistributedDataStoreInterface getDistributedConfigDatastore() {
return distributedConfigDatastore;
}
public ShardedDataTreeActorCreator setDistributedConfigDatastore(
- final AbstractDataStore distributedConfigDatastore) {
+ final DistributedDataStoreInterface distributedConfigDatastore) {
this.distributedConfigDatastore = distributedConfigDatastore;
return this;
}
- public AbstractDataStore getDistributedOperDatastore() {
+ public DistributedDataStoreInterface getDistributedOperDatastore() {
return distributedOperDatastore;
}
public ShardedDataTreeActorCreator setDistributedOperDatastore(
- final AbstractDataStore distributedOperDatastore) {
+ final DistributedDataStoreInterface distributedOperDatastore) {
this.distributedOperDatastore = distributedOperDatastore;
return this;
}
+++ /dev/null
-<?xml version="1.0" encoding="UTF-8"?>
-<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
- xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
- xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
-
- <cm:property-placeholder persistent-id="org.opendaylight.controller.cluster.datastore.broker" update-strategy="none">
- <cm:default-properties>
- <cm:property name="max-data-broker-future-callback-queue-size" value="1000"/>
- <cm:property name="max-data-broker-future-callback-pool-size" value="20"/>
- </cm:default-properties>
- </cm:property-placeholder>
-
- <odl:static-reference id="schemaService" interface="org.opendaylight.mdsal.dom.api.DOMSchemaService" />
-
- <!-- Datastore properties -->
- <reference id="actorSystemProvider" interface="org.opendaylight.controller.cluster.ActorSystemProvider"/>
- <reference id="introspectorFactory" interface="org.opendaylight.controller.cluster.datastore.DatastoreContextIntrospectorFactory"/>
- <reference id="datastoreSnapshotRestore" interface="org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore"/>
- <reference id="fileModuleShardConfigProvider" interface="org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider"/>
-
- <cm:cm-properties id="datastoreProps" persistent-id="org.opendaylight.controller.cluster.datastore"/>
-
- <!-- Distributed Config Datastore -->
- <bean id="introspectorConfig" factory-ref="introspectorFactory" factory-method="newInstance">
- <argument type="org.opendaylight.mdsal.common.api.LogicalDatastoreType" value="CONFIGURATION"/>
- </bean>
-
- <bean id="updaterConfig" class="org.opendaylight.controller.cluster.datastore.DatastoreContextPropertiesUpdater">
- <cm:managed-properties persistent-id="org.opendaylight.controller.cluster.datastore" update-strategy="component-managed" update-method="update"/>
- <argument ref="introspectorConfig"/>
- <argument ref="datastoreProps"/>
- </bean>
-
- <bean id="configDatastore" class="org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory"
- factory-method="createInstance" destroy-method="close">
- <argument ref="schemaService"/>
- <argument>
- <bean factory-ref="introspectorConfig" factory-method="getContext" />
- </argument>
- <argument ref="datastoreSnapshotRestore"/>
- <argument ref="actorSystemProvider"/>
- <argument ref="introspectorConfig"/>
- <argument ref="updaterConfig"/>
- </bean>
-
- <service ref="configDatastore" odl:type="distributed-config">
- <interfaces>
- <value>org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface</value>
- </interfaces>
- </service>
-
- <!-- Distributed Operational Datastore -->
-
- <bean id="configurationImpl" class="org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl">
- <argument ref="fileModuleShardConfigProvider"/>
- </bean>
-
- <bean id="introspectorOper" factory-ref="introspectorFactory" factory-method="newInstance">
- <argument type="org.opendaylight.mdsal.common.api.LogicalDatastoreType" value="OPERATIONAL"/>
- </bean>
-
- <bean id="updaterOper" class="org.opendaylight.controller.cluster.datastore.DatastoreContextPropertiesUpdater">
- <cm:managed-properties persistent-id="org.opendaylight.controller.cluster.datastore" update-strategy="component-managed" update-method="update"/>
- <argument ref="introspectorOper"/>
- <argument ref="datastoreProps"/>
- </bean>
-
- <bean id="operDatastore" class="org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory"
- factory-method="createInstance" destroy-method="close">
- <argument ref="schemaService"/>
- <argument>
- <bean factory-ref="introspectorOper" factory-method="getContext" />
- </argument>
- <argument ref="datastoreSnapshotRestore"/>
- <argument ref="actorSystemProvider"/>
- <argument ref="introspectorOper"/>
- <argument ref="updaterOper"/>
- <argument ref="configurationImpl" />
- </bean>
-
- <service ref="operDatastore" odl:type="distributed-operational">
- <interfaces>
- <value>org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface</value>
- </interfaces>
- </service>
-
- <!-- Concurrent DOMDataBroker -->
-
- <bean id="listenableFutureExecutor" class="org.opendaylight.yangtools.util.concurrent.SpecialExecutors"
- factory-method="newBlockingBoundedCachedThreadPool">
- <argument value="${max-data-broker-future-callback-pool-size}"/>
- <argument value="${max-data-broker-future-callback-queue-size}"/>
- <argument value="CommitFutures"/>
- <argument>
- <!-- We should use a more specific class -->
- <bean factory-ref="operDatastore" factory-method="getClass"/>
- </argument>
- </bean>
-
- <bean id="commitStatsTracker" class="org.opendaylight.yangtools.util.DurationStatisticsTracker"
- factory-method="createConcurrent"/>
-
- <bean id="clusteredDOMDataBroker" class="org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker"
- destroy-method="close">
- <argument>
- <map>
- <entry key="CONFIGURATION" value-ref="configDatastore"/>
- <entry key="OPERATIONAL" value-ref="operDatastore"/>
- </map>
- </argument>
- <argument ref="listenableFutureExecutor"/>
- <argument ref="commitStatsTracker"/>
- </bean>
-
- <service ref="clusteredDOMDataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker"
- odl:type="default"/>
-
- <!-- JMX beans for the data broker -->
-
- <bean id="commitStatsMXBean" class="org.opendaylight.controller.cluster.datastore.jmx.mbeans.CommitStatsMXBeanImpl"
- init-method="register" destroy-method="unregister">
- <argument ref="commitStatsTracker"/>
- <argument value="DOMDataBroker"/>
- </bean>
-
- <bean id="threadStatsMXBean" class="org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl"
- factory-method="create" destroy-method="unregister">
- <argument ref="listenableFutureExecutor"/>
- <argument value="CommitFutureExecutorStats"/>
- <argument value="DOMDataBroker"/>
- </bean>
-
- <!-- CDS shard manager -->
- <bean id="cdsNodeManager" class="org.opendaylight.controller.cluster.sharding.DistributedShardedDOMDataTree"
- init-method="init">
- <argument ref="actorSystemProvider"/>
- <argument ref="operDatastore"/>
- <argument ref="configDatastore"/>
- </bean>
-
- <service ref="cdsNodeManager" odl:type="default">
- <interfaces>
- <value>org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService</value>
- <value>org.opendaylight.mdsal.dom.api.DOMDataTreeService</value>
- <value>org.opendaylight.controller.cluster.sharding.DistributedShardFactory</value>
- </interfaces>
- </service>
-
-</blueprint>