Add logging in tx facade along with the RemoteDeviceId
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / pipeline / ClusteredNetconfDevice.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.pipeline;
10
11 import akka.actor.ActorContext;
12 import akka.actor.ActorSystem;
13 import akka.actor.TypedActor;
14 import akka.actor.TypedProps;
15 import akka.dispatch.OnComplete;
16 import akka.japi.Creator;
17 import com.google.common.base.Optional;
18 import com.google.common.collect.Sets;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
21 import java.util.Set;
22 import java.util.concurrent.ExecutorService;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
24 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
26 import org.opendaylight.netconf.api.NetconfMessage;
27 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
28 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
29 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
30 import org.opendaylight.netconf.sal.connect.netconf.NetconfDeviceBuilder;
31 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
32 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
33 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
34 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc;
35 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
36 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
37 import org.opendaylight.netconf.util.NetconfTopologyPathCreator;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapability;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.netconf.node.connection.status.available.capabilities.AvailableCapabilityBuilder;
40 import org.opendaylight.yangtools.yang.common.QName;
41 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
42 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
43 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
44 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
45 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 public class ClusteredNetconfDevice extends NetconfDevice implements EntityOwnershipListener {
50
51     private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfDevice.class);
52
53     private NetconfDeviceCommunicator listener;
54     private NetconfSessionPreferences sessionPreferences;
55     private SchemaRepository schemaRepo;
56     private final ActorSystem actorSystem;
57     private final String topologyId;
58     private final String nodeId;
59     private final ActorContext cachedContext;
60
61     private MasterSourceProvider masterSourceProvider = null;
62     private ClusteredDeviceSourcesResolver resolver = null;
63
64     public ClusteredNetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
65                                   final ExecutorService globalProcessingExecutor, final ActorSystem actorSystem, final String topologyId, final String nodeId,
66                                   final ActorContext cachedContext, final boolean reconnectOnSchemaChanged) {
67         super(schemaResourcesDTO, id, salFacade, globalProcessingExecutor, reconnectOnSchemaChanged);
68         this.schemaRepo = (SchemaRepository) schemaResourcesDTO.getSchemaRegistry();
69         this.actorSystem = actorSystem;
70         this.topologyId = topologyId;
71         this.nodeId = nodeId;
72         this.cachedContext = cachedContext;
73     }
74
75     @Override
76     public void onRemoteSessionUp(NetconfSessionPreferences remoteSessionCapabilities, NetconfDeviceCommunicator listener) {
77         LOG.warn("Node {} SessionUp, with capabilities {}", nodeId, remoteSessionCapabilities);
78         this.listener = listener;
79         this.sessionPreferences = remoteSessionCapabilities;
80         slaveSetupSchema();
81     }
82
83
84     @Override
85     protected void handleSalInitializationSuccess(SchemaContext result, NetconfSessionPreferences remoteSessionCapabilities, DOMRpcService deviceRpc) {
86         super.handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
87
88         final Set<SourceIdentifier> sourceIds = Sets.newHashSet();
89         for(ModuleIdentifier id : result.getAllModuleIdentifiers()) {
90             sourceIds.add(SourceIdentifier.create(id.getName(), (SimpleDateFormatUtil.DEFAULT_DATE_REV == id.getRevision() ? Optional.<String>absent() :
91                     Optional.of(SimpleDateFormatUtil.getRevisionFormat().format(id.getRevision())))));
92         }
93
94         //TODO extract string constant to util class
95         LOG.debug("Creating master source provider");
96         masterSourceProvider = TypedActor.get(cachedContext).typedActorOf(
97                 new TypedProps<>(MasterSourceProvider.class,
98                         new Creator<MasterSourceProviderImpl>() {
99                             @Override
100                             public MasterSourceProviderImpl create() throws Exception {
101                                 return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
102                             }
103                         }), NetconfTopologyPathCreator.MASTER_SOURCE_PROVIDER);
104     }
105
106     @Override
107     public void onRemoteSessionDown() {
108         super.onRemoteSessionDown();
109         listener = null;
110         sessionPreferences = null;
111         if (masterSourceProvider != null) {
112             // if we have master the slave that started on this node should be already killed via PoisonPill, so stop master only now
113             LOG.debug("Stopping master source provider for node {}", nodeId);
114             TypedActor.get(actorSystem).stop(masterSourceProvider);
115             masterSourceProvider = null;
116         } else {
117             LOG.debug("Stopping slave source resolver for node {}", nodeId);
118             TypedActor.get(actorSystem).stop(resolver);
119             resolver = null;
120         }
121     }
122
123     private void slaveSetupSchema() {
124         //TODO extract string constant to util class
125         resolver = TypedActor.get(cachedContext).typedActorOf(
126                 new TypedProps<>(ClusteredDeviceSourcesResolver.class,
127                         new Creator<ClusteredDeviceSourcesResolverImpl>() {
128                             @Override
129                             public ClusteredDeviceSourcesResolverImpl create() throws Exception {
130                                 return new ClusteredDeviceSourcesResolverImpl(topologyId, nodeId, actorSystem, schemaRegistry, sourceRegistrations);
131                             }
132                         }), NetconfTopologyPathCreator.CLUSTERED_DEVICE_SOURCES_RESOLVER);
133
134         final FutureCallback<SchemaContext> schemaContextFuture = new FutureCallback<SchemaContext>() {
135             @Override
136             public void onSuccess(SchemaContext schemaContext) {
137                 LOG.debug("{}: Schema context built successfully.", id);
138
139                 final NetconfDeviceCapabilities deviceCap = sessionPreferences.getNetconfDeviceCapabilities();
140                 final Set<AvailableCapability> providedSourcesQnames = Sets.newHashSet();
141                 final Set<AvailableCapability> providedSourcesNonModuleCaps = Sets.newHashSet();
142                 for(ModuleIdentifier id : schemaContext.getAllModuleIdentifiers()) {
143                     providedSourcesQnames.add(new AvailableCapabilityBuilder()
144                             .setCapability(QName.create(id.getQNameModule(), id.getName()).toString()).build());
145                 }
146                 sessionPreferences.getNonModuleCaps().forEach(e -> providedSourcesNonModuleCaps.add(new AvailableCapabilityBuilder()
147                         .setCapability(e).build()));
148                 deviceCap.addNonModuleBasedCapabilities(providedSourcesNonModuleCaps);
149                 deviceCap.addCapabilities(providedSourcesQnames);
150
151                 ClusteredNetconfDevice.super.handleSalInitializationSuccess(
152                         schemaContext, sessionPreferences, getDeviceSpecificRpc(schemaContext, listener));
153             }
154
155             @Override
156             public void onFailure(Throwable throwable) {
157                 LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
158                 handleSalInitializationFailure(throwable, listener);
159             }
160         };
161
162         resolver.getResolvedSources().onComplete(
163                 new OnComplete<Set<SourceIdentifier>>() {
164                     @Override
165                     public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
166                         if(throwable != null) {
167                             if(throwable instanceof MasterSourceProviderOnSameNodeException) {
168                                 //do nothing
169                             } else {
170                                 LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
171                                 handleSalInitializationFailure(throwable, listener);
172                             }
173                         } else {
174                             LOG.trace("{}: Trying to build schema context from {}", id, sourceIdentifiers);
175                             Futures.addCallback(schemaContextFactory.createSchemaContext(sourceIdentifiers), schemaContextFuture);
176                         }
177                     }
178                 }, actorSystem.dispatcher());
179     }
180
181     private NetconfDeviceRpc getDeviceSpecificRpc(SchemaContext result, RemoteDeviceCommunicator<NetconfMessage> listener) {
182         return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
183     }
184
185     @Override
186     public void ownershipChanged(EntityOwnershipChange ownershipChange) {
187         LOG.debug("Entity ownership change received {}", ownershipChange);
188         if(ownershipChange.isOwner()) {
189             super.onRemoteSessionUp(sessionPreferences, listener);
190         } else if (ownershipChange.wasOwner()) {
191             slaveSetupSchema();
192         }
193     }
194 }