From e1abbf8df6afc50880b4ea3f95f9d455f5cc0237 Mon Sep 17 00:00:00 2001 From: Jakub Morvay Date: Thu, 10 Dec 2015 15:40:05 +0100 Subject: [PATCH] Clustered sources resolution Change-Id: I9fe167ab4185a8ac959b5e1aa6737d1eaff1d90c Signed-off-by: Jakub Morvay --- opendaylight/netconf/netconf-topology/pom.xml | 4 + .../ClusteredDeviceSourcesResolver.java | 19 +++ .../ClusteredDeviceSourcesResolverImpl.java | 114 ++++++++++++++++++ .../pipeline/MasterSourceProvider.java | 15 +++ .../pipeline/MasterSourceProviderImpl.java | 66 ++++++++++ ...sterSourceProviderOnSameNodeException.java | 12 ++ ...ounceClusteredDeviceSourcesResolverUp.java | 17 +++ .../messages/AnnounceMasterOnSameNodeUp.java | 19 +++ .../AnnounceMasterSourceProviderUp.java | 19 +++ 9 files changed, 285 insertions(+) create mode 100644 opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolver.java create mode 100644 opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolverImpl.java create mode 100644 opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProvider.java create mode 100644 opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderImpl.java create mode 100644 opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderOnSameNodeException.java create mode 100644 opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceClusteredDeviceSourcesResolverUp.java create mode 100644 opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterOnSameNodeUp.java create mode 100644 opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterSourceProviderUp.java diff --git a/opendaylight/netconf/netconf-topology/pom.xml b/opendaylight/netconf/netconf-topology/pom.xml index 94b0d40e19..fb36a67f2d 100644 --- a/opendaylight/netconf/netconf-topology/pom.xml +++ b/opendaylight/netconf/netconf-topology/pom.xml @@ -103,6 +103,10 @@ 1.6.5 test + + org.opendaylight.yangtools + yang-model-api + diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolver.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolver.java new file mode 100644 index 0000000000..6e0c1d14ca --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolver.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline; + +import akka.actor.TypedActor; +import java.util.Set; +import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; +import scala.concurrent.Future; + +public interface ClusteredDeviceSourcesResolver extends TypedActor.Receiver, TypedActor.PreStart { + + Future> getResolvedSources(); +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolverImpl.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolverImpl.java new file mode 100644 index 0000000000..3a74e40b01 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/ClusteredDeviceSourcesResolverImpl.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + + +package org.opendaylight.netconf.topology.pipeline; +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.TypedActor; +import akka.actor.TypedProps; +import akka.cluster.Cluster; +import akka.cluster.Member; +import akka.dispatch.Futures; +import akka.dispatch.OnComplete; +import java.util.List; +import java.util.Set; +import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider; +import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp; +import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp; +import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation; +import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; +import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration; +import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Promise; + + +public class ClusteredDeviceSourcesResolverImpl implements ClusteredDeviceSourcesResolver { + + private static Logger LOG = LoggerFactory.getLogger(ClusteredDeviceSourcesResolver.class); + + private final String topologyId; + private final String nodeId; + private final ActorSystem actorSystem; + private final SchemaSourceRegistry schemaRegistry; + private final List> sourceRegistrations; + + private final Promise> resolvedSourcesPromise; + private MasterSourceProvider remoteYangTextSourceProvider; + + public ClusteredDeviceSourcesResolverImpl(String topologyId, String nodeId, ActorSystem actorSystem, + SchemaSourceRegistry schemaRegistry, + List> sourceRegistrations) { + this.topologyId = topologyId; + this.nodeId = nodeId; + this.actorSystem = actorSystem; + this.schemaRegistry = schemaRegistry; + this.sourceRegistrations = sourceRegistrations; + resolvedSourcesPromise = Futures.promise(); + } + + @Override + public void preStart(){ + Cluster cluster = Cluster.get(actorSystem); + for(Member node : cluster.state().getMembers()) { + if(!node.address().equals(cluster.selfAddress())) { + final String path = node.address() + "/user/" + topologyId + "/" + nodeId + "/masterSourceProvider"; + actorSystem.actorSelection(path).tell(new AnnounceClusteredDeviceSourcesResolverUp(), TypedActor.context().self()); + } + } + } + + @Override + public void onReceive(Object o, ActorRef actorRef) { + if(o instanceof AnnounceMasterSourceProviderUp) { + if(remoteYangTextSourceProvider == null) { + remoteYangTextSourceProvider = TypedActor.get(actorSystem).typedActorOf( + new TypedProps<>(MasterSourceProvider.class, + MasterSourceProviderImpl.class), actorRef); + registerProvidedSourcesToSchemaRegistry(); + } + } else if(o instanceof AnnounceMasterOnSameNodeUp) { + resolvedSourcesPromise.failure(new MasterSourceProviderOnSameNodeException()); + } + } + + private void registerProvidedSourcesToSchemaRegistry() { + Future> sourcesFuture = remoteYangTextSourceProvider.getProvidedSources(); + resolvedSourcesPromise.completeWith(sourcesFuture); + final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, actorSystem.dispatcher()); + + sourcesFuture.onComplete(new OnComplete>() { + @Override + public void onComplete(Throwable throwable, Set sourceIdentifiers) throws Throwable { + for (SourceIdentifier sourceId : sourceIdentifiers) { + sourceRegistrations.add(schemaRegistry.registerSchemaSource(remoteProvider, + PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue()))); + } + } + }, actorSystem.dispatcher()); + } + + @Override + public Future> getResolvedSources() { + return resolvedSourcesPromise.future(); + } +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProvider.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProvider.java new file mode 100644 index 0000000000..f9726f4e55 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProvider.java @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.topology.pipeline; + +import akka.actor.TypedActor; +import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider; + +public interface MasterSourceProvider + extends TypedActor.PreStart, TypedActor.Receiver, RemoteYangTextSourceProvider { +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderImpl.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderImpl.java new file mode 100644 index 0000000000..42375dc613 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderImpl.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.netconf.topology.pipeline; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.TypedActor; +import akka.cluster.Cluster; +import akka.cluster.Member; +import java.util.Set; +import org.opendaylight.controller.cluster.schema.provider.impl.RemoteYangTextSourceProviderImpl; +import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp; +import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp; +import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp; +import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository; +import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MasterSourceProviderImpl extends RemoteYangTextSourceProviderImpl + implements MasterSourceProvider { + + private static Logger LOG = LoggerFactory.getLogger(MasterSourceProviderImpl.class); + + private final ActorSystem actorSystem; + private final String topologyId; + private final String nodeId; + + public MasterSourceProviderImpl(SchemaRepository schemaRepo, Set providedSources, ActorSystem actorSystem, String topologyId, String nodeId) { + super(schemaRepo, providedSources); + this.actorSystem = actorSystem; + this.topologyId = topologyId; + this.nodeId = nodeId; + } + + @Override + public void onReceive(Object o, ActorRef actorRef) { + if(o instanceof AnnounceClusteredDeviceSourcesResolverUp) { + LOG.debug("Received source resolver up"); + actorRef.tell(new AnnounceMasterSourceProviderUp(), TypedActor.context().self()); + } + } + + @Override + public void preStart() { + Cluster cluster = Cluster.get(actorSystem); + cluster.join(cluster.selfAddress()); + LOG.debug("Notifying members master schema source provider is up."); + for(Member node : cluster.state().getMembers()) { + final String path = node.address() + "/user/" + topologyId + "/" + nodeId + "/clusteredDeviceSourcesResolver"; + if(node.address().equals(cluster.selfAddress())) { + actorSystem.actorSelection(path).tell(new AnnounceMasterOnSameNodeUp(), TypedActor.context().self()); + actorSystem.actorSelection(path).tell(PoisonPill.getInstance(), TypedActor.context().self()); + } else { + //TODO extract string constant to util class + actorSystem.actorSelection(path).tell(new AnnounceMasterSourceProviderUp(), TypedActor.context().self()); + } + } + } +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderOnSameNodeException.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderOnSameNodeException.java new file mode 100644 index 0000000000..71f3005edd --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/MasterSourceProviderOnSameNodeException.java @@ -0,0 +1,12 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline; + +public class MasterSourceProviderOnSameNodeException extends Exception { +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceClusteredDeviceSourcesResolverUp.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceClusteredDeviceSourcesResolverUp.java new file mode 100644 index 0000000000..f8328e7264 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceClusteredDeviceSourcesResolverUp.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline.messages; + +import java.io.Serializable; + +public class AnnounceClusteredDeviceSourcesResolverUp implements Serializable { + public static final long serialVersionUID = 1L; + + public AnnounceClusteredDeviceSourcesResolverUp() {} +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterOnSameNodeUp.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterOnSameNodeUp.java new file mode 100644 index 0000000000..793321c08e --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterOnSameNodeUp.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline.messages; + +import java.io.Serializable; + +public class AnnounceMasterOnSameNodeUp implements Serializable { + public static long serialVersionUID = 1L; + + public AnnounceMasterOnSameNodeUp() { + + } +} diff --git a/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterSourceProviderUp.java b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterSourceProviderUp.java new file mode 100644 index 0000000000..7bec681f51 --- /dev/null +++ b/opendaylight/netconf/netconf-topology/src/main/java/org/opendaylight/netconf/topology/pipeline/messages/AnnounceMasterSourceProviderUp.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.netconf.topology.pipeline.messages; + +import java.io.Serializable; + +public class AnnounceMasterSourceProviderUp implements Serializable { + public static final long serialVersionUID = 1L; + + public AnnounceMasterSourceProviderUp() { + + } +} -- 2.36.6