/*
 * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Verify;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Objects;
20 import javax.annotation.Nullable;
22 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
23 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
24 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
25 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
26 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
27 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
29 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
30 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
31 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
32 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
33 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
/**
 * LifecycleService implementation that holds the device, RPC and statistics contexts
 * of one node and registers itself as a cluster singleton service (see registerService).
 */
38 public class LifecycleServiceImpl implements LifecycleService {
39 private static final Logger LOG = LoggerFactory.getLogger(LifecycleServiceImpl.class);
// Handlers that are called with the device info once this service moves to TERMINATION.
41 private final List<DeviceRemovedHandler> deviceRemovedHandlers = new ArrayList<>();
// Starts as INITIALIZATION; the only other value assigned in this class is TERMINATION.
42 private volatile CONTEXT_STATE state = CONTEXT_STATE.INITIALIZATION;
// The three contexts below are injected through their setters and are null until then.
43 private DeviceContext deviceContext;
44 private RpcContext rpcContext;
45 private StatisticsContext statContext;
// Assigned by registerService; remains null when registration did not succeed.
46 private ClusterSingletonServiceRegistration registration;
// First handler of the initialization chain; registerService points it at the device context.
47 private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
/**
 * Logs the start of the clustering MASTER services for this node and invokes the
 * configured initialization-phase handler, passing {@code null} as the connection context.
 */
51 public void instantiateServiceInstance() {
52 LOG.info("Starting clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
// NOTE(review): the statements executed when the handler returns false are not visible in this view.
54 if (!clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
/**
 * Asks the statistics, RPC and device contexts (in that order) to stop their cluster
 * services and combines the three resulting futures.
 *
 * @return a future obtained by transforming the combined stop futures
 */
60 public ListenableFuture<Void> closeServiceInstance() {
62 // Chain all jobs that will stop our services
63 final List<ListenableFuture<Void>> futureList = new ArrayList<>();
64 futureList.add(statContext.stopClusterServices());
65 futureList.add(rpcContext.stopClusterServices());
66 futureList.add(deviceContext.stopClusterServices());
// Guava's successfulAsList yields a list with null entries for failed inputs instead of
// failing, so one failing stop job does not fail the combined future.
68 return Futures.transform(Futures.successfulAsList(futureList), new Function<List<Void>, Void>() {
// Invoked with the combined results; the visible lines only log and do not inspect input.
71 public Void apply(@Nullable List<Void> input) {
72 LOG.debug("Closed clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
79 public ServiceGroupIdentifier getIdentifier() {
80 return getServiceIdentifier();
/**
 * Accessor for the {@code CONTEXT_STATE} of this service.
 * NOTE(review): the method body is not visible in this view; presumably it returns the
 * {@code state} field - confirm.
 */
84 public CONTEXT_STATE getState() {
89 public ServiceGroupIdentifier getServiceIdentifier() {
90 return deviceContext.getServiceIdentifier();
94 public DeviceInfo getDeviceInfo() {
95 return deviceContext.getDeviceInfo();
// NOTE(review): the declaration line of the method that owns this body is not visible in this view.
// When the service is already in TERMINATION this is only reported at debug level;
// presumably the remaining statements are skipped in that case (the branch structure
// between these lines is not visible) - confirm.
100 if (CONTEXT_STATE.TERMINATION.equals(getState())){
101 if (LOG.isDebugEnabled()) {
102 LOG.debug("LifecycleService is already in TERMINATION state.");
105 this.state = CONTEXT_STATE.TERMINATION;
107 // We are closing, so cleanup all managers now
108 deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(getDeviceInfo()));
110 // If we are still registered and we are not already closing, then close the registration
111 if (Objects.nonNull(registration)) {
113 LOG.debug("Closing clustering singleton services for node {}", getDeviceInfo().getLOGValue());
114 registration.close();
// A failure while closing the registration is logged at debug level with the exception
// attached; no rethrow is visible in these lines.
115 } catch (Exception e) {
116 LOG.debug("Failed to close clustering singleton services for node {} with exception: ",
117 getDeviceInfo().getLOGValue(), e);
/**
 * Wires the initialization-phase handler chain between the contexts and registers this
 * instance with the given cluster singleton service provider.
 *
 * @param singletonServiceProvider provider this instance is registered with as a cluster singleton service
 */
124 public void registerService(final ClusterSingletonServiceProvider singletonServiceProvider) {
// NOTE(review): this message says "Registered" although the registration itself only happens further down.
125 LOG.debug("Registered clustering singleton services for node {}", getDeviceInfo().getLOGValue());
// NOTE(review): no role context is wired in the visible lines; the rpc context is linked
// directly back to this service.
127 // lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
128 this.clusterInitializationPhaseHandler = deviceContext;
129 this.deviceContext.setLifecycleInitializationPhaseHandler(this.statContext);
130 this.statContext.setLifecycleInitializationPhaseHandler(this.rpcContext);
131 this.rpcContext.setLifecycleInitializationPhaseHandler(this);
132 //Set initial submit handler
133 this.statContext.setInitialSubmitHandler(this.deviceContext);
135 // Register cluster singleton service
// verifyNotNull rejects a null registration returned by the provider.
137 this.registration = Verify.verifyNotNull(singletonServiceProvider.registerClusterSingletonService(this));
138 LOG.info("Registered clustering singleton services for node {}", getDeviceInfo().getLOGValue());
139 } catch (Exception e) {
// NOTE(review): this logs getDeviceInfo() itself while the other messages use
// getDeviceInfo().getLOGValue(); the second {} placeholder is paired with the exception
// argument, whereas SLF4J normally takes a trailing Throwable without a placeholder -
// confirm the rendered message and that the stack trace is logged.
// NOTE(review): any further statements of this catch block are not visible in this view.
140 LOG.warn("Failed to register cluster singleton service for node {}, with exception: {}", getDeviceInfo(), e);
146 public void registerDeviceRemovedHandler(final DeviceRemovedHandler deviceRemovedHandler) {
147 if (!deviceRemovedHandlers.contains(deviceRemovedHandler)) {
148 deviceRemovedHandlers.add(deviceRemovedHandler);
153 public void setDeviceContext(final DeviceContext deviceContext) {
154 this.deviceContext = deviceContext;
158 public void setRpcContext(final RpcContext rpcContext) {
159 this.rpcContext = rpcContext;
163 public void setStatContext(final StatisticsContext statContext) {
164 this.statContext = statContext;
168 public DeviceContext getDeviceContext() {
169 return this.deviceContext;
173 public void closeConnection() {
174 if (LOG.isDebugEnabled()) {
175 LOG.debug("Closing connection for node {}.", getDeviceInfo().getLOGValue());
178 this.deviceContext.shutdownConnection();
181 private void fillDeviceFlowRegistry() {
182 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceContext.getDeviceFlowRegistry().fill();
183 Futures.addCallback(deviceFlowRegistryFill, new DeviceFlowRegistryCallback(deviceFlowRegistryFill));
187 public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
188 this.clusterInitializationPhaseHandler = handler;
/**
 * Initialization-phase step of this service: inspects the state of the given connection
 * and triggers filling of the device flow registry.
 * NOTE(review): the return statements and the branch structure are not visible in this
 * view; presumably an interrupted (RIP) connection ends the method early - confirm.
 *
 * @param connectionContext connection whose state is checked; it is dereferenced without a null check
 */
192 public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
// NOTE(review): instantiateServiceInstance starts the handler chain with a null connection
// context; confirm a null can never reach this dereference.
193 if (CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
194 if (LOG.isDebugEnabled()) {
195 LOG.debug("Connection to the device {} was interrupted.", getDeviceInfo().getLOGValue());
201 fillDeviceFlowRegistry();
/**
 * Callback attached to the future that fills the device flow registry; it reports the
 * outcome of the fill through the logger.
 */
205 private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
// The fill future itself, kept so that onFailure can check whether it was cancelled.
206 private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
/**
 * Creates a callback bound to the given fill future.
 *
 * @param deviceFlowRegistryFill future of the flow registry fill this callback reports on
 */
DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
    this.deviceFlowRegistryFill = deviceFlowRegistryFill;
}
/**
 * Reports a finished fill of the flow registry; when debug logging is enabled it also
 * counts the flows contained in the result.
 *
 * @param result nodes read by the fill; may be null, which is treated as an empty result
 */
213 public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
214 if (LOG.isDebugEnabled()) {
215 // Count all flows we read from datastore for debugging purposes.
216 // This number does not always represent how many flows were actually added
217 // to DeviceFlowRegistry, because of possible duplicates.
// Null-tolerant walk: result -> present nodes -> their tables -> the flows of each table.
// NOTE(review): the terminal operation of this stream is not visible in this view;
// presumably it counts the remaining elements - confirm.
218 long flowCount = Optional.fromNullable(result).asSet().stream()
219 .flatMap(Collection::stream)
220 .filter(Objects::nonNull)
221 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
222 .filter(Objects::nonNull)
223 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
224 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
225 .filter(Objects::nonNull)
226 .filter(table -> Objects.nonNull(table.getFlow()))
227 .flatMap(table -> table.getFlow().stream())
228 .filter(Objects::nonNull)
231 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, getDeviceInfo().getLOGValue());
/**
 * Reports a fill of the flow registry that did not complete successfully.
 *
 * @param t cause reported by the fill future
 */
236 public void onFailure(Throwable t) {
// A cancelled fill future is reported at debug level.
237 if (deviceFlowRegistryFill.isCancelled()) {
238 if (LOG.isDebugEnabled()) {
239 LOG.debug("Cancelled filling flow registry with flows for node: {}", getDeviceInfo().getLOGValue());
// NOTE(review): the branch structure between the debug and the warn statement is not
// visible in this view; presumably the warning is only logged when the future was not
// cancelled - confirm.
// NOTE(review): the second {} placeholder is paired with the throwable argument, whereas
// SLF4J normally takes a trailing Throwable without a placeholder - confirm the rendered
// message and that the stack trace is logged.
242 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", getDeviceInfo().getLOGValue(), t);