builder class for path creating
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / pipeline / ClusteredDeviceSourcesResolverImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9
10 package org.opendaylight.netconf.topology.pipeline;
11 /*
12  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
13  *
14  * This program and the accompanying materials are made available under the
15  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
16  * and is available at http://www.eclipse.org/legal/epl-v10.html
17  */
18
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.actor.TypedActor;
22 import akka.actor.TypedProps;
23 import akka.cluster.Cluster;
24 import akka.cluster.Member;
25 import akka.dispatch.Futures;
26 import akka.dispatch.OnComplete;
27 import java.util.List;
28 import java.util.Set;
29 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
30 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp;
31 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp;
32 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp;
33 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
34 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
35 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
36 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
37 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
38 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
39 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.concurrent.Future;
43 import scala.concurrent.Promise;
44
45
46 public class ClusteredDeviceSourcesResolverImpl implements ClusteredDeviceSourcesResolver {
47
48     private static Logger LOG = LoggerFactory.getLogger(ClusteredDeviceSourcesResolver.class);
49
50     private final String topologyId;
51     private final String nodeId;
52     private final ActorSystem actorSystem;
53     private final SchemaSourceRegistry schemaRegistry;
54     private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations;
55
56     private final Promise<Set<SourceIdentifier>> resolvedSourcesPromise;
57     private MasterSourceProvider remoteYangTextSourceProvider;
58
59     public ClusteredDeviceSourcesResolverImpl(String topologyId, String nodeId, ActorSystem actorSystem,
60                                               SchemaSourceRegistry schemaRegistry,
61                                               List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations) {
62         this.topologyId = topologyId;
63         this.nodeId = nodeId;
64         this.actorSystem = actorSystem;
65         this.schemaRegistry = schemaRegistry;
66         this.sourceRegistrations = sourceRegistrations;
67         resolvedSourcesPromise = Futures.promise();
68     }
69
70     @Override
71     public void preStart(){
72         Cluster cluster = Cluster.get(actorSystem);
73         for(Member node : cluster.state().getMembers()) {
74             if(!node.address().equals(cluster.selfAddress())) {
75                 final NetconfTopologyPathCreator pathCreator = new NetconfTopologyPathCreator(node.address().toString(), topologyId);
76                 final String path = pathCreator.withSuffix(nodeId).withSuffix(NetconfTopologyPathCreator.MASTER_SOURCE_PROVIDER).build();
77                 actorSystem.actorSelection(path).tell(new AnnounceClusteredDeviceSourcesResolverUp(), TypedActor.context().self());
78             }
79         }
80     }
81
82     @Override
83     public void onReceive(Object o, ActorRef actorRef) {
84         if(o instanceof AnnounceMasterSourceProviderUp) {
85             if(remoteYangTextSourceProvider == null) {
86                 remoteYangTextSourceProvider = TypedActor.get(actorSystem).typedActorOf(
87                         new TypedProps<>(MasterSourceProvider.class,
88                                 MasterSourceProviderImpl.class), actorRef);
89                 registerProvidedSourcesToSchemaRegistry();
90             }
91         } else if(o instanceof AnnounceMasterOnSameNodeUp) {
92             resolvedSourcesPromise.failure(new MasterSourceProviderOnSameNodeException());
93         }
94     }
95
96     private void registerProvidedSourcesToSchemaRegistry() {
97         Future<Set<SourceIdentifier>> sourcesFuture = remoteYangTextSourceProvider.getProvidedSources();
98         resolvedSourcesPromise.completeWith(sourcesFuture);
99         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, actorSystem.dispatcher());
100
101         sourcesFuture.onComplete(new OnComplete<Set<SourceIdentifier>>() {
102             @Override
103             public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
104                 for (SourceIdentifier sourceId : sourceIdentifiers) {
105                    sourceRegistrations.add(schemaRegistry.registerSchemaSource(remoteProvider,
106                            PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
107                 }
108             }
109         }, actorSystem.dispatcher());
110     }
111
112     @Override
113     public Future<Set<SourceIdentifier>> getResolvedSources() {
114         return resolvedSourcesPromise.future();
115     }
116 }