2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.pipeline;
11 import akka.actor.ActorContext;
12 import akka.actor.ActorSystem;
13 import akka.actor.TypedActor;
14 import akka.actor.TypedProps;
15 import akka.dispatch.OnComplete;
16 import akka.japi.Creator;
17 import com.google.common.base.Optional;
18 import com.google.common.collect.Sets;
19 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures;
22 import java.util.concurrent.ExecutorService;
23 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
24 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
25 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
26 import org.opendaylight.netconf.api.NetconfMessage;
27 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceCommunicator;
28 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceHandler;
29 import org.opendaylight.netconf.sal.connect.netconf.NetconfDevice;
30 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCapabilities;
31 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfDeviceCommunicator;
32 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
33 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceRpc;
34 import org.opendaylight.netconf.sal.connect.netconf.schema.mapping.NetconfMessageTransformer;
35 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
38 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
41 import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 public class ClusteredNetconfDevice extends NetconfDevice implements EntityOwnershipListener {
47 private static final Logger LOG = LoggerFactory.getLogger(ClusteredNetconfDevice.class);
49 private boolean isMaster = false;
50 private NetconfDeviceCommunicator listener;
51 private NetconfSessionPreferences sessionPreferences;
52 private SchemaRepository schemaRepo;
53 private final ActorSystem actorSystem;
54 private final String topologyId;
55 private final String nodeId;
56 private final ActorContext cachedContext;
58 private MasterSourceProvider masterSourceProvider = null;
59 private ClusteredDeviceSourcesResolver resolver = null;
61 public ClusteredNetconfDevice(final SchemaResourcesDTO schemaResourcesDTO, final RemoteDeviceId id, final RemoteDeviceHandler<NetconfSessionPreferences> salFacade,
62 final ExecutorService globalProcessingExecutor, final ActorSystem actorSystem, final String topologyId, final String nodeId,
63 ActorContext cachedContext) {
64 super(schemaResourcesDTO, id, salFacade, globalProcessingExecutor);
65 this.schemaRepo = (SchemaRepository) schemaResourcesDTO.getSchemaRegistry();
66 this.actorSystem = actorSystem;
67 this.topologyId = topologyId;
69 this.cachedContext = cachedContext;
73 public void onRemoteSessionUp(NetconfSessionPreferences remoteSessionCapabilities, NetconfDeviceCommunicator listener) {
74 LOG.warn("Node {} SessionUp, with capabilities {}", nodeId, remoteSessionCapabilities);
75 this.listener = listener;
76 this.sessionPreferences = remoteSessionCapabilities;
82 protected void handleSalInitializationSuccess(SchemaContext result, NetconfSessionPreferences remoteSessionCapabilities, DOMRpcService deviceRpc) {
83 super.handleSalInitializationSuccess(result, remoteSessionCapabilities, deviceRpc);
85 final Set<SourceIdentifier> sourceIds = Sets.newHashSet();
86 for(ModuleIdentifier id : result.getAllModuleIdentifiers()) {
87 sourceIds.add(SourceIdentifier.create(id.getName(), (SimpleDateFormatUtil.DEFAULT_DATE_REV == id.getRevision() ? Optional.<String>absent() :
88 Optional.of(SimpleDateFormatUtil.getRevisionFormat().format(id.getRevision())))));
91 //TODO extract string constant to util class
92 LOG.debug("Creating master source provider");
93 masterSourceProvider = TypedActor.get(cachedContext).typedActorOf(
94 new TypedProps<>(MasterSourceProvider.class,
95 new Creator<MasterSourceProviderImpl>() {
97 public MasterSourceProviderImpl create() throws Exception {
98 return new MasterSourceProviderImpl(schemaRepo, sourceIds, actorSystem, topologyId, nodeId);
100 }), "masterSourceProvider");
104 public void onRemoteSessionDown() {
105 super.onRemoteSessionDown();
107 sessionPreferences = null;
108 if (masterSourceProvider != null) {
109 // if we have master the slave that started on this node should be already killed via PoisonPill, so stop master only now
110 LOG.debug("Stopping master source provider for node {}", nodeId);
111 TypedActor.get(actorSystem).stop(masterSourceProvider);
112 masterSourceProvider = null;
114 LOG.debug("Stopping slave source resolver for node {}", nodeId);
115 TypedActor.get(actorSystem).stop(resolver);
120 private void slaveSetupSchema() {
121 //TODO extract string constant to util class
122 resolver = TypedActor.get(cachedContext).typedActorOf(
123 new TypedProps<>(ClusteredDeviceSourcesResolver.class,
124 new Creator<ClusteredDeviceSourcesResolverImpl>() {
126 public ClusteredDeviceSourcesResolverImpl create() throws Exception {
127 return new ClusteredDeviceSourcesResolverImpl(topologyId, nodeId, actorSystem, schemaRegistry, sourceRegistrations);
129 }), "clusteredDeviceSourcesResolver");
132 final FutureCallback<SchemaContext> schemaContextFuture = new FutureCallback<SchemaContext>() {
134 public void onSuccess(SchemaContext schemaContext) {
135 LOG.debug("{}: Schema context built successfully.", id);
137 final NetconfDeviceCapabilities deviceCap = sessionPreferences.getNetconfDeviceCapabilities();
138 final Set<QName> providedSourcesQnames = Sets.newHashSet();
139 for(ModuleIdentifier id : schemaContext.getAllModuleIdentifiers()) {
140 providedSourcesQnames.add(QName.create(id.getQNameModule(), id.getName()));
143 deviceCap.addNonModuleBasedCapabilities(sessionPreferences.getNonModuleCaps());
144 deviceCap.addCapabilities(providedSourcesQnames);
146 ClusteredNetconfDevice.super.handleSalInitializationSuccess(
147 schemaContext, sessionPreferences, getDeviceSpecificRpc(schemaContext, listener));
151 public void onFailure(Throwable throwable) {
152 LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
153 handleSalInitializationFailure(throwable, listener);
157 resolver.getResolvedSources().onComplete(
158 new OnComplete<Set<SourceIdentifier>>() {
160 public void onComplete(Throwable throwable, Set<SourceIdentifier> sourceIdentifiers) throws Throwable {
161 if(throwable != null) {
162 if(throwable instanceof MasterSourceProviderOnSameNodeException) {
165 LOG.warn("{}: Unexpected error resolving device sources: {}", id, throwable);
166 handleSalInitializationFailure(throwable, listener);
169 LOG.trace("{}: Trying to build schema context from {}", id, sourceIdentifiers);
170 Futures.addCallback(schemaContextFactory.createSchemaContext(sourceIdentifiers), schemaContextFuture);
173 }, actorSystem.dispatcher());
176 private NetconfDeviceRpc getDeviceSpecificRpc(SchemaContext result, RemoteDeviceCommunicator<NetconfMessage> listener) {
177 return new NetconfDeviceRpc(result, listener, new NetconfMessageTransformer(result, true));
181 public void ownershipChanged(EntityOwnershipChange ownershipChange) {
182 LOG.debug("Entity ownership change received {}", ownershipChange);
183 if(ownershipChange.isOwner()) {
184 super.onRemoteSessionUp(sessionPreferences, listener);
185 } else if (ownershipChange.wasOwner()) {