Merge "Bug 6110: Fixed bugs in statistics manager due to race condition." into stable...
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / LifecycleServiceImpl.java
1 /**
2  * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
9
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Verify;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Objects;
20 import javax.annotation.Nullable;
21
22 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
23 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
24 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
25 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
26 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
27 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
29 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
30 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
31 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
32 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
33 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 public class LifecycleServiceImpl implements LifecycleService {
39     private static final Logger LOG = LoggerFactory.getLogger(LifecycleServiceImpl.class);
40
41     private final List<DeviceRemovedHandler> deviceRemovedHandlers = new ArrayList<>();
42     private volatile CONTEXT_STATE state = CONTEXT_STATE.INITIALIZATION;
43     private DeviceContext deviceContext;
44     private RpcContext rpcContext;
45     private StatisticsContext statContext;
46     private ClusterSingletonServiceRegistration registration;
47     private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
48
49
50     @Override
51     public void instantiateServiceInstance() {
52         LOG.info("Starting clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
53
54         if (!clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
55             closeConnection();
56         }
57     }
58
59     @Override
60     public ListenableFuture<Void> closeServiceInstance() {
61
62         // Chain all jobs that will stop our services
63         final List<ListenableFuture<Void>> futureList = new ArrayList<>();
64         futureList.add(statContext.stopClusterServices());
65         futureList.add(rpcContext.stopClusterServices());
66         futureList.add(deviceContext.stopClusterServices());
67
68         return Futures.transform(Futures.successfulAsList(futureList), new Function<List<Void>, Void>() {
69             @Nullable
70             @Override
71             public Void apply(@Nullable List<Void> input) {
72                 LOG.debug("Closed clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
73                 return null;
74             }
75         });
76     }
77
78     @Override
79     public ServiceGroupIdentifier getIdentifier() {
80         return getServiceIdentifier();
81     }
82
83     @Override
84     public CONTEXT_STATE getState() {
85         return this.state;
86     }
87
88     @Override
89     public ServiceGroupIdentifier getServiceIdentifier() {
90         return deviceContext.getServiceIdentifier();
91     }
92
93     @Override
94     public DeviceInfo getDeviceInfo() {
95         return deviceContext.getDeviceInfo();
96     }
97
98     @Override
99     public void close() {
100         if (CONTEXT_STATE.TERMINATION.equals(getState())){
101             if (LOG.isDebugEnabled()) {
102                 LOG.debug("LifecycleService is already in TERMINATION state.");
103             }
104         } else {
105             this.state = CONTEXT_STATE.TERMINATION;
106
107             // We are closing, so cleanup all managers now
108             deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(getDeviceInfo()));
109
110             // If we are still registered and we are not already closing, then close the registration
111             if (Objects.nonNull(registration)) {
112                 try {
113                     LOG.debug("Closing clustering singleton services for node {}", getDeviceInfo().getLOGValue());
114                     registration.close();
115                 } catch (Exception e) {
116                     LOG.debug("Failed to close clustering singleton services for node {} with exception: ",
117                             getDeviceInfo().getLOGValue(), e);
118                 }
119             }
120         }
121     }
122
123     @Override
124     public void registerService(final ClusterSingletonServiceProvider singletonServiceProvider) {
125         LOG.debug("Registered clustering singleton services for node {}", getDeviceInfo().getLOGValue());
126
127         // lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
128         this.clusterInitializationPhaseHandler = deviceContext;
129         this.deviceContext.setLifecycleInitializationPhaseHandler(this.statContext);
130         this.statContext.setLifecycleInitializationPhaseHandler(this.rpcContext);
131         this.rpcContext.setLifecycleInitializationPhaseHandler(this);
132         //Set initial submit handler
133         this.statContext.setInitialSubmitHandler(this.deviceContext);
134
135         // Register cluster singleton service
136         try {
137             this.registration = Verify.verifyNotNull(singletonServiceProvider.registerClusterSingletonService(this));
138             LOG.info("Registered clustering singleton services for node {}", getDeviceInfo().getLOGValue());
139         } catch (Exception e) {
140             LOG.warn("Failed to register cluster singleton service for node {}, with exception: {}", getDeviceInfo(), e);
141             closeConnection();
142         }
143     }
144
145     @Override
146     public void registerDeviceRemovedHandler(final DeviceRemovedHandler deviceRemovedHandler) {
147         if (!deviceRemovedHandlers.contains(deviceRemovedHandler)) {
148             deviceRemovedHandlers.add(deviceRemovedHandler);
149         }
150     }
151
152     @Override
153     public void setDeviceContext(final DeviceContext deviceContext) {
154         this.deviceContext = deviceContext;
155     }
156
157     @Override
158     public void setRpcContext(final RpcContext rpcContext) {
159         this.rpcContext = rpcContext;
160     }
161
162     @Override
163     public void setStatContext(final StatisticsContext statContext) {
164         this.statContext = statContext;
165     }
166
167     @Override
168     public DeviceContext getDeviceContext() {
169         return this.deviceContext;
170     }
171
172     @Override
173     public void closeConnection() {
174         if (LOG.isDebugEnabled()) {
175             LOG.debug("Closing connection for node {}.", getDeviceInfo().getLOGValue());
176         }
177
178         this.deviceContext.shutdownConnection();
179     }
180
181     private void fillDeviceFlowRegistry() {
182         final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceContext.getDeviceFlowRegistry().fill();
183         Futures.addCallback(deviceFlowRegistryFill, new DeviceFlowRegistryCallback(deviceFlowRegistryFill));
184     }
185
186     @Override
187     public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
188         this.clusterInitializationPhaseHandler = handler;
189     }
190
191     @Override
192     public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
193         if (CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
194             if (LOG.isDebugEnabled()) {
195                 LOG.debug("Connection to the device {} was interrupted.", getDeviceInfo().getLOGValue());
196             }
197
198             return false;
199         }
200
201         fillDeviceFlowRegistry();
202         return true;
203     }
204
205     private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
206         private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
207
208         DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
209             this.deviceFlowRegistryFill = deviceFlowRegistryFill;
210         }
211
212         @Override
213         public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
214             if (LOG.isDebugEnabled()) {
215                 // Count all flows we read from datastore for debugging purposes.
216                 // This number do not always represent how many flows were actually added
217                 // to DeviceFlowRegistry, because of possible duplicates.
218                 long flowCount = Optional.fromNullable(result).asSet().stream()
219                         .flatMap(Collection::stream)
220                         .filter(Objects::nonNull)
221                         .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
222                         .filter(Objects::nonNull)
223                         .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
224                         .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
225                         .filter(Objects::nonNull)
226                         .filter(table -> Objects.nonNull(table.getFlow()))
227                         .flatMap(table -> table.getFlow().stream())
228                         .filter(Objects::nonNull)
229                         .count();
230
231                 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, getDeviceInfo().getLOGValue());
232             }
233         }
234
235         @Override
236         public void onFailure(Throwable t) {
237             if (deviceFlowRegistryFill.isCancelled()) {
238                 if (LOG.isDebugEnabled()) {
239                     LOG.debug("Cancelled filling flow registry with flows for node: {}", getDeviceInfo().getLOGValue());
240                 }
241             } else {
242                 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", getDeviceInfo().getLOGValue(), t);
243             }
244         }
245     }
246 }