builder class for path creating
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / pipeline / ClusteredNetconfDevice.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.pipeline;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorSystem;
13 import akka.actor.TypedActor;
14 import akka.actor.TypedProps;
15 import akka.dispatch.OnComplete;
16 import akka.japi.Creator;
17 import com.google.common.base.Optional;
18 import com.google.common.collect.Sets;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import java.util.Set;
22 import java.util.concurrent.ExecutorService;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
24 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
26 import org.opendaylight.netconf.api.NetconfMessage;
27 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
28 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
29 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
30 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
31 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
32 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
33 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc;
34 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
35 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
36 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
37 import org.opendaylight.yangtools.yang.common.QName;
38 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
39 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
42 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public class ClusteredNetconfDevice extends NetconfDevice implements EntityOwnershipListener {
47
48     private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfDevice.class);
49
50     private boolean isMaster = false;
51     private NetconfDeviceCommunicator listener;
52     private NetconfSessionPreferences sessionPreferences;
53     private SchemaRepository schemaRepo;
54     private final ActorSystem actorSystem;
55     private final String topologyId;
56     private final String nodeId;
57     private final ActorContext cachedContext;
58
59     private MasterSourceProvider masterSourceProvider = null;
60     private ClusteredDeviceSourcesResolver resolver = null;
61
62     public ClusteredNetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
63                                   final ExecutorService globalProcessingExecutor, final ActorSystem actorSystem, final String topologyId, final String nodeId,
64                                   ActorContext cachedContext) {
65         super(schemaResourcesDTO, id, salFacade, globalProcessingExecutor);
66         this.schemaRepo = (SchemaRepository) schemaResourcesDTO.getSchemaRegistry();
67         this.actorSystem = actorSystem;
68         this.topologyId = topologyId;
69         this.nodeId = nodeId;
70         this.cachedContext = cachedContext;
71     }
72
73     @Override
74     public void onRemoteSessionUp(NetconfSessionPreferences remoteSessionCapabilities, NetconfDeviceCommunicator listener) {
75         LOG.warn("Node {} SessionUp, with capabilities {}", nodeId, remoteSessionCapabilities);
76         this.listener = listener;
77         this.sessionPreferences = remoteSessionCapabilities;
78         slaveSetupSchema();
79     }
80
81
82     @Override
83     protected void handleSalInitializationSuccess(SchemaContext result, NetconfSessionPreferences remoteSessionCapabilities, DOMRpcService deviceRpc) {
84         super.handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
85
86         final Set<SourceIdentifier> sourceIds = Sets.newHashSet();
87         for(ModuleIdentifier id : result.getAllModuleIdentifiers()) {
88             sourceIds.add(SourceIdentifier.create(id.getName(), (SimpleDateFormatUtil.DEFAULT_DATE_REV == id.getRevision() ? Optional.<String>absent() :
89                     Optional.of(SimpleDateFormatUtil.getRevisionFormat().format(id.getRevision())))));
90         }
91
92         //TODO extract string constant to util class
93         LOG.debug("Creating master source provider");
94         masterSourceProvider = TypedActor.get(cachedContext).typedActorOf(
95                 new TypedProps<>(MasterSourceProvider.class,
96                         new Creator<MasterSourceProviderImpl>() {
97                             @Override
98                             public MasterSourceProviderImpl create() throws Exception {
99                                 return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
100                             }
101                         }), NetconfTopologyPathCreator.MASTER_SOURCE_PROVIDER);
102     }
103
104     @Override
105     public void onRemoteSessionDown() {
106         super.onRemoteSessionDown();
107         listener = null;
108         sessionPreferences = null;
109         if (masterSourceProvider != null) {
110             // if we have master the slave that started on this node should be already killed via PoisonPill, so stop master only now
111             LOG.debug("Stopping master source provider for node {}", nodeId);
112             TypedActor.get(actorSystem).stop(masterSourceProvider);
113             masterSourceProvider = null;
114         } else {
115             LOG.debug("Stopping slave source resolver for node {}", nodeId);
116             TypedActor.get(actorSystem).stop(resolver);
117             resolver = null;
118         }
119     }
120
121     private void slaveSetupSchema() {
122         //TODO extract string constant to util class
123         resolver = TypedActor.get(cachedContext).typedActorOf(
124                 new TypedProps<>(ClusteredDeviceSourcesResolver.class,
125                         new Creator<ClusteredDeviceSourcesResolverImpl>() {
126                             @Override
127                             public ClusteredDeviceSourcesResolverImpl create() throws Exception {
128                                 return new ClusteredDeviceSourcesResolverImpl(topologyId, nodeId, actorSystem, schemaRegistry, sourceRegistrations);
129                             }
130                         }), NetconfTopologyPathCreator.CLUSTERED_DEVICE_SOURCES_RESOLVER);
131
132         final FutureCallback<SchemaContext> schemaContextFuture = new FutureCallback<SchemaContext>() {
133             @Override
134             public void onSuccess(SchemaContext schemaContext) {
135                 LOG.debug("{}: Schema context built successfully.", id);
136
137                 final NetconfDeviceCapabilities deviceCap = sessionPreferences.getNetconfDeviceCapabilities();
138                 final Set<QName> providedSourcesQnames = Sets.newHashSet();
139                 for(ModuleIdentifier id : schemaContext.getAllModuleIdentifiers()) {
140                     providedSourcesQnames.add(QName.create(id.getQNameModule(), id.getName()));
141                 }
142
143                 deviceCap.addNonModuleBasedCapabilities(sessionPreferences.getNonModuleCaps());
144                 deviceCap.addCapabilities(providedSourcesQnames);
145
146                 ClusteredNetconfDevice.super.handleSalInitializationSuccess(
147                         schemaContext, sessionPreferences, getDeviceSpecificRpc(schemaContext, listener));
148             }
149
150             @Override
151             public void onFailure(Throwable throwable) {
152                 LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
153                 handleSalInitializationFailure(throwable, listener);
154             }
155         };
156
157         resolver.getResolvedSources().onComplete(
158                 new OnComplete<Set<SourceIdentifier>>() {
159                     @Override
160                     public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
161                         if(throwable != null) {
162                             if(throwable instanceof MasterSourceProviderOnSameNodeException) {
163                                 //do nothing
164                             } else {
165                                 LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
166                                 handleSalInitializationFailure(throwable, listener);
167                             }
168                         } else {
169                             LOG.trace("{}: Trying to build schema context from {}", id, sourceIdentifiers);
170                             Futures.addCallback(schemaContextFactory.createSchemaContext(sourceIdentifiers), schemaContextFuture);
171                         }
172                     }
173                 }, actorSystem.dispatcher());
174     }
175
176     private NetconfDeviceRpc getDeviceSpecificRpc(SchemaContext result, RemoteDeviceCommunicator<NetconfMessage> listener) {
177         return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
178     }
179
180     @Override
181     public void ownershipChanged(EntityOwnershipChange ownershipChange) {
182         LOG.debug("Entity ownership change received {}", ownershipChange);
183         if(ownershipChange.isOwner()) {
184             super.onRemoteSessionUp(sessionPreferences, listener);
185         } else if (ownershipChange.wasOwner()) {
186             slaveSetupSchema();
187         }
188     }
189 }