2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.Timeout;
19 import io.netty.util.TimerTask;
20 import java.util.Collection;
21 import java.util.List;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
28 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
29 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceContextChangeListener;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
35 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
37 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
38 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
39 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
42 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
43 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
44 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
45 import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
53 final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeListener, DeviceContextChangeListener, ExtensionConverterProviderKeeper {
55 private static final Logger LOG = LoggerFactory.getLogger(LifecycleConductorImpl.class);
56 private static final int TICKS_PER_WHEEL = 500;
57 private static final long TICK_DURATION = 10; // 0.5 sec.
59 private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
60 private ExtensionConverterProvider extensionConverterProvider;
61 private DeviceManager deviceManager;
62 private StatisticsManager statisticsManager;
63 private RpcManager rpcManager;
64 private final MessageIntelligenceAgency messageIntelligenceAgency;
65 private ConcurrentHashMap<DeviceInfo, ServiceChangeListener> serviceChangeListeners = new ConcurrentHashMap<>();
66 private NotificationPublishService notificationPublishService;
68 LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency) {
69 this.messageIntelligenceAgency = Preconditions.checkNotNull(messageIntelligenceAgency);
73 public ExtensionConverterProvider getExtensionConverterProvider() {
74 return extensionConverterProvider;
78 public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
79 this.extensionConverterProvider = extensionConverterProvider;
83 public void setSafelyManager(final OFPManager manager){
84 if (manager instanceof RpcManager) {
85 if (rpcManager != null) {
86 LOG.info("RPC manager {} is already defined in conductor. ", manager);
89 this.rpcManager = (RpcManager) manager;
91 if (manager instanceof StatisticsManager) {
92 if (statisticsManager != null) {
93 LOG.info("Statistics manager {} is already defined in conductor. ", manager);
96 this.statisticsManager = (StatisticsManager) manager;
98 if (manager instanceof DeviceManager) {
99 if (deviceManager != null) {
100 LOG.info("Device manager {} is already defined in conductor. ", manager);
103 this.deviceManager = (DeviceManager) manager;
110 public void addOneTimeListenerWhenServicesChangesDone(final ServiceChangeListener manager, final DeviceInfo deviceInfo){
111 LOG.debug("Listener {} for service change for node {} registered.", manager, deviceInfo.getNodeId());
112 serviceChangeListeners.put(deviceInfo, manager);
116 void notifyServiceChangeListeners(final DeviceInfo deviceInfo, final boolean success){
117 if (serviceChangeListeners.size() == 0) {
120 LOG.debug("Notifying registered listeners for service change, no. of listeners {}", serviceChangeListeners.size());
121 for (final Map.Entry<DeviceInfo, ServiceChangeListener> nodeIdServiceChangeListenerEntry : serviceChangeListeners.entrySet()) {
122 if (nodeIdServiceChangeListenerEntry.getKey().equals(deviceInfo)) {
123 LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo, success);
124 nodeIdServiceChangeListenerEntry.getValue().servicesChangeDone(deviceInfo, success);
125 serviceChangeListeners.remove(deviceInfo);
131 public void roleInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
133 LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo);
134 closeConnection(deviceInfo);
136 LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo);
140 public void closeConnection(final DeviceInfo deviceInfo) {
141 LOG.debug("Close connection called for node {}", deviceInfo);
142 final DeviceContext deviceContext = getDeviceContext(deviceInfo);
143 if (null != deviceContext) {
144 deviceManager.notifyDeviceValidListeners(deviceInfo, false);
145 deviceContext.shutdownConnection();
150 public void roleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole newRole) {
152 final DeviceContext deviceContext = Preconditions.checkNotNull(
153 deviceManager.gainContext(deviceInfo),
154 "Something went wrong, device context for nodeId: %s doesn't exists", deviceInfo.getNodeId()
157 final RpcContext rpcContext = Preconditions.checkNotNull(
158 rpcManager.gainContext(deviceInfo),
159 "Something went wrong, rpc context for nodeId: %s doesn't exists", deviceInfo.getNodeId()
162 LOG.info("Role change to {} in role context for node {} was successful.", newRole, deviceInfo);
164 final String logText;
166 if (OfpRole.BECOMEMASTER.equals(newRole)) {
168 fillDeviceFlowRegistry(deviceInfo, deviceContext.getDeviceFlowRegistry());
169 MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider);
171 if (rpcContext.isStatisticsRpcEnabled()) {
172 MdSalRegistrationUtils.registerStatCompatibilityServices(
175 notificationPublishService);
179 statisticsManager.stopScheduling(deviceInfo);
181 // Clean device flow registry if we became slave
182 if (OfpRole.BECOMESLAVE.equals(newRole)) {
183 deviceContext.getDeviceFlowRegistry().close();
186 MdSalRegistrationUtils.unregisterServices(rpcContext);
189 final ListenableFuture<Void> onClusterRoleChange = deviceManager.onClusterRoleChange(deviceInfo, newRole);
190 Futures.addCallback(onClusterRoleChange, new FutureCallback<Void>() {
192 public void onSuccess(@Nullable final Void aVoid) {
193 LOG.info("{}ing services for node {} was successful", logText, deviceInfo);
194 if (newRole.equals(OfpRole.BECOMESLAVE)) {
195 notifyServiceChangeListeners(deviceInfo, true);
200 public void onFailure(final Throwable throwable) {
201 LOG.warn("{}ing services for node {} was NOT successful, closing connection", logText, deviceInfo);
202 closeConnection(deviceInfo);
207 private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) {
208 // Fill device flow registry with flows from datastore
209 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceFlowRegistry.fill();
211 // Start statistics scheduling only after we finished initializing device flow registry
212 Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
214 public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
215 if (LOG.isDebugEnabled()) {
216 // Count all flows we read from datastore for debugging purposes.
217 // This number do not always represent how many flows were actually added
218 // to DeviceFlowRegistry, because of possible duplicates.
219 long flowCount = Optional.fromNullable(result).asSet().stream()
220 .flatMap(Collection::stream)
221 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
222 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
223 .flatMap(table -> table.getFlow().stream())
226 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId());
229 statisticsManager.startScheduling(deviceInfo);
233 public void onFailure(Throwable t) {
234 // If we manually cancelled this future, do not start scheduling of statistics
235 if (deviceFlowRegistryFill.isCancelled()) {
236 LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId());
238 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId(), t);
239 statisticsManager.startScheduling(deviceInfo);
245 public MessageIntelligenceAgency getMessageIntelligenceAgency() {
246 return messageIntelligenceAgency;
250 public DeviceContext getDeviceContext(DeviceInfo deviceInfo){
251 return deviceManager.gainContext(deviceInfo);
255 public StatisticsContext getStatisticsContext(DeviceInfo deviceInfo){
256 return statisticsManager.gainContext(deviceInfo);
259 public Timeout newTimeout(@Nonnull TimerTask task, long delay, @Nonnull TimeUnit unit) {
260 return hashedWheelTimer.newTimeout(task, delay, unit);
264 public ConnectionContext.CONNECTION_STATE gainConnectionStateSafely(final DeviceInfo deviceInfo){
265 return (null != getDeviceContext(deviceInfo)) ? getDeviceContext(deviceInfo).getPrimaryConnectionContext().getConnectionState() : null;
269 public Long reserveXidForDeviceMessage(final DeviceInfo deviceInfo){
270 return null != getDeviceContext(deviceInfo) ? getDeviceContext(deviceInfo).reserveXidForDeviceMessage() : null;
274 public void deviceStartInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
276 LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo);
277 closeConnection(deviceInfo);
279 LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo);
284 public void deviceInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
286 LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo);
287 closeConnection(deviceInfo);
289 LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo);
294 boolean isServiceChangeListenersEmpty() {
295 return this.serviceChangeListeners.isEmpty();
299 public NotificationPublishService getNotificationPublishService() {
300 return notificationPublishService;
304 public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
305 this.notificationPublishService = notificationPublishService;