2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.netconf.topology.pipeline;
12 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
14 * This program and the accompanying materials are made available under the
15 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
16 * and is available at http://www.eclipse.org/legal/epl-v10.html
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.actor.TypedActor;
22 import akka.actor.TypedProps;
23 import akka.cluster.Cluster;
24 import akka.cluster.Member;
25 import akka.dispatch.Futures;
26 import akka.dispatch.OnComplete;
27 import java.util.List;
29 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
30 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp;
31 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp;
32 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp;
33 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
34 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
35 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
36 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
37 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
38 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import scala.concurrent.Future;
42 import scala.concurrent.Promise;
45 public class ClusteredDeviceSourcesResolverImpl implements ClusteredDeviceSourcesResolver {
47 private static Logger LOG = LoggerFactory.getLogger(ClusteredDeviceSourcesResolver.class);
49 private final String topologyId;
50 private final String nodeId;
51 private final ActorSystem actorSystem;
52 private final SchemaSourceRegistry schemaRegistry;
53 private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations;
55 private final Promise<Set<SourceIdentifier>> resolvedSourcesPromise;
56 private MasterSourceProvider remoteYangTextSourceProvider;
58 public ClusteredDeviceSourcesResolverImpl(String topologyId, String nodeId, ActorSystem actorSystem,
59 SchemaSourceRegistry schemaRegistry,
60 List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations) {
61 this.topologyId = topologyId;
63 this.actorSystem = actorSystem;
64 this.schemaRegistry = schemaRegistry;
65 this.sourceRegistrations = sourceRegistrations;
66 resolvedSourcesPromise = Futures.promise();
70 public void preStart(){
71 Cluster cluster = Cluster.get(actorSystem);
72 for(Member node : cluster.state().getMembers()) {
73 if(!node.address().equals(cluster.selfAddress())) {
74 final String path = node.address() + "/user/" + topologyId + "/" + nodeId + "/masterSourceProvider";
75 actorSystem.actorSelection(path).tell(new AnnounceClusteredDeviceSourcesResolverUp(), TypedActor.context().self());
81 public void onReceive(Object o, ActorRef actorRef) {
82 if(o instanceof AnnounceMasterSourceProviderUp) {
83 if(remoteYangTextSourceProvider == null) {
84 remoteYangTextSourceProvider = TypedActor.get(actorSystem).typedActorOf(
85 new TypedProps<>(MasterSourceProvider.class,
86 MasterSourceProviderImpl.class), actorRef);
87 registerProvidedSourcesToSchemaRegistry();
89 } else if(o instanceof AnnounceMasterOnSameNodeUp) {
90 resolvedSourcesPromise.failure(new MasterSourceProviderOnSameNodeException());
94 private void registerProvidedSourcesToSchemaRegistry() {
95 Future<Set<SourceIdentifier>> sourcesFuture = remoteYangTextSourceProvider.getProvidedSources();
96 resolvedSourcesPromise.completeWith(sourcesFuture);
97 final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, actorSystem.dispatcher());
99 sourcesFuture.onComplete(new OnComplete<Set<SourceIdentifier>>() {
101 public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
102 for (SourceIdentifier sourceId : sourceIdentifiers) {
103 sourceRegistrations.add(schemaRegistry.registerSchemaSource(remoteProvider,
104 PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
107 }, actorSystem.dispatcher());
111 public Future<Set<SourceIdentifier>> getResolvedSources() {
112 return resolvedSourcesPromise.future();