Cluster schema resolution pipeline
[netconf.git] / opendaylight / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / pipeline / ClusteredNetconfDevice.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.pipeline;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorSystem;
13 import akka.actor.TypedActor;
14 import akka.actor.TypedProps;
15 import akka.dispatch.OnComplete;
16 import akka.japi.Creator;
17 import com.google.common.base.Optional;
18 import com.google.common.collect.Sets;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import java.util.Set;
22 import java.util.concurrent.ExecutorService;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
24 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
26 import org.opendaylight.netconf.api.NetconfMessage;
27 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
28 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
29 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
30 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
31 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
32 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
33 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc;
34 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
35 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
38 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
41 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 public class ClusteredNetconfDevice extends NetconfDevice implements EntityOwnershipListener {
46
47     private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfDevice.class);
48
49     private boolean isMaster = false;
50     private NetconfDeviceCommunicator listener;
51     private NetconfSessionPreferences sessionPreferences;
52     private SchemaRepository schemaRepo;
53     private final ActorSystem actorSystem;
54     private final String topologyId;
55     private final String nodeId;
56     private final ActorContext cachedContext;
57
58     private MasterSourceProvider masterSourceProvider = null;
59     private ClusteredDeviceSourcesResolver resolver = null;
60
61     public ClusteredNetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
62                                   final ExecutorService globalProcessingExecutor, SchemaRepository schemaRepo, ActorSystem actorSystem, String topologyId, String nodeId,
63                                   ActorContext cachedContext) {
64         super(schemaResourcesDTO, id, salFacade, globalProcessingExecutor);
65         this.schemaRepo = schemaRepo;
66         this.actorSystem = actorSystem;
67         this.topologyId = topologyId;
68         this.nodeId = nodeId;
69         this.cachedContext = cachedContext;
70     }
71
72     @Override
73     public void onRemoteSessionUp(NetconfSessionPreferences remoteSessionCapabilities, NetconfDeviceCommunicator listener) {
74         LOG.warn("Node {} SessionUp, with capabilities {}", nodeId, remoteSessionCapabilities);
75         this.listener = listener;
76         this.sessionPreferences = remoteSessionCapabilities;
77         slaveSetupSchema();
78     }
79
80
81     @Override
82     protected void handleSalInitializationSuccess(SchemaContext result, NetconfSessionPreferences remoteSessionCapabilities, DOMRpcService deviceRpc) {
83         super.handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
84
85         final Set<SourceIdentifier> sourceIds = Sets.newHashSet();
86         for(ModuleIdentifier id : result.getAllModuleIdentifiers()) {
87             sourceIds.add(SourceIdentifier.create(id.getName(), (SimpleDateFormatUtil.DEFAULT_DATE_REV == id.getRevision() ? Optional.<String>absent() :
88                     Optional.of(SimpleDateFormatUtil.getRevisionFormat().format(id.getRevision())))));
89         }
90
91         //TODO extract string constant to util class
92         LOG.debug("Creating master source provider");
93         masterSourceProvider = TypedActor.get(cachedContext).typedActorOf(
94                 new TypedProps<>(MasterSourceProvider.class,
95                         new Creator<MasterSourceProviderImpl>() {
96                             @Override
97                             public MasterSourceProviderImpl create() throws Exception {
98                                 return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
99                             }
100                         }), "masterSourceProvider");
101     }
102
103     @Override
104     public void onRemoteSessionDown() {
105         super.onRemoteSessionDown();
106         listener = null;
107         sessionPreferences = null;
108         if (masterSourceProvider != null) {
109             // if we have master the slave that started on this node should be already killed via PoisonPill, so stop master only now
110             LOG.debug("Stopping master source provider for node {}", nodeId);
111             TypedActor.get(actorSystem).stop(masterSourceProvider);
112             masterSourceProvider = null;
113         } else {
114             LOG.debug("Stopping slave source resolver for node {}", nodeId);
115             TypedActor.get(actorSystem).stop(resolver);
116             resolver = null;
117         }
118     }
119
120     private void slaveSetupSchema() {
121         //TODO extract string constant to util class
122         resolver = TypedActor.get(cachedContext).typedActorOf(
123                 new TypedProps<>(ClusteredDeviceSourcesResolver.class,
124                         new Creator<ClusteredDeviceSourcesResolverImpl>() {
125                             @Override
126                             public ClusteredDeviceSourcesResolverImpl create() throws Exception {
127                                 return new ClusteredDeviceSourcesResolverImpl(topologyId, nodeId, actorSystem, schemaRegistry, sourceRegistrations);
128                             }
129                         }), "clusteredDeviceSourcesResolver");
130
131
132         final FutureCallback<SchemaContext> schemaContextFuture = new FutureCallback<SchemaContext>() {
133             @Override
134             public void onSuccess(SchemaContext schemaContext) {
135                 LOG.debug("{}: Schema context built successfully.", id);
136
137                 final NetconfDeviceCapabilities deviceCap = sessionPreferences.getNetconfDeviceCapabilities();
138                 final Set<QName> providedSourcesQnames = Sets.newHashSet();
139                 for(ModuleIdentifier id : schemaContext.getAllModuleIdentifiers()) {
140                     providedSourcesQnames.add(QName.create(id.getQNameModule(), id.getName()));
141                 }
142
143                 deviceCap.addNonModuleBasedCapabilities(sessionPreferences.getNonModuleCaps());
144                 deviceCap.addCapabilities(providedSourcesQnames);
145
146                 ClusteredNetconfDevice.super.handleSalInitializationSuccess(
147                         schemaContext, sessionPreferences, getDeviceSpecificRpc(schemaContext, listener));
148             }
149
150             @Override
151             public void onFailure(Throwable throwable) {
152                 LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
153                 handleSalInitializationFailure(throwable, listener);
154             }
155         };
156
157         resolver.getResolvedSources().onComplete(
158                 new OnComplete<Set<SourceIdentifier>>() {
159                     @Override
160                     public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
161                         if(throwable != null) {
162                             if(throwable instanceof MasterSourceProviderOnSameNodeException) {
163                                 //do nothing
164                             } else {
165                                 LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
166                                 handleSalInitializationFailure(throwable, listener);
167                             }
168                         } else {
169                             LOG.trace("{}: Trying to build schema context from {}", id, sourceIdentifiers);
170                             Futures.addCallback(schemaContextFactory.createSchemaContext(sourceIdentifiers), schemaContextFuture);
171                         }
172                     }
173                 }, actorSystem.dispatcher());
174     }
175
176     private NetconfDeviceRpc getDeviceSpecificRpc(SchemaContext result, RemoteDeviceCommunicator<NetconfMessage> listener) {
177         return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
178     }
179
180     @Override
181     public void ownershipChanged(EntityOwnershipChange ownershipChange) {
182         LOG.debug("Entity ownership change received {}", ownershipChange);
183         if(ownershipChange.isOwner()) {
184             super.onRemoteSessionUp(sessionPreferences, listener);
185         } else if (ownershipChange.wasOwner()) {
186             slaveSetupSchema();
187         }
188     }
189 }