Convert sal-distributed-datastore to OSGi DS 70/91770/4
authorRobert Varga <robert.varga@pantheon.tech>
Thu, 30 Jul 2020 11:28:18 +0000 (13:28 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Thu, 30 Jul 2020 15:34:58 +0000 (17:34 +0200)
This is a follow-up patch, converting more of blueprint wiring to
OSGi Declarative Services.

JIRA: CONTROLLER-1882
Change-Id: Ibb0a9f57c470e5572b64e85fdf87a093594d0367
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
12 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/OSGiDOMDataBroker.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractOSGiDOMStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreInterface.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardChangePublisher.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardFrontend.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/OSGiDistributedShardedDOMDataTree.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/PrefixedShardConfigUpdateHandler.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/ShardedDataTreeActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/resources/OSGI-INF/blueprint/clustered-datastore.xml [deleted file]

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/OSGiDOMDataBroker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/OSGiDOMDataBroker.java
new file mode 100644 (file)
index 0000000..f0dbf96
--- /dev/null
@@ -0,0 +1,129 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.ClassToInstanceMap;
+import com.google.common.collect.ImmutableMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.CommitStatsMXBeanImpl;
+import org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataBrokerExtension;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChain;
+import org.opendaylight.mdsal.dom.api.DOMTransactionChainListener;
+import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.yangtools.util.DurationStatisticsTracker;
+import org.opendaylight.yangtools.util.concurrent.SpecialExecutors;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.Designate;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Beta
+@Component(immediate = true, configurationPid = "org.opendaylight.controller.cluster.datastore.broker",
+    property = "type=default")
+@Designate(ocd = OSGiDOMDataBroker.Config.class)
+public final class OSGiDOMDataBroker implements DOMDataBroker {
+    @ObjectClassDefinition
+    public @interface Config {
+        @AttributeDefinition(name = "max-data-broker-future-callback-queue-size")
+        int callbackQueueSize() default 1000;
+        @AttributeDefinition(name = "max-data-broker-future-callback-pool-size")
+        int callbackPoolSize() default 20;
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(OSGiDOMDataBroker.class);
+
+    @Reference(target = "(type=distributed-config)")
+    DOMStore configDatastore = null;
+    @Reference(target = "(type=distributed-operational)")
+    DOMStore operDatastore = null;
+
+    private ExecutorService executorService;
+    private ConcurrentDOMDataBroker delegate;
+    private CommitStatsMXBeanImpl commitStats;
+    private ThreadExecutorStatsMXBeanImpl threadStats;
+
+    @Override
+    public DOMDataTreeReadTransaction newReadOnlyTransaction() {
+        return delegate.newReadOnlyTransaction();
+    }
+
+    @Override
+    public DOMDataTreeWriteTransaction newWriteOnlyTransaction() {
+        return delegate.newWriteOnlyTransaction();
+    }
+
+    @Override
+    public DOMDataTreeReadWriteTransaction newReadWriteTransaction() {
+        return delegate.newReadWriteTransaction();
+    }
+
+    @Override
+    public ClassToInstanceMap<DOMDataBrokerExtension> getExtensions() {
+        return delegate.getExtensions();
+    }
+
+    @Override
+    public DOMTransactionChain createTransactionChain(final DOMTransactionChainListener listener) {
+        return delegate.createTransactionChain(listener);
+    }
+
+    @Override
+    public DOMTransactionChain createMergingTransactionChain(final DOMTransactionChainListener listener) {
+        return delegate.createMergingTransactionChain(listener);
+    }
+
+    @Activate
+    void activate(final Config config) {
+        LOG.info("DOM Data Broker starting");
+        final DurationStatisticsTracker commitStatsTracker = DurationStatisticsTracker.createConcurrent();
+
+        executorService = SpecialExecutors.newBlockingBoundedCachedThreadPool(config.callbackPoolSize(),
+            config.callbackQueueSize(), "CommitFutures", ConcurrentDOMDataBroker.class);
+        delegate = new ConcurrentDOMDataBroker(ImmutableMap.of(
+            LogicalDatastoreType.CONFIGURATION, configDatastore, LogicalDatastoreType.OPERATIONAL, operDatastore),
+            executorService, commitStatsTracker);
+
+        commitStats = new CommitStatsMXBeanImpl(commitStatsTracker, "DOMDataBroker");
+        commitStats.register();
+        threadStats = ThreadExecutorStatsMXBeanImpl.create(executorService, "CommitFutureExecutorStats",
+            "DOMDataBroker");
+
+        LOG.info("DOM Data Broker started");
+    }
+
+    @Deactivate
+    void deactivate() {
+        LOG.info("DOM Data Broker stopping");
+        commitStats.unregister();
+        if (threadStats != null) {
+            threadStats.unregister();
+        }
+
+        delegate.close();
+        executorService.shutdown();
+        try {
+            executorService.awaitTermination(1, TimeUnit.MINUTES);
+        } catch (InterruptedException e) {
+            LOG.warn("Future executor failed to finish in time, giving up", e);
+        }
+        LOG.info("DOM Data Broker stopped");
+    }
+}
index 8537c85..d6958a7 100644 (file)
@@ -253,13 +253,9 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
 
         final Duration toWait = initialSettleTime();
         try {
-            if (toWait.isFinite()) {
-                if (!waitTillReadyCountDownLatch.await(toWait.toNanos(), TimeUnit.NANOSECONDS)) {
-                    LOG.error("Shard leaders failed to settle in {}, giving up", toWait);
-                    return;
-                }
-            } else {
-                waitTillReadyCountDownLatch.await();
+            if (!awaitReadiness(toWait)) {
+                LOG.error("Shard leaders failed to settle in {}, giving up", toWait);
+                return;
             }
         } catch (InterruptedException e) {
             LOG.error("Interrupted while waiting for shards to settle", e);
@@ -269,6 +265,21 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         LOG.debug("Data store {} is now ready", identifier);
     }
 
+    @Beta
+    public boolean awaitReadiness() throws InterruptedException {
+        return awaitReadiness(initialSettleTime());
+    }
+
+    @Beta
+    public boolean awaitReadiness(final Duration toWait) throws InterruptedException {
+        if (toWait.isFinite()) {
+            return waitTillReadyCountDownLatch.await(toWait.toNanos(), TimeUnit.NANOSECONDS);
+        }
+
+        waitTillReadyCountDownLatch.await();
+        return true;
+    }
+
     @Beta
     public void awaitReadiness(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
         if (!waitTillReadyCountDownLatch.await(timeout, unit)) {
@@ -301,6 +312,7 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         return waitTillReadyCountDownLatch;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
             final YangInstanceIdentifier shardLookup, final YangInstanceIdentifier insideShard,
@@ -324,10 +336,10 @@ public abstract class AbstractDataStore implements DistributedDataStoreInterface
         return (ListenerRegistration<L>) listenerRegistrationProxy;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
-            final YangInstanceIdentifier internalPath,
-            final DOMDataTreeChangeListener delegate) {
+            final YangInstanceIdentifier internalPath, final DOMDataTreeChangeListener delegate) {
         requireNonNull(delegate, "delegate should not be null");
 
         LOG.debug("Registering a listener for the configuration shard: {}", internalPath);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractOSGiDOMStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractOSGiDOMStore.java
new file mode 100644 (file)
index 0000000..fb8ba36
--- /dev/null
@@ -0,0 +1,204 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.Beta;
+import java.util.Map;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
+import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMSchemaService;
+import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
+import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Beta
+public abstract class AbstractOSGiDOMStore
+        implements DistributedDataStoreInterface, DOMStoreTreeChangePublisher, DOMDataTreeCommitCohortRegistry {
+    @Component(immediate = true, service = { DOMStore.class,  DistributedDataStoreInterface.class },
+            configurationPid = "org.opendaylight.controller.cluster.datastore",
+            property = "type=distributed-config")
+    public static final class Configuration extends AbstractOSGiDOMStore {
+        @Reference
+        DOMSchemaService schemaService = null;
+        @Reference
+        ActorSystemProvider actorSystemProvider = null;
+        @Reference
+        DatastoreContextIntrospectorFactory introspectorFactory = null;
+        @Reference
+        DatastoreSnapshotRestore snapshotRestore = null;
+
+        public Configuration() {
+            super(LogicalDatastoreType.CONFIGURATION);
+        }
+
+        @Activate
+        void activate(final Map<String, Object> properties) throws InterruptedException {
+            start(schemaService, actorSystemProvider, introspectorFactory, snapshotRestore, null);
+        }
+
+        @Modified
+        void modified(final Map<String, Object> properties) {
+            update(properties);
+        }
+
+        @Deactivate
+        void deactivate() {
+            stop();
+        }
+    }
+
+    @Component(immediate = true, service = { DOMStore.class, DistributedDataStoreInterface.class },
+            configurationPid = "org.opendaylight.controller.cluster.datastore",
+            property = "type=distributed-operational")
+    public static final class Operational extends AbstractOSGiDOMStore {
+        @Reference
+        DOMSchemaService schemaService = null;
+        @Reference
+        ActorSystemProvider actorSystemProvider = null;
+        @Reference
+        DatastoreContextIntrospectorFactory introspectorFactory = null;
+        @Reference
+        DatastoreSnapshotRestore snapshotRestore = null;
+        @Reference
+        ModuleShardConfigProvider configProvider = null;
+
+        public Operational() {
+            super(LogicalDatastoreType.OPERATIONAL);
+        }
+
+        @Activate
+        void activate(final Map<String, Object> properties) throws InterruptedException {
+            start(schemaService, actorSystemProvider, introspectorFactory, snapshotRestore,
+                new ConfigurationImpl(configProvider));
+        }
+
+        @Modified
+        void modified(final Map<String, Object> properties) {
+            update(properties);
+        }
+
+        @Deactivate
+        void deactivate() {
+            stop();
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractOSGiDOMStore.class);
+
+    private final LogicalDatastoreType datastoreType;
+
+    private ListenerRegistration<?> schemaRegistration;
+    private DatastoreContextIntrospector introspector;
+    private AbstractDataStore datastore;
+
+    AbstractOSGiDOMStore(final LogicalDatastoreType datastoreType) {
+        this.datastoreType = requireNonNull(datastoreType);
+    }
+
+    @Override
+    public final ActorUtils getActorUtils() {
+        return datastore.getActorUtils();
+    }
+
+    @Override
+    public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
+            final YangInstanceIdentifier internalPath, final DOMDataTreeChangeListener delegate) {
+        return datastore.registerShardConfigListener(internalPath, delegate);
+    }
+
+    @Override
+    public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
+            final YangInstanceIdentifier shardLookup, final YangInstanceIdentifier insideShard,
+            final DOMDataTreeChangeListener delegate) {
+        return datastore.registerProxyListener(shardLookup, insideShard, delegate);
+    }
+
+    @Override
+    public final <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+            final YangInstanceIdentifier treeId, final L listener) {
+        return datastore.registerTreeChangeListener(treeId, listener);
+    }
+
+    @Override
+    public final <T extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<T> registerCommitCohort(
+            final DOMDataTreeIdentifier path, final T cohort) {
+        return datastore.registerCommitCohort(path, cohort);
+    }
+
+    @Override
+    public final DOMStoreTransactionChain createTransactionChain() {
+        return datastore.createTransactionChain();
+    }
+
+    @Override
+    public final DOMStoreReadTransaction newReadOnlyTransaction() {
+        return datastore.newReadOnlyTransaction();
+    }
+
+    @Override
+    public final DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        return datastore.newWriteOnlyTransaction();
+    }
+
+    @Override
+    public final DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        return datastore.newReadWriteTransaction();
+    }
+
+    final void start(final DOMSchemaService schemaService, final ActorSystemProvider actorSystemProvider,
+            final DatastoreContextIntrospectorFactory introspectorFactory,
+            final DatastoreSnapshotRestore snapshotRestore,
+            final org.opendaylight.controller.cluster.datastore.config.Configuration config)
+                    throws InterruptedException {
+        LOG.info("Distributed Datastore type {} starting", datastoreType);
+        introspector = introspectorFactory.newInstance(datastoreType);
+
+        datastore = DistributedDataStoreFactory.createInstance(actorSystemProvider, introspector.getContext(),
+            introspector, snapshotRestore, config);
+        schemaRegistration = schemaService.registerSchemaContextListener(datastore);
+
+        datastore.awaitReadiness();
+        LOG.info("Distributed Datastore type {} started", datastoreType);
+    }
+
+    final void update(final Map<String, Object> properties) {
+        LOG.debug("Overlaying settings: {}", properties);
+        if (introspector.update(properties)) {
+            datastore.onDatastoreContextUpdated(introspector.newContextFactory());
+        }
+    }
+
+    final void stop() {
+        LOG.info("Distributed Datastore type {} stopping", datastoreType);
+        schemaRegistration.close();
+        datastore.close();
+        LOG.info("Distributed Datastore type {} stopped", datastoreType);
+    }
+}
index 8da2816..221db2f 100644 (file)
@@ -40,13 +40,30 @@ public final class DistributedDataStoreFactory {
             final ActorSystemProvider actorSystemProvider, final DatastoreContextIntrospector introspector,
             final DatastoreContextPropertiesUpdater updater, final Configuration orgConfig) {
 
+        final AbstractDataStore dataStore = createInstance(actorSystemProvider, initialDatastoreContext,
+            introspector, datastoreSnapshotRestore, orgConfig);
+
+        updater.setListener(dataStore);
+
+        schemaService.registerSchemaContextListener(dataStore);
+
+        dataStore.setCloseable(updater);
+        dataStore.waitTillReady();
+
+        return dataStore;
+    }
+
+    public static AbstractDataStore createInstance(final ActorSystemProvider actorSystemProvider,
+            final DatastoreContext initialDatastoreContext, final DatastoreContextIntrospector introspector,
+            final DatastoreSnapshotRestore datastoreSnapshotRestore, final Configuration orgConfig) {
+
         final String datastoreName = initialDatastoreContext.getDataStoreName();
         LOG.info("Create data store instance of type : {}", datastoreName);
 
         final ActorSystem actorSystem = actorSystemProvider.getActorSystem();
         final DatastoreSnapshot restoreFromSnapshot = datastoreSnapshotRestore.getAndRemove(datastoreName).orElse(null);
 
-        Configuration config;
+        final Configuration config;
         if (orgConfig == null) {
             config = new ConfigurationImpl(DEFAULT_MODULE_SHARDS_PATH, DEFAULT_MODULES_PATH);
         } else {
@@ -68,12 +85,6 @@ public final class DistributedDataStoreFactory {
                 restoreFromSnapshot);
             LOG.info("Data store {} is using ask-based protocol", datastoreName);
         }
-        updater.setListener(dataStore);
-
-        schemaService.registerSchemaContextListener(dataStore);
-
-        dataStore.setCloseable(updater);
-        dataStore.waitTillReady();
 
         return dataStore;
     }
index 8d696c0..59ccc58 100644 (file)
@@ -7,15 +7,28 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import com.google.common.annotations.Beta;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.mdsal.dom.spi.store.DOMStore;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
 /**
- * The public interface exposed vi a DistributedDataStore via the OSGi registry.
+ * The public interface exposed by an AbstractDataStore via the OSGi registry.
  *
  * @author Thomas Pantelis
  */
 public interface DistributedDataStoreInterface extends DOMStore {
 
     ActorUtils getActorUtils();
+
+    @Beta
+    <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerShardConfigListener(
+            YangInstanceIdentifier internalPath, DOMDataTreeChangeListener delegate);
+
+    @Beta
+    <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
+            YangInstanceIdentifier shardLookup, YangInstanceIdentifier insideShard,
+            DOMDataTreeChangeListener delegate);
 }
