/*
 * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
10 import static org.opendaylight.openflowplugin.api.openflow.OFPContext.ContextState;
12 import com.google.common.collect.Lists;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import java.util.List;
17 import java.util.Objects;
18 import java.util.concurrent.CopyOnWriteArrayList;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import java.util.stream.Collectors;
22 import javax.annotation.Nonnull;
23 import javax.annotation.Nullable;
24 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
25 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
26 import org.opendaylight.openflowplugin.api.ConnectionException;
27 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
28 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
29 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
30 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
31 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
32 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
35 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
36 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
37 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
39 import org.opendaylight.yangtools.yang.common.RpcResult;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
/**
 * Chain of {@link OFPContext} instances that belong to one device (identified by
 * {@code deviceInfo}). The chain registers itself as a cluster singleton service
 * (see {@code registerServices}) and fans the instantiate/close calls out to every
 * context it holds.
 */
43 public class ContextChainImpl implements ContextChain {
44 private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class);
// Mastership progress flags. isMastered(...) raises the flag that matches the reported
// state and unMasterMe() lowers all of them again.
45 private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false);
46 private final AtomicBoolean initialGathering = new AtomicBoolean(false);
47 private final AtomicBoolean initialSubmitting = new AtomicBoolean(false);
// Tracked for logging only: it is not part of the "able to work as master" condition
// computed in isMastered(...).
48 private final AtomicBoolean registryFilling = new AtomicBoolean(false);
49 private final AtomicBoolean rpcRegistration = new AtomicBoolean(false);
// Handlers notified (and then dropped) when the chain is closed.
50 private final List<DeviceRemovedHandler> deviceRemovedHandlers = new CopyOnWriteArrayList<>();
// Contexts in the order they were added; closeServiceInstance() walks them in reverse.
51 private final List<OFPContext> contexts = new CopyOnWriteArrayList<>();
// Additional connections accepted by addAuxiliaryConnection(...); closed before the
// primary connection in close().
52 private final List<ConnectionContext> auxiliaryConnections = new CopyOnWriteArrayList<>();
53 private final ExecutorService executorService;
54 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
// Taken from the primary connection in the constructor.
55 private final DeviceInfo deviceInfo;
56 private final ConnectionContext primaryConnection;
// Cluster singleton registration; assigned in registerServices(...), closed in close().
57 private AutoCloseable registration;
// Lifecycle state: INITIALIZATION at construction, WORKING once registerServices(...) runs,
// TERMINATION once close() runs.
// NOTE(review): plain (non-volatile) field that is read and written from several methods;
// confirm callers serialize access.
58 private ContextState state = ContextState.INITIALIZATION;
// Last state handed to changeState(...).
60 private volatile ContextChainState contextChainState = ContextChainState.UNDEFINED;
/**
 * Creates a context chain bound to the given primary connection.
 *
 * @param contextChainMastershipWatcher watcher that receives the mastership related
 *                                      notifications of this chain
 * @param connectionContext             primary connection of the device; the chain's
 *                                      device info is read from it
 * @param executorService               executor used for asynchronous notifications
 */
