2 * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.lifecycle;
10 import com.google.common.base.Function;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Verify;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Objects;
20 import javax.annotation.Nullable;
21 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
22 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
23 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
24 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
25 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
26 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
27 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
28 import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
29 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
30 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
31 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
32 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 public class LifecycleServiceImpl implements LifecycleService {
// Class-wide SLF4J logger.
39 private static final Logger LOG = LoggerFactory.getLogger(LifecycleServiceImpl.class);
// Collaborating contexts; each one is injected through its setter
// (setDeviceContext / setRpcContext / setStatContext).
40 private DeviceContext deviceContext;
41 private RpcContext rpcContext;
42 private StatisticsContext statContext;
// Handle obtained from registerClusterSingletonService in registerService(); closed in close().
43 private ClusterSingletonServiceRegistration registration;
// Handler invoked by instantiateServiceInstance(); assigned in registerService() or through
// setLifecycleInitializationPhaseHandler().
44 private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
// Handlers that close() calls with the device info.
45 private final List<DeviceRemovedHandler> deviceRemovedHandlers = new ArrayList<>();
// Starts as INITIALIZATION; close() switches it to TERMINATION.
46 private volatile CONTEXT_STATE state = CONTEXT_STATE.INITIALIZATION;
/**
 * Logs the start of the clustering MASTER services for this node and invokes the configured
 * {@code clusterInitializationPhaseHandler}.
 */
50 public void instantiateServiceInstance() {
51 LOG.info("Starting clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
// The handler is called with a null connection context.
// NOTE(review): onContextInstantiateService of this class dereferences its argument, so a handler
// earlier in the chain presumably substitutes a real ConnectionContext - confirm.
// The branch below is entered when the handler returns false.
53 if (!clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
/**
 * Asks the statistics, RPC and device contexts to stop their cluster services and combines the
 * three resulting futures.
 *
 * @return the future obtained by transforming the combined stop futures
 */
59 public ListenableFuture<Void> closeServiceInstance() {
// Result of comparing a connection state against RIP; handed to every stopClusterServices call below.
60 final boolean connectionInterrupted =
62 .getPrimaryConnectionContext()
64 .equals(ConnectionContext.CONNECTION_STATE.RIP);
66 // Chain all jobs that will stop our services
67 final List<ListenableFuture<Void>> futureList = new ArrayList<>();
// Stop order as submitted: statistics context, then RPC context, then device context.
68 futureList.add(statContext.stopClusterServices(connectionInterrupted));
69 futureList.add(rpcContext.stopClusterServices(connectionInterrupted));
70 futureList.add(deviceContext.stopClusterServices(connectionInterrupted));
// Futures.successfulAsList reports failed or cancelled inputs as null list entries instead of
// failing the combined future, so the transformation below also runs after a failed stop.
72 return Futures.transform(Futures.successfulAsList(futureList), new Function<List<Void>, Void>() {
75 public Void apply(@Nullable List<Void> input) {
76 LOG.debug("Closed clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
83 public ServiceGroupIdentifier getIdentifier() {
84 return getServiceIdentifier();
// Reports the lifecycle state of this service; close() consults it to detect TERMINATION.
88 public CONTEXT_STATE getState() {
93 public ServiceGroupIdentifier getServiceIdentifier() {
94 return deviceContext.getServiceIdentifier();
98 public DeviceInfo getDeviceInfo() {
99 return deviceContext.getDeviceInfo();
103 public void close() {
104 if (CONTEXT_STATE.TERMINATION.equals(getState())){
105 if (LOG.isDebugEnabled()) {
106 LOG.debug("LifecycleService is already in TERMINATION state.");
109 this.state = CONTEXT_STATE.TERMINATION;
111 // We are closing, so cleanup all managers now
112 deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(getDeviceInfo()));
114 // If we are still registered and we are not already closing, then close the registration
115 if (Objects.nonNull(registration)) {
117 LOG.debug("Closing clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
118 registration.close();
119 } catch (Exception e) {
120 LOG.debug("Failed to close clustering MASTER services for node {} with exception: ",
121 getDeviceInfo().getLOGValue(), e);
/**
 * Wires the initialization-phase handlers between the contexts and registers this object as a
 * cluster singleton service.
 *
 * @param singletonServiceProvider provider on which this service is registered
 */
128 public void registerService(final ClusterSingletonServiceProvider singletonServiceProvider) {
// NOTE(review): this message says "Registered" although the registration call has not been made
// yet at this point; the INFO line further down reports the actual success.
129 LOG.debug("Registered clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
131 // Handler chain set up below: lifecycle service -> device context -> statistics context -> rpc context -> lifecycle service
132 this.clusterInitializationPhaseHandler = deviceContext;
133 this.deviceContext.setLifecycleInitializationPhaseHandler(this.statContext);
134 this.statContext.setLifecycleInitializationPhaseHandler(this.rpcContext);
135 this.rpcContext.setLifecycleInitializationPhaseHandler(this);
136 //Set initial submit handler
137 this.statContext.setInitialSubmitHandler(this.deviceContext);
139 // Register cluster singleton service
// Verify.verifyNotNull rejects a null registration handle returned by the provider.
141 this.registration = Verify.verifyNotNull(singletonServiceProvider.registerClusterSingletonService(this));
142 LOG.info("Registered clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
143 } catch (Exception e) {
// NOTE(review): unlike the other log statements this passes getDeviceInfo() rather than
// getDeviceInfo().getLOGValue(), and it gives the exception its own "{}" placeholder - confirm
// that the stack trace is still logged with the SLF4J version in use.
144 LOG.warn("Failed to register cluster singleton service for node {}, with exception: {}", getDeviceInfo(), e);
150 public void registerDeviceRemovedHandler(final DeviceRemovedHandler deviceRemovedHandler) {
151 if (!deviceRemovedHandlers.contains(deviceRemovedHandler)) {
152 deviceRemovedHandlers.add(deviceRemovedHandler);
157 public void setDeviceContext(final DeviceContext deviceContext) {
158 this.deviceContext = deviceContext;
162 public void setRpcContext(final RpcContext rpcContext) {
163 this.rpcContext = rpcContext;
167 public void setStatContext(final StatisticsContext statContext) {
168 this.statContext = statContext;
172 public DeviceContext getDeviceContext() {
173 return this.deviceContext;
177 public void closeConnection() {
178 if (LOG.isDebugEnabled()) {
179 LOG.debug("Closing connection for node {}.", getDeviceInfo().getLOGValue());
182 this.deviceContext.shutdownConnection();
185 private void fillDeviceFlowRegistry() {
186 final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceContext.getDeviceFlowRegistry().fill();
187 Futures.addCallback(deviceFlowRegistryFill, new DeviceFlowRegistryCallback(deviceFlowRegistryFill));
191 public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
192 this.clusterInitializationPhaseHandler = handler;
/**
 * Initialization-phase step of the lifecycle service itself; registerService() installs this
 * object as the handler of the RPC context.
 *
 * @param connectionContext connection whose state is checked before the flow registry is filled
 */
196 public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
// NOTE(review): connectionContext is dereferenced without a null check, while
// instantiateServiceInstance() starts the chain with null - confirm that an earlier handler
// always supplies a real connection context.
197 if (CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
198 if (LOG.isDebugEnabled()) {
199 LOG.debug("Connection to the device {} was interrupted.", getDeviceInfo().getLOGValue());
// Starts the asynchronous fill of the device flow registry.
205 fillDeviceFlowRegistry();
209 private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
210 private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
212 DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
213 this.deviceFlowRegistryFill = deviceFlowRegistryFill;
217 public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
218 if (LOG.isDebugEnabled()) {
219 // Count all flows we read from datastore for debugging purposes.
220 // This number do not always represent how many flows were actually added
221 // to DeviceFlowRegistry, because of possible duplicates.
222 long flowCount = Optional.fromNullable(result).asSet().stream()
223 .flatMap(Collection::stream)
224 .filter(Objects::nonNull)
225 .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
226 .filter(Objects::nonNull)
227 .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
228 .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
229 .filter(Objects::nonNull)
230 .filter(table -> Objects.nonNull(table.getFlow()))
231 .flatMap(table -> table.getFlow().stream())
232 .filter(Objects::nonNull)
235 LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, getDeviceInfo().getLOGValue());
240 public void onFailure(Throwable t) {
241 if (deviceFlowRegistryFill.isCancelled()) {
242 if (LOG.isDebugEnabled()) {
243 LOG.debug("Cancelled filling flow registry with flows for node: {}", getDeviceInfo().getLOGValue());
246 LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", getDeviceInfo().getLOGValue(), t);