<configuration>
<instructions>
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
+
+ <!-- Karaf cannot handle Factory Component requirements, see https://issues.apache.org/jira/browse/KARAF-6625 -->
+ <_dsannotations-options>norequirements</_dsannotations-options>
+
<Export-Package>
org.opendaylight.controller.cluster.datastore;
org.opendaylight.controller.cluster.datastore.config;
+++ /dev/null
-/*
- * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.Beta;
-import java.util.Map;
-import org.opendaylight.controller.cluster.ActorSystemProvider;
-import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
-import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMSchemaService;
-import org.opendaylight.mdsal.dom.spi.store.DOMStore;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Beta
-public abstract class AbstractOSGiDOMStore
- implements DistributedDataStoreInterface, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry {
- @Component(immediate = true, service = { DOMStore.class, DistributedDataStoreInterface.class },
- configurationPid = "org.opendaylight.controller.cluster.datastore",
- property = "type=distributed-config")
- public static final class Configuration extends AbstractOSGiDOMStore {
- @Reference
- DOMSchemaService schemaService = null;
- @Reference
- ActorSystemProvider actorSystemProvider = null;
- @Reference
- DatastoreContextIntrospectorFactory introspectorFactory = null;
- @Reference
- DatastoreSnapshotRestore snapshotRestore = null;
-
- public Configuration() {
- super(LogicalDatastoreType.CONFIGURATION);
- }
-
- @Activate
- void activate(final Map<String, Object> properties) throws InterruptedException {
- start(schemaService, actorSystemProvider, introspectorFactory, snapshotRestore, null);
- }
-
- @Modified
- void modified(final Map<String, Object> properties) {
- update(properties);
- }
-
- @Deactivate
- void deactivate() {
- stop();
- }
- }
-
- @Component(immediate = true, service = { DOMStore.class, DistributedDataStoreInterface.class },
- configurationPid = "org.opendaylight.controller.cluster.datastore",
- property = "type=distributed-operational")
- public static final class Operational extends AbstractOSGiDOMStore {
- @Reference
- DOMSchemaService schemaService = null;
- @Reference
- ActorSystemProvider actorSystemProvider = null;
- @Reference
- DatastoreContextIntrospectorFactory introspectorFactory = null;
- @Reference
- DatastoreSnapshotRestore snapshotRestore = null;
- @Reference
- ModuleShardConfigProvider configProvider = null;
-
- public Operational() {
- super(LogicalDatastoreType.OPERATIONAL);
- }
-
- @Activate
- void activate(final Map<String, Object> properties) throws InterruptedException {
- start(schemaService, actorSystemProvider, introspectorFactory, snapshotRestore,
- new ConfigurationImpl(configProvider));
- }
-
- @Modified
- void modified(final Map<String, Object> properties) {
- update(properties);
- }
-
- @Deactivate
- void deactivate() {
- stop();
- }
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractOSGiDOMStore.class);
-
- private final LogicalDatastoreType datastoreType;
-
- private ListenerRegistration<?> schemaRegistration;
- private DatastoreContextIntrospector introspector;
- private AbstractDataStore datastore;
-
- AbstractOSGiDOMStore(final LogicalDatastoreType datastoreType) {
- this.datastoreType = requireNonNull(datastoreType);
- }
-
- @Override
- public final ActorUtils getActorUtils() {
- return datastore.getActorUtils();
- }
-
- @Override
- public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
- final YangInstanceIdentifier internalPath, final DOMDataTreeChangeListener delegate) {
- return datastore.registerShardConfigListener(internalPath, delegate);
- }
-
- @Override
- public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
- final YangInstanceIdentifier shardLookup, final YangInstanceIdentifier insideShard,
- final DOMDataTreeChangeListener delegate) {
- return datastore.registerProxyListener(shardLookup, insideShard, delegate);
- }
-
- @Override
- public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
- final YangInstanceIdentifier treeId, final L listener) {
- return datastore.registerTreeChangeListener(treeId, listener);
- }
-
- @Override
- public final <T extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<T> registerCommitCohort(
- final DOMDataTreeIdentifier path, final T cohort) {
- return datastore.registerCommitCohort(path, cohort);
- }
-
- @Override
- public final DOMStoreTransactionChain createTransactionChain() {
- return datastore.createTransactionChain();
- }
-
- @Override
- public final DOMStoreReadTransaction newReadOnlyTransaction() {
- return datastore.newReadOnlyTransaction();
- }
-
- @Override
- public final DOMStoreWriteTransaction newWriteOnlyTransaction() {
- return datastore.newWriteOnlyTransaction();
- }
-
- @Override
- public final DOMStoreReadWriteTransaction newReadWriteTransaction() {
- return datastore.newReadWriteTransaction();
- }
-
- final void start(final DOMSchemaService schemaService, final ActorSystemProvider actorSystemProvider,
- final DatastoreContextIntrospectorFactory introspectorFactory,
- final DatastoreSnapshotRestore snapshotRestore,
- final org.opendaylight.controller.cluster.datastore.config.Configuration config)
- throws InterruptedException {
- LOG.info("Distributed Datastore type {} starting", datastoreType);
- introspector = introspectorFactory.newInstance(datastoreType);
-
- datastore = DistributedDataStoreFactory.createInstance(actorSystemProvider, introspector.getContext(),
- introspector, snapshotRestore, config);
- schemaRegistration = schemaService.registerSchemaContextListener(datastore);
-
- datastore.awaitReadiness();
- LOG.info("Distributed Datastore type {} started", datastoreType);
- }
-
- final void update(final Map<String, Object> properties) {
- LOG.debug("Overlaying settings: {}", properties);
- if (introspector.update(properties)) {
- datastore.onDatastoreContextUpdated(introspector.newContextFactory());
- }
- }
-
- final void stop() {
- LOG.info("Distributed Datastore type {} stopping", datastoreType);
- schemaRegistration.close();
- datastore.close();
- LOG.info("Distributed Datastore type {} stopped", datastoreType);
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static com.google.common.base.Verify.verifyNotNull;
+
+import com.google.common.annotations.Beta;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OSGi manifestation of a the distributed datastore, as represented by {@link AbstractDataStore}. This component's
+ * configuration is managed by {@link OSGiDistributedDataStore}.
+ */
+@Beta
+@Component(factory = OSGiDOMStore.FACTORY_NAME, service = { DOMStore.class, DistributedDataStoreInterface.class })
+public final class OSGiDOMStore
+ implements DistributedDataStoreInterface, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry {
+ // OSGi DS Component Factory name
+ static final String FACTORY_NAME = "org.opendaylight.controller.cluster.datastore.OSGiDOMStore";
+ static final String DATASTORE_INST_PROP = ".datastore.instance";
+ static final String DATASTORE_TYPE_PROP = ".datastore.type";
+
+ private static final Logger LOG = LoggerFactory.getLogger(OSGiDOMStore.class);
+
+ private LogicalDatastoreType datastoreType;
+ private AbstractDataStore datastore;
+
+ @Override
+ public ActorUtils getActorUtils() {
+ return datastore.getActorUtils();
+ }
+
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
+ final YangInstanceIdentifier internalPath, final DOMDataTreeChangeListener delegate) {
+ return datastore.registerShardConfigListener(internalPath, delegate);
+ }
+
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
+ final YangInstanceIdentifier shardLookup, final YangInstanceIdentifier insideShard,
+ final DOMDataTreeChangeListener delegate) {
+ return datastore.registerProxyListener(shardLookup, insideShard, delegate);
+ }
+
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+ final YangInstanceIdentifier treeId, final L listener) {
+ return datastore.registerTreeChangeListener(treeId, listener);
+ }
+
+ @Override
+ public <T extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<T> registerCommitCohort(
+ final DOMDataTreeIdentifier path, final T cohort) {
+ return datastore.registerCommitCohort(path, cohort);
+ }
+
+ @Override
+ public DOMStoreTransactionChain createTransactionChain() {
+ return datastore.createTransactionChain();
+ }
+
+ @Override
+ public DOMStoreReadTransaction newReadOnlyTransaction() {
+ return datastore.newReadOnlyTransaction();
+ }
+
+ @Override
+ public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+ return datastore.newWriteOnlyTransaction();
+ }
+
+ @Override
+ public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+ return datastore.newReadWriteTransaction();
+ }
+
+ @Activate
+ void activate(final Map<String, ?> properties) {
+ datastoreType = (LogicalDatastoreType) verifyNotNull(properties.get(DATASTORE_TYPE_PROP));
+ datastore = (AbstractDataStore) verifyNotNull(properties.get(DATASTORE_INST_PROP));
+ LOG.info("Datastore service type {} activated", datastoreType);
+ }
+
+ @Deactivate
+ void deactivate() {
+ datastore = null;
+ LOG.info("Datastore service type {} deactivated", datastoreType);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.Beta;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Map;
+import org.checkerframework.checker.lock.qual.GuardedBy;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
+import org.osgi.service.component.ComponentFactory;
+import org.osgi.service.component.ComponentInstance;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Global bootstrap component. It is responsible to start all distributed datastore instances and activate
+ * {@link OSGiDOMStore} as appropriate. It also provides routing of datastore proprerties towards AbstractDataStore.
+ */
+@Beta
+@Component(immediate = true, configurationPid = "org.opendaylight.controller.cluster.datastore")
+public final class OSGiDistributedDataStore {
+ /**
+ * Internal state associated with a particular datastore. An instance is created for each datastore and once the
+ * datastore settles, we create a new component configuration of {@link OSGiDOMStore}. This indirection is needed
+ * to not block Service Component Runtime from activating other components while we are waiting for the datastore
+ * to settle (which can take a long time).
+ */
+ private final class DatastoreState implements FutureCallback<Object> {
+ private final DatastoreContextIntrospector introspector;
+ private final LogicalDatastoreType datastoreType;
+ private final AbstractDataStore datastore;
+ private final String serviceType;
+
+ @GuardedBy("this")
+ private ComponentInstance component;
+ @GuardedBy("this")
+ private boolean stopped;
+
+ DatastoreState(final DatastoreContextIntrospector introspector, final LogicalDatastoreType datastoreType,
+ final AbstractDataStore datastore, final String serviceType) {
+ this.introspector = requireNonNull(introspector);
+ this.datastoreType = requireNonNull(datastoreType);
+ this.datastore = requireNonNull(datastore);
+ this.serviceType = requireNonNull(serviceType);
+ }
+
+ synchronized void updateProperties(final Map<String, Object> properties) {
+ if (introspector.update(properties)) {
+ datastore.onDatastoreContextUpdated(introspector.newContextFactory());
+ }
+ }
+
+ void stop() {
+ LOG.info("Distributed Datastore type {} stopping", datastoreType);
+
+ synchronized (this) {
+ stopped = true;
+ if (component != null) {
+ component.dispose();
+ component = null;
+ }
+ datastore.close();
+ LOG.info("Distributed Datastore type {} stopped", datastoreType);
+ }
+ }
+
+ @Override
+ public void onSuccess(final Object result) {
+ LOG.debug("Distributed Datastore type {} reached initial settle", datastoreType);
+
+ synchronized (this) {
+ if (!stopped) {
+ final Dictionary<String, Object> dict = new Hashtable<>();
+ dict.put(OSGiDOMStore.DATASTORE_TYPE_PROP, datastoreType);
+ dict.put(OSGiDOMStore.DATASTORE_INST_PROP, datastore);
+ dict.put("type", serviceType);
+ component = datastoreFactory.newInstance(dict);
+ LOG.info("Distributed Datastore type {} started", datastoreType);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void onFailure(final Throwable cause) {
+ LOG.error("Distributed Datastore type {} failed to settle", datastoreType, cause);
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(OSGiDistributedDataStore.class);
+
+ @Reference
+ DOMSchemaService schemaService = null;
+ @Reference
+ ActorSystemProvider actorSystemProvider = null;
+ @Reference
+ DatastoreContextIntrospectorFactory introspectorFactory = null;
+ @Reference
+ DatastoreSnapshotRestore snapshotRestore = null;
+ @Reference
+ ModuleShardConfigProvider configProvider = null;
+ @Reference(target = "(component.factory=" + OSGiDOMStore.FACTORY_NAME + ")")
+ ComponentFactory datastoreFactory = null;
+
+ private DatastoreState configDatastore;
+ private DatastoreState operDatastore;
+
+ @Activate
+ void activate(final Map<String, Object> properties) {
+ configDatastore = createDatastore(LogicalDatastoreType.CONFIGURATION, "distributed-config", null);
+ operDatastore = createDatastore(LogicalDatastoreType.OPERATIONAL, "distributed-operational",
+ new ConfigurationImpl(configProvider));
+ modified(properties);
+ }
+
+ @Modified
+ void modified(final Map<String, Object> properties) {
+ LOG.debug("Overlaying settings: {}", properties);
+ configDatastore.updateProperties(properties);
+ operDatastore.updateProperties(properties);
+ }
+
+ @Deactivate
+ void deactivate() {
+ operDatastore.stop();
+ operDatastore = null;
+ configDatastore.stop();
+ configDatastore = null;
+ }
+
+ private DatastoreState createDatastore(final LogicalDatastoreType datastoreType, final String serviceType,
+ final Configuration config) {
+ LOG.info("Distributed Datastore type {} starting", datastoreType);
+ final DatastoreContextIntrospector introspector = introspectorFactory.newInstance(datastoreType);
+ final AbstractDataStore datastore = DistributedDataStoreFactory.createInstance(actorSystemProvider,
+ introspector.getContext(), introspector, snapshotRestore, config);
+ datastore.setCloseable(schemaService.registerSchemaContextListener(datastore));
+ final DatastoreState state = new DatastoreState(introspector, datastoreType, datastore, serviceType);
+
+ Futures.addCallback(datastore.initialSettleFuture(), state,
+ // Note we are invoked from shard manager and therefore could block it, hence the round-trip to executor
+ datastore.getActorUtils().getClientDispatcher()::execute);
+ return state;
+ }
+}