ContextChainImpl(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher,
                 @Nonnull final ConnectionContext connectionContext,
                 @Nonnull final ExecutorService executorService) {
    this.executorService = executorService;
    this.contextChainMastershipWatcher = contextChainMastershipWatcher;
    this.primaryConnection = connectionContext;
    // Device info always comes from the primary connection.
    this.deviceInfo = this.primaryConnection.getDeviceInfo();
}
72 public <T extends OFPContext> void addContext(@Nonnull final T context) {
73 contexts.add(context);
77 public void instantiateServiceInstance() {
78 LOG.info("Starting clustering services for node {}", deviceInfo);
81 contexts.forEach(this::initializeContextService);
82 LOG.info("Started clustering services for node {}", deviceInfo);
83 } catch (final Exception ex) {
84 executorService.submit(() -> contextChainMastershipWatcher
85 .onNotAbleToStartMastershipMandatory(deviceInfo, ex.getMessage()));
/**
 * Closes the clustering services of the contexts held by this chain.
 *
 * <p>The mastership watcher is told first that the slave role was acquired. Each context
 * is then closed through {@code closeContextService}, walking the context list in reverse
 * order, and the per-context futures are combined with {@code Futures.successfulAsList}.
 * A completion message is logged from the transformation applied to the combined future.
 *
 * <p>NOTE(review): parts of this method are not visible in this excerpt (the call that
 * turns the reversed list into a stream, and the tail of the {@code Futures.transform}
 * call including the value the lambda returns) - confirm against the full file.
 *
 * @return future derived from the combined close futures of all contexts
 */
90 public ListenableFuture<Void> closeServiceInstance() {
91 LOG.info("Closing clustering services for node {}", deviceInfo);
92 contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
// Reverse order: the context added last is closed first.
94 final ListenableFuture<List<Void>> servicesToBeClosed = Futures
95 .successfulAsList(Lists.reverse(contexts)
97 .map(this::closeContextService)
98 .collect(Collectors.toList()));
100 return Futures.transform(servicesToBeClosed, (input) -> {
101 LOG.info("Closed clustering services for node {}", deviceInfo);
108 public ServiceGroupIdentifier getIdentifier() {
109 return deviceInfo.getServiceIdentifier();
/**
 * Tears the chain down: marks it as terminating, notifies the mastership watcher, closes
 * the auxiliary connections and then the primary connection, closes every context, closes
 * the cluster singleton registration when one exists, and finally notifies and clears the
 * registered device-removed handlers.
 *
 * <p>When the chain is already in the TERMINATION state only a debug message is logged.
 * NOTE(review): the early exit that presumably follows that message is on a line not
 * visible in this excerpt, as are any statements directly after the contexts are closed
 * and directly after {@code registration.close()} - confirm against the full file.
 */
113 public void close() {
114 if (ContextState.TERMINATION.equals(state)) {
115 LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
119 state = ContextState.TERMINATION;
120 contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
122 // Close all connections to devices
// Auxiliary connections are closed with closeConnection(false), the primary one with
// closeConnection(true).
123 auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
124 auxiliaryConnections.clear();
125 primaryConnection.closeConnection(true);
127 // Close all contexts (device, statistics, rpc)
128 contexts.forEach(OFPContext::close);
131 // If we are still registered and we are not already closing, then close the registration
132 if (Objects.nonNull(registration)) {
134 LOG.info("Closing clustering services registration for node {}", deviceInfo);
135 registration.close();
137 LOG.info("Closed clustering services registration for node {}", deviceInfo);
// A failure while closing the registration is logged at WARN level.
138 } catch (final Exception e) {
139 LOG.warn("Failed to close clustering services registration for node {} with exception: ",
144 // We are closing, so cleanup all managers now
145 deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo));
146 deviceRemovedHandlers.clear();
/**
 * Switches the chain state to {@code WORKING_SLAVE} through {@code changeState}.
 *
 * <p>NOTE(review): one statement line before the {@code changeState} call is not visible
 * in this excerpt (possibly the reset of the mastership flags) - confirm against the
 * full file.
 */
150 public void makeContextChainStateSlave() {
152 changeState(ContextChainState.WORKING_SLAVE);
156 public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
157 LOG.info("Registering clustering services for node {}", deviceInfo);
158 state = ContextState.WORKING;
159 registration = Objects.requireNonNull(clusterSingletonServiceProvider
160 .registerClusterSingletonService(this));
161 LOG.info("Registered clustering services for node {}", deviceInfo);
/**
 * Asks the device context of this chain to make the device a slave and attaches a
 * {@code DeviceSlaveCallback} to the result of {@code DeviceContext.makeDeviceSlave()}.
 *
 * <p>The {@link DeviceContext} is located by filtering a stream of elements by type.
 * NOTE(review): the statements that open the stream, pick the element, and invoke the
 * {@code Futures} method are not visible in this excerpt - presumably the source is the
 * {@code contexts} list and the callback is registered on the returned future; confirm
 * against the full file.
 */
165 public void makeDeviceSlave() {
// Keep only DeviceContext instances and narrow the element type accordingly.
169 .filter(DeviceContext.class::isInstance)
170 .map(DeviceContext.class::cast)
172 .ifPresent(deviceContext -> Futures
174 deviceContext.makeDeviceSlave(),
175 new DeviceSlaveCallback(),
/**
 * Records that the given mastership step has been reached and evaluates whether the chain
 * can work as master.
 *
 * <p>The mandatory steps are initial gathering, master state on device, initial submit and
 * RPC registration. The flow registry fill is tracked but is not required. When all
 * mandatory steps are done and the reported state is not {@code CHECK}, the chain logs
 * that it can work as master (mentioning a missing flow registry) and switches to
 * {@code WORKING_MASTER}.
 *
 * <p>NOTE(review): the first case label, the case terminators, any default branch and the
 * return statement are not visible in this excerpt - presumably each case ends with a
 * break and the method returns {@code result}; confirm against the full file.
 *
 * @param mastershipState step being reported
 */
180 public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState) {
181 switch (mastershipState) {
183 LOG.debug("Device {}, initial submit OK.", deviceInfo);
184 this.initialSubmitting.set(true);
186 case MASTER_ON_DEVICE:
187 LOG.debug("Device {}, master state OK.", deviceInfo);
188 this.masterStateOnDevice.set(true);
190 case INITIAL_GATHERING:
191 LOG.debug("Device {}, initial gathering OK.", deviceInfo);
192 this.initialGathering.set(true);
194 case RPC_REGISTRATION:
195 LOG.debug("Device {}, RPC registration OK.", deviceInfo);
196 this.rpcRegistration.set(true);
198 case INITIAL_FLOW_REGISTRY_FILL:
199 // Flow registry fill is not mandatory to work as a master
200 LOG.debug("Device {}, initial registry filling OK.", deviceInfo);
201 this.registryFilling.set(true);
// All four mandatory flags must be set; registryFilling is deliberately left out.
206 final boolean result = initialGathering.get() &&
207 masterStateOnDevice.get() &&
208 initialSubmitting.get() &&
209 rpcRegistration.get();
// CHECK is excluded here, so reporting it neither logs nor changes the chain state.
211 if (result && mastershipState != ContextChainMastershipState.CHECK) {
212 LOG.info("Device {} is able to work as master{}",
214 registryFilling.get() ? "." : " WITHOUT flow registry !!!");
215 changeState(ContextChainState.WORKING_MASTER);
222 public boolean isClosing() {
223 return ContextState.TERMINATION.equals(state);
227 public boolean isPrepared() {
228 return this.initialGathering.get() &&
229 this.masterStateOnDevice.get() &&
230 this.rpcRegistration.get();
/**
 * Continues the initialization once reconciliation is done: the statistics context of
 * this chain is asked to perform its initial submit, and the {@code INITIAL_SUBMIT} step
 * is reported to {@code isMastered}.
 *
 * <p>NOTE(review): the lines that select the element from the stream and that combine the
 * stream result with the {@code isMastered} call are not visible in this excerpt -
 * presumably a find operation, a {@code false} default and a logical AND; confirm
 * against the full file.
 */
234 public boolean continueInitializationAfterReconciliation() {
235 return contexts.stream()
// Keep only StatisticsContext instances and narrow the element type accordingly.
236 .filter(StatisticsContext.class::isInstance)
237 .map(StatisticsContext.class::cast)
239 .map(StatisticsContext::initialSubmitAfterReconciliation)
241 isMastered(ContextChainMastershipState.INITIAL_SUBMIT);
245 public boolean addAuxiliaryConnection(@Nonnull ConnectionContext connectionContext) {
246 return (connectionContext.getFeatures().getAuxiliaryId() != 0)
247 && (!ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState()))
248 && auxiliaryConnections.add(connectionContext);
252 public boolean auxiliaryConnectionDropped(@Nonnull ConnectionContext connectionContext) {
253 return auxiliaryConnections.remove(connectionContext);
257 public void registerDeviceRemovedHandler(@Nonnull final DeviceRemovedHandler deviceRemovedHandler) {
258 deviceRemovedHandlers.add(deviceRemovedHandler);
/**
 * Stores the new chain state and notifies elements that implement
 * {@link ContextChainStateListener} through {@code onStateAcquired}.
 *
 * <p>NOTE(review): the lines between the assignment and the filter are not visible in this
 * excerpt - presumably the notification is guarded by {@code propagate} and streams over
 * {@code contexts}; confirm against the full file.
 *
 * @param contextChainState state to switch to
 */
261 private void changeState(final ContextChainState contextChainState) {
// True only when the chain was still in the UNDEFINED state before this call.
262 boolean propagate = this.contextChainState == ContextChainState.UNDEFINED;
263 this.contextChainState = contextChainState;
267 .filter(ContextChainStateListener.class::isInstance)
268 .map(ContextChainStateListener.class::cast)
269 .forEach(listener -> listener.onStateAcquired(contextChainState));
273 private void initializeContextService(final OFPContext context) {
274 if (ConnectionContext.CONNECTION_STATE.WORKING.equals(primaryConnection.getConnectionState())) {
275 context.instantiateServiceInstance();
277 LOG.warn("Device connection for node {} doesn't exist anymore. Primary connection status: {}",
279 primaryConnection.getConnectionState());
283 private ListenableFuture<Void> closeContextService(final OFPContext context) {
284 if (ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState())) {
285 final String errMsg = String
286 .format("Device connection for node %s doesn't exist anymore. Primary connection status: %s",
287 deviceInfo.toString(),
288 primaryConnection.getConnectionState());
290 return Futures.immediateFailedFuture(new ConnectionException(errMsg));
293 return context.closeServiceInstance();
296 private void unMasterMe() {
297 registryFilling.set(false);
298 initialSubmitting.set(false);
299 initialGathering.set(false);
300 masterStateOnDevice.set(false);
301 rpcRegistration.set(false);
304 private final class DeviceSlaveCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
306 public void onSuccess(@Nullable final RpcResult<SetRoleOutput> result) {
307 contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
311 public void onFailure(@Nonnull final Throwable t) {
312 contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo);