Fix DeviceFlowRegistry filling
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / LifecycleConductorImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import io.netty.util.HashedWheelTimer;
18 import io.netty.util.Timeout;
19 import io.netty.util.TimerTask;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.Nonnull;
26 import javax.annotation.Nullable;
27 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
28 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
29 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
31 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceContextChangeListener;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
35 import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
37 import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
38 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
39 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
40 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
41 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
42 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
43 import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
44 import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
45 import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 /**
52  */
53 final class LifecycleConductorImpl implements LifecycleConductor, RoleChangeListener, DeviceContextChangeListener, ExtensionConverterProviderKeeper {
54
55     private static final Logger LOG = LoggerFactory.getLogger(LifecycleConductorImpl.class);
56     private static final int TICKS_PER_WHEEL = 500;
57     private static final long TICK_DURATION = 10; // 0.5 sec.
58
59     private final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(TICK_DURATION, TimeUnit.MILLISECONDS, TICKS_PER_WHEEL);
60     private ExtensionConverterProvider extensionConverterProvider;
61     private DeviceManager deviceManager;
62     private StatisticsManager statisticsManager;
63     private RpcManager rpcManager;
64     private final MessageIntelligenceAgency messageIntelligenceAgency;
65     private ConcurrentHashMap<DeviceInfo, ServiceChangeListener> serviceChangeListeners = new ConcurrentHashMap<>();
66     private NotificationPublishService notificationPublishService;
67
68     LifecycleConductorImpl(final MessageIntelligenceAgency messageIntelligenceAgency) {
69         this.messageIntelligenceAgency = Preconditions.checkNotNull(messageIntelligenceAgency);
70     }
71
72     @Override
73     public ExtensionConverterProvider getExtensionConverterProvider() {
74         return extensionConverterProvider;
75     }
76
77     @Override
78     public void setExtensionConverterProvider(ExtensionConverterProvider extensionConverterProvider) {
79         this.extensionConverterProvider = extensionConverterProvider;
80     }
81
82     @Override
83     public void setSafelyManager(final OFPManager manager){
84         if (manager instanceof RpcManager) {
85             if (rpcManager != null) {
86                 LOG.info("RPC manager {} is already defined in conductor. ", manager);
87                 return;
88             }
89             this.rpcManager = (RpcManager) manager;
90         } else {
91             if (manager instanceof StatisticsManager) {
92                 if (statisticsManager != null) {
93                     LOG.info("Statistics manager {} is already defined in conductor. ", manager);
94                     return;
95                 }
96                 this.statisticsManager = (StatisticsManager) manager;
97             } else {
98                 if (manager instanceof DeviceManager) {
99                     if (deviceManager != null) {
100                         LOG.info("Device manager {} is already defined in conductor. ", manager);
101                         return;
102                     }
103                     this.deviceManager = (DeviceManager) manager;
104                 }
105             }
106         }
107     }
108
109     @Override
110     public void addOneTimeListenerWhenServicesChangesDone(final ServiceChangeListener manager, final DeviceInfo deviceInfo){
111         LOG.debug("Listener {} for service change for node {} registered.", manager, deviceInfo.getNodeId());
112         serviceChangeListeners.put(deviceInfo, manager);
113     }
114
115     @VisibleForTesting
116     void notifyServiceChangeListeners(final DeviceInfo deviceInfo, final boolean success){
117         if (serviceChangeListeners.size() == 0) {
118             return;
119         }
120         LOG.debug("Notifying registered listeners for service change, no. of listeners {}", serviceChangeListeners.size());
121         for (final Map.Entry<DeviceInfo, ServiceChangeListener> nodeIdServiceChangeListenerEntry : serviceChangeListeners.entrySet()) {
122             if (nodeIdServiceChangeListenerEntry.getKey().equals(deviceInfo)) {
123                 LOG.debug("Listener {} for service change for node {} was notified. Success was set on {}", nodeIdServiceChangeListenerEntry.getValue(), deviceInfo, success);
124                 nodeIdServiceChangeListenerEntry.getValue().servicesChangeDone(deviceInfo, success);
125                 serviceChangeListeners.remove(deviceInfo);
126             }
127         }
128     }
129
130     @Override
131     public void roleInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
132         if (!success) {
133             LOG.warn("Initialization phase for node {} in role context was NOT successful, closing connection.", deviceInfo);
134             closeConnection(deviceInfo);
135         } else {
136             LOG.info("initialization phase for node {} in role context was successful, continuing to next context.", deviceInfo);
137         }
138     }
139
140     public void closeConnection(final DeviceInfo deviceInfo) {
141         LOG.debug("Close connection called for node {}", deviceInfo);
142         final DeviceContext deviceContext = getDeviceContext(deviceInfo);
143         if (null != deviceContext) {
144             deviceManager.notifyDeviceValidListeners(deviceInfo, false);
145             deviceContext.shutdownConnection();
146         }
147     }
148
149     @Override
150     public void roleChangeOnDevice(final DeviceInfo deviceInfo, final OfpRole newRole) {
151
152         final DeviceContext deviceContext = Preconditions.checkNotNull(
153                 deviceManager.gainContext(deviceInfo),
154                 "Something went wrong, device context for nodeId: %s doesn't exists", deviceInfo.getNodeId()
155         );
156
157         final RpcContext rpcContext =  Preconditions.checkNotNull(
158                 rpcManager.gainContext(deviceInfo),
159                 "Something went wrong, rpc context for nodeId: %s doesn't exists", deviceInfo.getNodeId()
160         );
161
162         LOG.info("Role change to {} in role context for node {} was successful.", newRole, deviceInfo);
163
164         final String logText;
165
166         if (OfpRole.BECOMEMASTER.equals(newRole)) {
167             logText = "Start";
168             fillDeviceFlowRegistry(deviceInfo, deviceContext.getDeviceFlowRegistry());
169             MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider);
170
171             if (rpcContext.isStatisticsRpcEnabled()) {
172                 MdSalRegistrationUtils.registerStatCompatibilityServices(
173                         rpcContext,
174                         deviceContext,
175                         notificationPublishService);
176             }
177         } else {
178             logText = "Stopp";
179             statisticsManager.stopScheduling(deviceInfo);
180
181             // Clean device flow registry if we became slave
182             if (OfpRole.BECOMESLAVE.equals(newRole)) {
183                 deviceContext.getDeviceFlowRegistry().close();
184             }
185
186             MdSalRegistrationUtils.unregisterServices(rpcContext);
187         }
188
189         final ListenableFuture<Void> onClusterRoleChange = deviceManager.onClusterRoleChange(deviceInfo, newRole);
190         Futures.addCallback(onClusterRoleChange, new FutureCallback<Void>() {
191             @Override
192             public void onSuccess(@Nullable final Void aVoid) {
193                 LOG.info("{}ing services for node {} was successful", logText, deviceInfo);
194                 if (newRole.equals(OfpRole.BECOMESLAVE)) {
195                     notifyServiceChangeListeners(deviceInfo, true);
196                 }
197             }
198
199             @Override
200             public void onFailure(final Throwable throwable) {
201                 LOG.warn("{}ing services for node {} was NOT successful, closing connection", logText, deviceInfo);
202                 closeConnection(deviceInfo);
203             }
204         });
205     }
206
207     private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) {
208         // Fill device flow registry with flows from datastore
209         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceFlowRegistry.fill();
210
211         // Start statistics scheduling only after we finished initializing device flow registry
212         Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
213             @Override
214             public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
215                 if (LOG.isDebugEnabled()) {
216                     // Count all flows we read from datastore for debugging purposes.
217                     // This number do not always represent how many flows were actually added
218                     // to DeviceFlowRegistry, because of possible duplicates.
219                     long flowCount = Optional.fromNullable(result).asSet().stream()
220                             .flatMap(Collection::stream)
221                             .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
222                             .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
223                             .flatMap(table -> table.getFlow().stream())
224                             .count();
225
226                     LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId());
227                 }
228
229                 statisticsManager.startScheduling(deviceInfo);
230             }
231
232             @Override
233             public void onFailure(Throwable t) {
234                 // If we manually cancelled this future, do not start scheduling of statistics
235                 if (deviceFlowRegistryFill.isCancelled()) {
236                     LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId());
237                 } else {
238                     LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId(), t);
239                     statisticsManager.startScheduling(deviceInfo);
240                 }
241             }
242         });
243     }
244
245     public MessageIntelligenceAgency getMessageIntelligenceAgency() {
246         return messageIntelligenceAgency;
247     }
248
249     @Override
250     public DeviceContext getDeviceContext(DeviceInfo deviceInfo){
251          return deviceManager.gainContext(deviceInfo);
252     }
253
254     @Override
255     public StatisticsContext getStatisticsContext(DeviceInfo deviceInfo){
256         return statisticsManager.gainContext(deviceInfo);
257     }
258
259     public Timeout newTimeout(@Nonnull TimerTask task, long delay, @Nonnull TimeUnit unit) {
260         return hashedWheelTimer.newTimeout(task, delay, unit);
261     }
262
263     @Override
264     public ConnectionContext.CONNECTION_STATE gainConnectionStateSafely(final DeviceInfo deviceInfo){
265         return (null != getDeviceContext(deviceInfo)) ? getDeviceContext(deviceInfo).getPrimaryConnectionContext().getConnectionState() : null;
266     }
267
268     @Override
269     public Long reserveXidForDeviceMessage(final DeviceInfo deviceInfo){
270         return null != getDeviceContext(deviceInfo) ? getDeviceContext(deviceInfo).reserveXidForDeviceMessage() : null;
271     }
272
273     @Override
274     public void deviceStartInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
275         if (!success) {
276             LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo);
277             closeConnection(deviceInfo);
278         } else {
279             LOG.info("initialization phase for node {} in device context was successful. Continuing to next context.", deviceInfo);
280         }
281     }
282
283     @Override
284     public void deviceInitializationDone(final DeviceInfo deviceInfo, final boolean success) {
285         if (!success) {
286             LOG.warn("Initialization phase for node {} in device context was NOT successful, closing connection.", deviceInfo);
287             closeConnection(deviceInfo);
288         } else {
289             LOG.info("initialization phase for node {} in device context was successful. All phases initialized OK.", deviceInfo);
290         }
291     }
292
293     @VisibleForTesting
294     boolean isServiceChangeListenersEmpty() {
295         return this.serviceChangeListeners.isEmpty();
296     }
297
298     @Override
299     public NotificationPublishService getNotificationPublishService() {
300         return notificationPublishService;
301     }
302
303     @Override
304     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
305         this.notificationPublishService = notificationPublishService;
306     }
307 }