2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.netconf.topology.pipeline;
12 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
14 * This program and the accompanying materials are made available under the
15 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
16 * and is available at http://www.eclipse.org/legal/epl-v10.html
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.actor.TypedActor;
22 import akka.actor.TypedProps;
23 import akka.cluster.Cluster;
24 import akka.cluster.Member;
25 import akka.dispatch.Futures;
26 import akka.dispatch.OnComplete;
27 import java.util.List;
29 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
30 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp;
31 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp;
32 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp;
33 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
34 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
35 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
36 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
37 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
38 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
39 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.concurrent.Future;
43 import scala.concurrent.Promise;
46 public class ClusteredDeviceSourcesResolverImpl implements ClusteredDeviceSourcesResolver {
48 private static Logger LOG = LoggerFactory.getLogger(ClusteredDeviceSourcesResolver.class);
50 private final String topologyId;
51 private final String nodeId;
52 private final ActorSystem actorSystem;
53 private final SchemaSourceRegistry schemaRegistry;
54 private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations;
56 private final Promise<Set<SourceIdentifier>> resolvedSourcesPromise;
57 private MasterSourceProvider remoteYangTextSourceProvider;
59 public ClusteredDeviceSourcesResolverImpl(String topologyId, String nodeId, ActorSystem actorSystem,
60 SchemaSourceRegistry schemaRegistry,
61 List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations) {
62 this.topologyId = topologyId;
64 this.actorSystem = actorSystem;
65 this.schemaRegistry = schemaRegistry;
66 this.sourceRegistrations = sourceRegistrations;
67 resolvedSourcesPromise = Futures.promise();
71 public void preStart(){
72 Cluster cluster = Cluster.get(actorSystem);
73 for(Member node : cluster.state().getMembers()) {
74 if(!node.address().equals(cluster.selfAddress())) {
75 final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(node.address().toString(), topologyId);
76 final String path = pathCreator.withSuffix(nodeId).withSuffix(NetconfTopologyPathCreator.MASTER_SOURCE_PROVIDER).build();
77 actorSystem.actorSelection(path).tell(new AnnounceClusteredDeviceSourcesResolverUp(), TypedActor.context().self());
83 public void onReceive(Object o, ActorRef actorRef) {
84 if(o instanceof AnnounceMasterSourceProviderUp) {
85 if(remoteYangTextSourceProvider == null) {
86 remoteYangTextSourceProvider = TypedActor.get(actorSystem).typedActorOf(
87 new TypedProps<>(MasterSourceProvider.class,
88 MasterSourceProviderImpl.class), actorRef);
89 registerProvidedSourcesToSchemaRegistry();
91 } else if(o instanceof AnnounceMasterOnSameNodeUp) {
92 resolvedSourcesPromise.failure(new MasterSourceProviderOnSameNodeException());
96 private void registerProvidedSourcesToSchemaRegistry() {
97 Future<Set<SourceIdentifier>> sourcesFuture = remoteYangTextSourceProvider.getProvidedSources();
98 resolvedSourcesPromise.completeWith(sourcesFuture);
99 final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, actorSystem.dispatcher());
101 sourcesFuture.onComplete(new OnComplete<Set<SourceIdentifier>>() {
103 public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
104 for (SourceIdentifier sourceId : sourceIdentifiers) {
105 sourceRegistrations.add(schemaRegistry.registerSchemaSource(remoteProvider,
106 PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
109 }, actorSystem.dispatcher());
113 public Future<Set<SourceIdentifier>> getResolvedSources() {
114 return resolvedSourcesPromise.future();