/*
 * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.impl.lifecycle;
10 import com.google.common.collect.Lists;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import java.util.List;
14 import java.util.Objects;
15 import java.util.concurrent.CopyOnWriteArrayList;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.atomic.AtomicBoolean;
18 import java.util.concurrent.atomic.AtomicReference;
19 import java.util.stream.Collectors;
20 import javax.annotation.Nonnull;
21 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
22 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
23 import org.opendaylight.openflowplugin.api.openflow.OFPContext;
24 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
25 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
26 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
27 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
28 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
29 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
30 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
31 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
32 import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ReconciliationFrameworkStep;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 public class ContextChainImpl implements ContextChain {
38 private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class);
39 private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog");
41 private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false);
42 private final AtomicBoolean initialGathering = new AtomicBoolean(false);
43 private final AtomicBoolean initialSubmitting = new AtomicBoolean(false);
44 private final AtomicBoolean registryFilling = new AtomicBoolean(false);
45 private final AtomicBoolean rpcRegistration = new AtomicBoolean(false);
46 private final List<DeviceRemovedHandler> deviceRemovedHandlers = new CopyOnWriteArrayList<>();
47 private final List<GuardedContext> contexts = new CopyOnWriteArrayList<>();
48 private final List<ConnectionContext> auxiliaryConnections = new CopyOnWriteArrayList<>();
49 private final ExecutorService executorService;
50 private final ContextChainMastershipWatcher contextChainMastershipWatcher;
51 private final DeviceInfo deviceInfo;
52 private final ConnectionContext primaryConnection;
53 private final AtomicReference<ContextChainState> contextChainState =
54 new AtomicReference<>(ContextChainState.UNDEFINED);
55 private AutoCloseable registration;
57 ContextChainImpl(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher,
58 @Nonnull final ConnectionContext connectionContext,
59 @Nonnull final ExecutorService executorService) {
60 this.contextChainMastershipWatcher = contextChainMastershipWatcher;
61 this.primaryConnection = connectionContext;
62 this.deviceInfo = connectionContext.getDeviceInfo();
63 this.executorService = executorService;
67 public <T extends OFPContext> void addContext(@Nonnull final T context) {
68 contexts.add(new GuardedContextImpl(context));
72 @SuppressWarnings("checkstyle:IllegalCatch")
73 public void instantiateServiceInstance() {
74 OF_EVENT_LOG.debug("Clustering Service Invocation, Node: {}", deviceInfo);
76 contexts.forEach(OFPContext::instantiateServiceInstance);
77 LOG.info("Started clustering services for node {}", deviceInfo);
78 } catch (final Exception ex) {
79 LOG.warn("Not able to start clustering services for node {}", deviceInfo);
80 executorService.execute(() -> contextChainMastershipWatcher
81 .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
86 public ListenableFuture<?> closeServiceInstance() {
88 contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
90 final ListenableFuture<?> servicesToBeClosed = Futures.allAsList(Lists.reverse(contexts).stream()
91 .map(OFPContext::closeServiceInstance)
92 .collect(Collectors.toList()));
94 return Futures.transform(servicesToBeClosed, (input) -> {
95 OF_EVENT_LOG.debug("Closing clustering Services, Node: {}", deviceInfo);
96 LOG.info("Closed clustering services for node {}", deviceInfo);
103 public ServiceGroupIdentifier getIdentifier() {
104 return deviceInfo.getServiceIdentifier();
108 @SuppressWarnings("checkstyle:IllegalCatch")
109 public void close() {
110 if (ContextChainState.CLOSED.equals(contextChainState.get())) {
111 LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
115 contextChainState.set(ContextChainState.CLOSED);
118 // Close all connections to devices
119 auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
120 auxiliaryConnections.clear();
122 // If we are still registered and we are not already closing, then close the registration
123 if (registration != null) {
125 registration.close();
127 LOG.info("Closed clustering services registration for node {}", deviceInfo);
128 OF_EVENT_LOG.debug("Closed clustering services registration for node {}", deviceInfo);
129 } catch (final Exception e) {
130 LOG.warn("Failed to close clustering services registration for node {} with exception: ",
136 // Close all contexts (device, statistics, rpc)
137 contexts.forEach(OFPContext::close);
140 // We are closing, so cleanup all managers now
141 deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo));
142 deviceRemovedHandlers.clear();
144 primaryConnection.closeConnection(false);
149 public void makeContextChainStateSlave() {
151 changeMastershipState(ContextChainState.WORKING_SLAVE);
155 public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
156 registration = Objects.requireNonNull(clusterSingletonServiceProvider
157 .registerClusterSingletonService(this));
158 LOG.debug("Registered clustering services for node {}", deviceInfo);
159 OF_EVENT_LOG.debug("Registered Clustering Services, Node: {}", deviceInfo);
163 public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState,
164 boolean inReconciliationFrameworkStep) {
165 switch (mastershipState) {
167 LOG.debug("Device {}, initial submit OK.", deviceInfo);
168 OF_EVENT_LOG.debug("Device {}, initial submit OK.", deviceInfo);
169 this.initialSubmitting.set(true);
171 case MASTER_ON_DEVICE:
172 LOG.debug("Device {}, master state OK.", deviceInfo);
173 OF_EVENT_LOG.debug("Device {}, master state OK.", deviceInfo);
174 this.masterStateOnDevice.set(true);
176 case INITIAL_GATHERING:
177 LOG.debug("Device {}, initial gathering OK.", deviceInfo);
178 OF_EVENT_LOG.debug("Device {}, initial gathering OK.", deviceInfo);
179 this.initialGathering.set(true);
181 case RPC_REGISTRATION:
182 LOG.debug("Device {}, RPC registration OK.", deviceInfo);
183 OF_EVENT_LOG.debug("Device {}, RPC registration OK.", deviceInfo);
184 this.rpcRegistration.set(true);
186 case INITIAL_FLOW_REGISTRY_FILL:
187 // Flow registry fill is not mandatory to work as a master
188 LOG.debug("Device {}, initial registry filling OK.", deviceInfo);
189 OF_EVENT_LOG.debug("Device {}, initial registry filling OK.", deviceInfo);
190 this.registryFilling.set(true);
200 final boolean result = initialGathering.get() && masterStateOnDevice.get() && rpcRegistration.get()
201 && inReconciliationFrameworkStep || initialSubmitting.get();
203 if (!inReconciliationFrameworkStep && result && mastershipState != ContextChainMastershipState.CHECK) {
204 LOG.info("Device {} is able to work as master{}", deviceInfo,
205 registryFilling.get() ? "." : " WITHOUT flow registry !!!");
206 changeMastershipState(ContextChainState.WORKING_MASTER);
213 public boolean isClosing() {
214 return ContextChainState.CLOSED.equals(contextChainState.get());
218 public void continueInitializationAfterReconciliation() {
219 contexts.forEach(context -> {
220 if (context.map(ReconciliationFrameworkStep.class::isInstance)) {
221 context.map(ReconciliationFrameworkStep.class::cast).continueInitializationAfterReconciliation();
227 public boolean addAuxiliaryConnection(@Nonnull ConnectionContext connectionContext) {
228 return connectionContext.getFeatures().getAuxiliaryId() != 0
229 && !ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState())
230 && auxiliaryConnections.add(connectionContext);
234 public boolean auxiliaryConnectionDropped(@Nonnull ConnectionContext connectionContext) {
235 return auxiliaryConnections.remove(connectionContext);
239 public void registerDeviceRemovedHandler(@Nonnull final DeviceRemovedHandler deviceRemovedHandler) {
240 deviceRemovedHandlers.add(deviceRemovedHandler);
243 private void changeMastershipState(final ContextChainState newContextChainState) {
244 if (ContextChainState.CLOSED.equals(this.contextChainState.get())) {
248 boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get());
249 this.contextChainState.set(newContextChainState);
252 contexts.forEach(context -> {
253 if (context.map(ContextChainStateListener.class::isInstance)) {
254 context.map(ContextChainStateListener.class::cast).onStateAcquired(newContextChainState);
260 private void unMasterMe() {
261 registryFilling.set(false);
262 initialSubmitting.set(false);
263 initialGathering.set(false);
264 masterStateOnDevice.set(false);
265 rpcRegistration.set(false);