<version>1.6.5</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-model-api</artifactId>
+ </dependency>
</dependencies>
<build>
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.TypedActor;
+import java.util.Set;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import scala.concurrent.Future;
+
+public interface ClusteredDeviceSourcesResolver extends TypedActor.Receiver, TypedActor.PreStart {
+
+ Future<Set<SourceIdentifier>> getResolvedSources();
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+
+package org.opendaylight.netconf.topology.pipeline;
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.TypedActor;
+import akka.actor.TypedProps;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.dispatch.Futures;
+import akka.dispatch.OnComplete;
+import java.util.List;
+import java.util.Set;
+import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
+import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+
+
+public class ClusteredDeviceSourcesResolverImpl implements ClusteredDeviceSourcesResolver {
+
+ private static Logger LOG = LoggerFactory.getLogger(ClusteredDeviceSourcesResolver.class);
+
+ private final String topologyId;
+ private final String nodeId;
+ private final ActorSystem actorSystem;
+ private final SchemaSourceRegistry schemaRegistry;
+ private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations;
+
+ private final Promise<Set<SourceIdentifier>> resolvedSourcesPromise;
+ private MasterSourceProvider remoteYangTextSourceProvider;
+
+ public ClusteredDeviceSourcesResolverImpl(String topologyId, String nodeId, ActorSystem actorSystem,
+ SchemaSourceRegistry schemaRegistry,
+ List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations) {
+ this.topologyId = topologyId;
+ this.nodeId = nodeId;
+ this.actorSystem = actorSystem;
+ this.schemaRegistry = schemaRegistry;
+ this.sourceRegistrations = sourceRegistrations;
+ resolvedSourcesPromise = Futures.promise();
+ }
+
+ @Override
+ public void preStart(){
+ Cluster cluster = Cluster.get(actorSystem);
+ for(Member node : cluster.state().getMembers()) {
+ if(!node.address().equals(cluster.selfAddress())) {
+ final String path = node.address() + "/user/" + topologyId + "/" + nodeId + "/masterSourceProvider";
+ actorSystem.actorSelection(path).tell(new AnnounceClusteredDeviceSourcesResolverUp(), TypedActor.context().self());
+ }
+ }
+ }
+
+ @Override
+ public void onReceive(Object o, ActorRef actorRef) {
+ if(o instanceof AnnounceMasterSourceProviderUp) {
+ if(remoteYangTextSourceProvider == null) {
+ remoteYangTextSourceProvider = TypedActor.get(actorSystem).typedActorOf(
+ new TypedProps<>(MasterSourceProvider.class,
+ MasterSourceProviderImpl.class), actorRef);
+ registerProvidedSourcesToSchemaRegistry();
+ }
+ } else if(o instanceof AnnounceMasterOnSameNodeUp) {
+ resolvedSourcesPromise.failure(new MasterSourceProviderOnSameNodeException());
+ }
+ }
+
+ private void registerProvidedSourcesToSchemaRegistry() {
+ Future<Set<SourceIdentifier>> sourcesFuture = remoteYangTextSourceProvider.getProvidedSources();
+ resolvedSourcesPromise.completeWith(sourcesFuture);
+ final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, actorSystem.dispatcher());
+
+ sourcesFuture.onComplete(new OnComplete<Set<SourceIdentifier>>() {
+ @Override
+ public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
+ for (SourceIdentifier sourceId : sourceIdentifiers) {
+ sourceRegistrations.add(schemaRegistry.registerSchemaSource(remoteProvider,
+ PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
+ }
+ }
+ }, actorSystem.dispatcher());
+ }
+
+ @Override
+ public Future<Set<SourceIdentifier>> getResolvedSources() {
+ return resolvedSourcesPromise.future();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.TypedActor;
+import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
+
+public interface MasterSourceProvider
+ extends TypedActor.PreStart, TypedActor.Receiver, RemoteYangTextSourceProvider {
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.netconf.topology.pipeline;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.TypedActor;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import java.util.Set;
+import org.opendaylight.controller.cluster.schema.provider.impl.RemoteYangTextSourceProviderImpl;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp;
+import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp;
+import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
+import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MasterSourceProviderImpl extends RemoteYangTextSourceProviderImpl
+ implements MasterSourceProvider {
+
+ private static Logger LOG = LoggerFactory.getLogger(MasterSourceProviderImpl.class);
+
+ private final ActorSystem actorSystem;
+ private final String topologyId;
+ private final String nodeId;
+
+ public MasterSourceProviderImpl(SchemaRepository schemaRepo, Set<SourceIdentifier> providedSources, ActorSystem actorSystem, String topologyId, String nodeId) {
+ super(schemaRepo, providedSources);
+ this.actorSystem = actorSystem;
+ this.topologyId = topologyId;
+ this.nodeId = nodeId;
+ }
+
+ @Override
+ public void onReceive(Object o, ActorRef actorRef) {
+ if(o instanceof AnnounceClusteredDeviceSourcesResolverUp) {
+ LOG.debug("Received source resolver up");
+ actorRef.tell(new AnnounceMasterSourceProviderUp(), TypedActor.context().self());
+ }
+ }
+
+ @Override
+ public void preStart() {
+ Cluster cluster = Cluster.get(actorSystem);
+ cluster.join(cluster.selfAddress());
+ LOG.debug("Notifying members master schema source provider is up.");
+ for(Member node : cluster.state().getMembers()) {
+ final String path = node.address() + "/user/" + topologyId + "/" + nodeId + "/clusteredDeviceSourcesResolver";
+ if(node.address().equals(cluster.selfAddress())) {
+ actorSystem.actorSelection(path).tell(new AnnounceMasterOnSameNodeUp(), TypedActor.context().self());
+ actorSystem.actorSelection(path).tell(PoisonPill.getInstance(), TypedActor.context().self());
+ } else {
+ //TODO extract string constant to util class
+ actorSystem.actorSelection(path).tell(new AnnounceMasterSourceProviderUp(), TypedActor.context().self());
+ }
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline;
+
+public class MasterSourceProviderOnSameNodeException extends Exception {
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.messages;
+
+import java.io.Serializable;
+
+public class AnnounceClusteredDeviceSourcesResolverUp implements Serializable {
+ public static final long serialVersionUID = 1L;
+
+ public AnnounceClusteredDeviceSourcesResolverUp() {}
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.messages;
+
+import java.io.Serializable;
+
+public class AnnounceMasterOnSameNodeUp implements Serializable {
+ public static long serialVersionUID = 1L;
+
+ public AnnounceMasterOnSameNodeUp() {
+
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.pipeline.messages;
+
+import java.io.Serializable;
+
+public class AnnounceMasterSourceProviderUp implements Serializable {
+ public static final long serialVersionUID = 1L;
+
+ public AnnounceMasterSourceProviderUp() {
+
+ }
+}