Clustered sources resolution
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / pipeline / ClusteredDeviceSourcesResolverImpl.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9
10 package org.opendaylight.netconf.topology.pipeline;
11 /*
12  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
13  *
14  * This program and the accompanying materials are made available under the
15  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
16  * and is available at http://www.eclipse.org/legal/epl-v10.html
17  */
18
19 import akka.actor.ActorRef;
20 import akka.actor.ActorSystem;
21 import akka.actor.TypedActor;
22 import akka.actor.TypedProps;
23 import akka.cluster.Cluster;
24 import akka.cluster.Member;
25 import akka.dispatch.Futures;
26 import akka.dispatch.OnComplete;
27 import java.util.List;
28 import java.util.Set;
29 import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
30 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceClusteredDeviceSourcesResolverUp;
31 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterOnSameNodeUp;
32 import org.opendaylight.netconf.topology.pipeline.messages.AnnounceMasterSourceProviderUp;
33 import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceRepresentation;
34 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
35 import org.opendaylight.yangtools.yang.model.repo.api.YangTextSchemaSource;
36 import org.opendaylight.yangtools.yang.model.repo.spi.PotentialSchemaSource;
37 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistration;
38 import org.opendaylight.yangtools.yang.model.repo.spi.SchemaSourceRegistry;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import scala.concurrent.Future;
42 import scala.concurrent.Promise;
43
44
45 public class ClusteredDeviceSourcesResolverImpl implements ClusteredDeviceSourcesResolver {
46
47     private static Logger LOG = LoggerFactory.getLogger(ClusteredDeviceSourcesResolver.class);
48
49     private final String topologyId;
50     private final String nodeId;
51     private final ActorSystem actorSystem;
52     private final SchemaSourceRegistry schemaRegistry;
53     private final List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations;
54
55     private final Promise<Set<SourceIdentifier>> resolvedSourcesPromise;
56     private MasterSourceProvider remoteYangTextSourceProvider;
57
58     public ClusteredDeviceSourcesResolverImpl(String topologyId, String nodeId, ActorSystem actorSystem,
59                                               SchemaSourceRegistry schemaRegistry,
60                                               List<SchemaSourceRegistration<? extends SchemaSourceRepresentation>> sourceRegistrations) {
61         this.topologyId = topologyId;
62         this.nodeId = nodeId;
63         this.actorSystem = actorSystem;
64         this.schemaRegistry = schemaRegistry;
65         this.sourceRegistrations = sourceRegistrations;
66         resolvedSourcesPromise = Futures.promise();
67     }
68
69     @Override
70     public void preStart(){
71         Cluster cluster = Cluster.get(actorSystem);
72         for(Member node : cluster.state().getMembers()) {
73             if(!node.address().equals(cluster.selfAddress())) {
74                 final String path = node.address() + "/user/" + topologyId + "/" + nodeId + "/masterSourceProvider";
75                 actorSystem.actorSelection(path).tell(new AnnounceClusteredDeviceSourcesResolverUp(), TypedActor.context().self());
76             }
77         }
78     }
79
80     @Override
81     public void onReceive(Object o, ActorRef actorRef) {
82         if(o instanceof AnnounceMasterSourceProviderUp) {
83             if(remoteYangTextSourceProvider == null) {
84                 remoteYangTextSourceProvider = TypedActor.get(actorSystem).typedActorOf(
85                         new TypedProps<>(MasterSourceProvider.class,
86                                 MasterSourceProviderImpl.class), actorRef);
87                 registerProvidedSourcesToSchemaRegistry();
88             }
89         } else if(o instanceof AnnounceMasterOnSameNodeUp) {
90             resolvedSourcesPromise.failure(new MasterSourceProviderOnSameNodeException());
91         }
92     }
93
94     private void registerProvidedSourcesToSchemaRegistry() {
95         Future<Set<SourceIdentifier>> sourcesFuture = remoteYangTextSourceProvider.getProvidedSources();
96         resolvedSourcesPromise.completeWith(sourcesFuture);
97         final RemoteSchemaProvider remoteProvider = new RemoteSchemaProvider(remoteYangTextSourceProvider, actorSystem.dispatcher());
98
99         sourcesFuture.onComplete(new OnComplete<Set<SourceIdentifier>>() {
100             @Override
101             public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
102                 for (SourceIdentifier sourceId : sourceIdentifiers) {
103                    sourceRegistrations.add(schemaRegistry.registerSchemaSource(remoteProvider,
104                            PotentialSchemaSource.create(sourceId, YangTextSchemaSource.class, PotentialSchemaSource.Costs.REMOTE_IO.getValue())));
105                 }
106             }
107         }, actorSystem.dispatcher());
108     }
109
110     @Override
111     public Future<Set<SourceIdentifier>> getResolvedSources() {
112         return resolvedSourcesPromise.future();
113     }
114 }