index 8981988..bcb73db 100644 (file)
@@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
@@ -51,7 +51,7 @@ public class DistributedShardChangePublisher
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
 
-    private final AbstractDataStore distributedDataStore;
+    private final DistributedDataStoreInterface distributedDataStore;
     private final YangInstanceIdentifier shardPath;
 
     private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
@@ -60,7 +60,7 @@ public class DistributedShardChangePublisher
     private final DataTree dataTree;
 
     public DistributedShardChangePublisher(final DataStoreClient client,
-                                           final AbstractDataStore distributedDataStore,
+                                           final DistributedDataStoreInterface distributedDataStore,
                                            final DOMDataTreeIdentifier prefix,
                                            final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
         this.distributedDataStore = distributedDataStore;
index 6d41cd6..4d7f4ef 100644 (file)
@@ -17,7 +17,7 @@ import java.util.List;
 import java.util.Map;
 import org.checkerframework.checker.lock.qual.GuardedBy;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
@@ -48,7 +48,7 @@ class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard {
 
     private final DistributedShardChangePublisher publisher;
 
-    DistributedShardFrontend(final AbstractDataStore distributedDataStore,
+    DistributedShardFrontend(final DistributedDataStoreInterface distributedDataStore,
                              final DataStoreClient client,
                              final DOMDataTreeIdentifier shardRoot) {
         this.client = requireNonNull(client);
index e0d3558..e711bd6 100644 (file)
@@ -50,7 +50,7 @@ import org.opendaylight.controller.cluster.ActorSystemProvider;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
@@ -114,8 +114,8 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
 
     private final ShardedDOMDataTree shardedDOMDataTree;
     private final ActorSystem actorSystem;
-    private final AbstractDataStore distributedOperDatastore;
-    private final AbstractDataStore distributedConfigDatastore;
+    private final DistributedDataStoreInterface distributedOperDatastore;
+    private final DistributedDataStoreInterface distributedConfigDatastore;
 
     private final ActorRef shardedDataTreeActor;
     private final MemberName memberName;
@@ -133,8 +133,8 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     private final PrefixedShardConfigUpdateHandler updateHandler;
 
     public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
-                                         final AbstractDataStore distributedOperDatastore,
-                                         final AbstractDataStore distributedConfigDatastore) {
+                                         final DistributedDataStoreInterface distributedOperDatastore,
+                                         final DistributedDataStoreInterface distributedConfigDatastore) {
         this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
         this.distributedOperDatastore = requireNonNull(distributedOperDatastore);
         this.distributedConfigDatastore = requireNonNull(distributedConfigDatastore);
@@ -160,7 +160,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         createPrefixConfigShard(distributedOperDatastore);
     }
 
-    private static void createPrefixConfigShard(final AbstractDataStore dataStore) {
+    private static void createPrefixConfigShard(final DistributedDataStoreInterface dataStore) {
         Configuration configuration = dataStore.getActorUtils().getConfiguration();
         Collection<MemberName> memberNames = configuration.getUniqueMemberNamesForAllShards();
         CreateShard createShardMessage =
@@ -255,7 +255,7 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
         final Future<Object> ask =
                 Patterns.ask(shardedDataTreeActor, new StartConfigShardLookup(type), SHARD_FUTURE_TIMEOUT);
 
-        ask.onComplete(new OnComplete<Object>() {
+        ask.onComplete(new OnComplete<>() {
             @Override
             public void onComplete(final Throwable throwable, final Object result) {
                 if (throwable != null) {
@@ -377,8 +377,8 @@ public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDat
     private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
         LOG.debug("{}: Creating CDS shard for prefix: {}", memberName, prefix);
         final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
-        final AbstractDataStore distributedDataStore =
-                prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
+        final DistributedDataStoreInterface distributedDataStore =
+                prefix.getDatastoreType().equals(LogicalDatastoreType.CONFIGURATION)
                         ? distributedConfigDatastore : distributedOperDatastore;
 
         try (DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/OSGiDistributedShardedDOMDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/OSGiDistributedShardedDOMDataTree.java
new file mode 100644 (file)
index 0000000..da92b2d
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.sharding;
+
+import com.google.common.collect.ClassToInstanceMap;
+import java.util.Collection;
+import java.util.concurrent.CompletionStage;
+import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeServiceExtension;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(immediate = true, property = "type=default")
+public final class OSGiDistributedShardedDOMDataTree
+        implements DOMDataTreeService, DOMDataTreeShardingService, DistributedShardFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(OSGiDistributedShardedDOMDataTree.class);
+
+    @Reference
+    ActorSystemProvider actorSystemProvider = null;
+    @Reference(target = "(type=distributed-config)")
+    DistributedDataStoreInterface configDatastore = null;
+    @Reference(target = "(type=distributed-operational)")
+    DistributedDataStoreInterface operDatastore = null;
+
+    private DistributedShardedDOMDataTree delegate;
+
+    @Override
+    public DOMDataTreeProducer createProducer(final Collection<DOMDataTreeIdentifier> subtrees) {
+        return delegate.createProducer(subtrees);
+    }
+
+    @Override
+    public ClassToInstanceMap<DOMDataTreeServiceExtension> getExtensions() {
+        return delegate.getExtensions();
+    }
+
+    @Override
+    public CompletionStage<DistributedShardRegistration> createDistributedShard(final DOMDataTreeIdentifier prefix,
+            final Collection<MemberName> replicaMembers) throws DOMDataTreeShardingConflictException {
+        return delegate.createDistributedShard(prefix, replicaMembers);
+    }
+
+    @Override
+    public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
+            final DOMDataTreeIdentifier prefix, final T shard, final DOMDataTreeProducer producer)
+            throws DOMDataTreeShardingConflictException {
+        return delegate.registerDataTreeShard(prefix, shard, producer);
+    }
+
+    @Override
+    public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(final T listener,
+            final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges,
+            final Collection<DOMDataTreeProducer> producers) throws DOMDataTreeLoopException {
+        return delegate.registerListener(listener, subtrees, allowRxMerges, producers);
+    }
+
+    @Activate
+    void activate() {
+        LOG.info("Distributed DOM Data Tree Service starting");
+        delegate = new DistributedShardedDOMDataTree(actorSystemProvider, operDatastore, configDatastore);
+        delegate.init();
+        LOG.info("Distributed DOM Data Tree Service started");
+    }
+
+    @Deactivate
+    void deactivate() {
+        LOG.info("Distributed DOM Data Tree Service stopping");
+        // TODO: this needs a shutdown hook, I think
+        delegate = null;
+        LOG.info("Distributed DOM Data Tree Service stopped");
+    }
+}
index d7d75c2..3e5a3ad 100644 (file)
@@ -19,7 +19,7 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
@@ -60,7 +60,7 @@ public class PrefixedShardConfigUpdateHandler {
         this.memberName = requireNonNull(memberName);
     }
 
-    public void initListener(final AbstractDataStore dataStore, final LogicalDatastoreType type) {
+    public void initListener(final DistributedDataStoreInterface dataStore, final LogicalDatastoreType type) {
         registrations.put(type, dataStore.registerShardConfigListener(
                 ClusterUtils.SHARD_LIST_PATH, new ShardConfigHandler(memberName, type, handlingActor)));
     }
index c3e4369..f7c0d22 100644 (file)
@@ -38,8 +38,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
@@ -90,8 +90,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
     private final ActorUtils actorUtils;
     private final ShardingServiceAddressResolver resolver;
-    private final AbstractDataStore distributedConfigDatastore;
-    private final AbstractDataStore distributedOperDatastore;
+    private final DistributedDataStoreInterface distributedConfigDatastore;
+    private final DistributedDataStoreInterface distributedOperDatastore;
     private final int lookupTaskMaxRetries;
 
     private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
@@ -499,7 +499,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
 
             final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
 
-            ask.onComplete(new OnComplete<Object>() {
+            ask.onComplete(new OnComplete<>() {
                 @Override
                 public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                     if (throwable != null) {
@@ -725,7 +725,7 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
         public void run() {
             final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
 
-            ask.onComplete(new OnComplete<Object>() {
+            ask.onComplete(new OnComplete<>() {
                 @Override
                 public void onComplete(final Throwable throwable, final Object findLeaderReply) {
                     if (throwable != null) {
@@ -750,8 +750,8 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
     public static class ShardedDataTreeActorCreator {
 
         private DistributedShardedDOMDataTree shardingService;
-        private AbstractDataStore distributedConfigDatastore;
-        private AbstractDataStore distributedOperDatastore;
+        private DistributedDataStoreInterface distributedConfigDatastore;
+        private DistributedDataStoreInterface distributedOperDatastore;
         private ActorSystem actorSystem;
         private ClusterWrapper cluster;
         private int maxRetries;
@@ -783,22 +783,22 @@ public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
             return cluster;
         }
 
-        public AbstractDataStore getDistributedConfigDatastore() {
+        public DistributedDataStoreInterface getDistributedConfigDatastore() {
             return distributedConfigDatastore;
         }
 
         public ShardedDataTreeActorCreator setDistributedConfigDatastore(
-                final AbstractDataStore distributedConfigDatastore) {
+                final DistributedDataStoreInterface distributedConfigDatastore) {
             this.distributedConfigDatastore = distributedConfigDatastore;
             return this;
         }
 
-        public AbstractDataStore getDistributedOperDatastore() {
+        public DistributedDataStoreInterface getDistributedOperDatastore() {
             return distributedOperDatastore;
         }
 
         public ShardedDataTreeActorCreator setDistributedOperDatastore(
-                final AbstractDataStore distributedOperDatastore) {
+                final DistributedDataStoreInterface distributedOperDatastore) {
             this.distributedOperDatastore = distributedOperDatastore;
             return this;
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/OSGI-INF/blueprint/clustered-datastore.xml b/opendaylight/md-sal/sal-distributed-datastore/src/main/resources/OSGI-INF/blueprint/clustered-datastore.xml
deleted file mode 100644 (file)
index 1446320..0000000
+++ /dev/null
@@ -1,149 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
-           xmlns:odl="http://opendaylight.org/xmlns/blueprint/v1.0.0"
-           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.2.0">
-
-  <cm:property-placeholder persistent-id="org.opendaylight.controller.cluster.datastore.broker" update-strategy="none">
-    <cm:default-properties>
-      <cm:property name="max-data-broker-future-callback-queue-size" value="1000"/>
-      <cm:property name="max-data-broker-future-callback-pool-size" value="20"/>
-    </cm:default-properties>
-  </cm:property-placeholder>
-
-  <odl:static-reference id="schemaService" interface="org.opendaylight.mdsal.dom.api.DOMSchemaService" />
-
-  <!-- Datastore properties -->
-  <reference id="actorSystemProvider" interface="org.opendaylight.controller.cluster.ActorSystemProvider"/>
-  <reference id="introspectorFactory" interface="org.opendaylight.controller.cluster.datastore.DatastoreContextIntrospectorFactory"/>
-  <reference id="datastoreSnapshotRestore" interface="org.opendaylight.controller.cluster.datastore.DatastoreSnapshotRestore"/>
-  <reference id="fileModuleShardConfigProvider" interface="org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider"/>
-
-  <cm:cm-properties id="datastoreProps" persistent-id="org.opendaylight.controller.cluster.datastore"/>
-
-  <!-- Distributed Config Datastore -->
-  <bean id="introspectorConfig" factory-ref="introspectorFactory" factory-method="newInstance">
-    <argument type="org.opendaylight.mdsal.common.api.LogicalDatastoreType" value="CONFIGURATION"/>
-  </bean>
-
-  <bean id="updaterConfig" class="org.opendaylight.controller.cluster.datastore.DatastoreContextPropertiesUpdater">
-    <cm:managed-properties persistent-id="org.opendaylight.controller.cluster.datastore" update-strategy="component-managed" update-method="update"/>
-    <argument ref="introspectorConfig"/>
-    <argument ref="datastoreProps"/>
-  </bean>
-
-  <bean id="configDatastore" class="org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory"
-          factory-method="createInstance" destroy-method="close">
-    <argument ref="schemaService"/>
-    <argument>
-      <bean factory-ref="introspectorConfig" factory-method="getContext" />
-    </argument>
-    <argument ref="datastoreSnapshotRestore"/>
-    <argument ref="actorSystemProvider"/>
-    <argument ref="introspectorConfig"/>
-    <argument ref="updaterConfig"/>
-  </bean>
-
-  <service ref="configDatastore" odl:type="distributed-config">
-    <interfaces>
-      <value>org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface</value>
-    </interfaces>
-  </service>
-
-  <!-- Distributed Operational Datastore -->
-
-  <bean id="configurationImpl" class="org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl">
-    <argument ref="fileModuleShardConfigProvider"/>
-  </bean>
-
-  <bean id="introspectorOper" factory-ref="introspectorFactory" factory-method="newInstance">
-    <argument type="org.opendaylight.mdsal.common.api.LogicalDatastoreType" value="OPERATIONAL"/>
-  </bean>
-
-  <bean id="updaterOper" class="org.opendaylight.controller.cluster.datastore.DatastoreContextPropertiesUpdater">
-    <cm:managed-properties persistent-id="org.opendaylight.controller.cluster.datastore" update-strategy="component-managed" update-method="update"/>
-    <argument ref="introspectorOper"/>
-    <argument ref="datastoreProps"/>
-  </bean>
-
-  <bean id="operDatastore" class="org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory"
-          factory-method="createInstance" destroy-method="close">
-    <argument ref="schemaService"/>
-    <argument>
-      <bean factory-ref="introspectorOper" factory-method="getContext" />
-    </argument>
-    <argument ref="datastoreSnapshotRestore"/>
-    <argument ref="actorSystemProvider"/>
-    <argument ref="introspectorOper"/>
-    <argument ref="updaterOper"/>
-    <argument ref="configurationImpl" />
-  </bean>
-
-  <service ref="operDatastore" odl:type="distributed-operational">
-    <interfaces>
-      <value>org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface</value>
-    </interfaces>
-  </service>
-
-  <!-- Concurrent DOMDataBroker -->
-
-  <bean id="listenableFutureExecutor" class="org.opendaylight.yangtools.util.concurrent.SpecialExecutors"
-          factory-method="newBlockingBoundedCachedThreadPool">
-    <argument value="${max-data-broker-future-callback-pool-size}"/>
-    <argument value="${max-data-broker-future-callback-queue-size}"/>
-    <argument value="CommitFutures"/>
-    <argument>
-    <!-- We should use a more specific class -->
-      <bean factory-ref="operDatastore" factory-method="getClass"/>
-    </argument>
-  </bean>
-
-  <bean id="commitStatsTracker" class="org.opendaylight.yangtools.util.DurationStatisticsTracker"
-          factory-method="createConcurrent"/>
-
-  <bean id="clusteredDOMDataBroker" class="org.opendaylight.controller.cluster.databroker.ConcurrentDOMDataBroker"
-          destroy-method="close">
-    <argument>
-      <map>
-        <entry key="CONFIGURATION" value-ref="configDatastore"/>
-        <entry key="OPERATIONAL" value-ref="operDatastore"/>
-      </map>
-    </argument>
-    <argument ref="listenableFutureExecutor"/>
-    <argument ref="commitStatsTracker"/>
-  </bean>
-
-  <service ref="clusteredDOMDataBroker" interface="org.opendaylight.mdsal.dom.api.DOMDataBroker"
-          odl:type="default"/>
-
-  <!-- JMX beans for the data broker -->
-
-  <bean id="commitStatsMXBean" class="org.opendaylight.controller.cluster.datastore.jmx.mbeans.CommitStatsMXBeanImpl"
-          init-method="register" destroy-method="unregister">
-    <argument ref="commitStatsTracker"/>
-    <argument value="DOMDataBroker"/>
-  </bean>
-
-  <bean id="threadStatsMXBean" class="org.opendaylight.controller.md.sal.common.util.jmx.ThreadExecutorStatsMXBeanImpl"
-          factory-method="create" destroy-method="unregister">
-    <argument ref="listenableFutureExecutor"/>
-    <argument value="CommitFutureExecutorStats"/>
-    <argument value="DOMDataBroker"/>
-  </bean>
-
-  <!-- CDS shard manager -->
-  <bean id="cdsNodeManager" class="org.opendaylight.controller.cluster.sharding.DistributedShardedDOMDataTree"
-          init-method="init">
-    <argument ref="actorSystemProvider"/>
-    <argument ref="operDatastore"/>
-    <argument ref="configDatastore"/>
-  </bean>
-
-  <service ref="cdsNodeManager" odl:type="default">
-    <interfaces>
-      <value>org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService</value>
-      <value>org.opendaylight.mdsal.dom.api.DOMDataTreeService</value>
-      <value>org.opendaylight.controller.cluster.sharding.DistributedShardFactory</value>
-    </interfaces>
-  </service>
-
-</blueprint>

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.