2 * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.impl.lifecycle;
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Function;
12 import com.google.common.base.Verify;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import io.netty.util.HashedWheelTimer;
16 import io.netty.util.Timeout;
17 import io.netty.util.TimerTask;
18 import io.netty.util.internal.ConcurrentSet;
19 import java.util.ArrayList;
20 import java.util.List;
22 import java.util.Objects;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.TimeoutException;
27 import javax.annotation.Nonnull;
28 import javax.annotation.Nullable;
29 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
30 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
31 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
34 import org.opendaylight.openflowplugin.api.openflow.OFPManager;
35 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
36 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus;
37 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
38 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
39 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
40 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
41 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
42 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
43 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
44 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
45 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
46 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
47 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
48 import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
50 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 public class ContextChainHolderImpl implements ContextChainHolder {
56 private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class);
58 private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}";
59 private static final long DEFAULT_CHECK_ROLE_MASTER = 10000L;
60 private static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
61 private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
63 private final ConcurrentHashMap<DeviceInfo, ContextChain> contextChainMap = new ConcurrentHashMap<>();
64 private final ConcurrentHashMap<DeviceInfo, ContextChain> withoutRoleChains = new ConcurrentHashMap<>();
65 private final List<DeviceInfo> markToBeRemoved = new ArrayList<>();
66 private final HashedWheelTimer timer;
67 private final Long checkRoleMaster;
69 private DeviceManager deviceManager;
70 private RpcManager rpcManager;
71 private StatisticsManager statisticsManager;
72 private EntityOwnershipListenerRegistration eosListenerRegistration;
73 private ClusterSingletonServiceProvider singletonServicesProvider;
74 private boolean timerIsRunningRole;
76 public ContextChainHolderImpl(final HashedWheelTimer timer) {
77 this.timerIsRunningRole = false;
79 this.checkRoleMaster = DEFAULT_CHECK_ROLE_MASTER;
83 public <T extends OFPManager> void addManager(final T manager) {
84 if (Objects.isNull(deviceManager) && manager instanceof DeviceManager) {
85 LOG.debug("Device manager was set.");
86 deviceManager = (DeviceManager) manager;
87 } else if (Objects.isNull(rpcManager) && manager instanceof RpcManager) {
88 LOG.debug("RPC manager was set.");
89 rpcManager = (RpcManager) manager;
90 } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) {
91 LOG.debug("Statistics manager was set.");
92 statisticsManager = (StatisticsManager) manager;
97 public ContextChain createContextChain(final ConnectionContext connectionContext) {
99 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
100 final String deviceInfoLOGValue = deviceInfo.getLOGValue();
102 if (LOG.isDebugEnabled()) {
103 LOG.debug("Creating a new chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
106 final ContextChain contextChain = new ContextChainImpl(connectionContext);
107 final LifecycleService lifecycleService = new LifecycleServiceImpl(this);
108 lifecycleService.registerDeviceRemovedHandler(deviceManager);
109 lifecycleService.registerDeviceRemovedHandler(rpcManager);
110 lifecycleService.registerDeviceRemovedHandler(statisticsManager);
112 if (LOG.isDebugEnabled()) {
113 LOG.debug("Lifecycle services" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
116 final DeviceContext deviceContext = deviceManager.createContext(connectionContext);
118 if (LOG.isDebugEnabled()) {
119 LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
122 final RpcContext rpcContext = rpcManager.createContext(connectionContext.getDeviceInfo(), deviceContext);
124 if (LOG.isDebugEnabled()) {
125 LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
128 final StatisticsContext statisticsContext
129 = statisticsManager.createContext(deviceContext);
131 if (LOG.isDebugEnabled()) {
132 LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue);
135 deviceContext.setLifecycleInitializationPhaseHandler(statisticsContext);
136 statisticsContext.setLifecycleInitializationPhaseHandler(rpcContext);
137 statisticsContext.setInitialSubmitHandler(deviceContext);
139 contextChain.addLifecycleService(lifecycleService);
140 contextChain.addContext(deviceContext);
141 contextChain.addContext(rpcContext);
142 contextChain.addContext(statisticsContext);
143 this.withoutRoleChains.put(deviceInfo, contextChain);
144 if (!this.timerIsRunningRole) {
145 this.startTimerRole();
147 deviceContext.onPublished();
148 contextChain.registerServices(this.singletonServicesProvider);
154 public ListenableFuture<Void> destroyContextChain(final DeviceInfo deviceInfo) {
155 ContextChain chain = contextChainMap.remove(deviceInfo);
159 if (markToBeRemoved.contains(deviceInfo)) {
160 markToBeRemoved.remove(deviceInfo);
161 LOG.info("Removing device: {} from DS", deviceInfo.getLOGValue());
162 return deviceManager.removeDeviceFromOperationalDS(deviceInfo);
164 return Futures.immediateFuture(null);
169 public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception {
171 DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
172 LOG.info("Device {} connected.", deviceInfo.getLOGValue());
173 ContextChain contextChain = contextChainMap.get(deviceInfo);
174 if (contextChain != null) {
175 if (contextChain.addAuxiliaryConnection(connectionContext)) {
176 LOG.info("An auxiliary connection was added to device: {}", deviceInfo.getLOGValue());
177 return ConnectionStatus.MAY_CONTINUE;
179 LOG.warn("Device {} already connected. Closing all connection to the device.", deviceInfo.getLOGValue());
180 destroyContextChain(deviceInfo);
181 return ConnectionStatus.ALREADY_CONNECTED;
184 if (LOG.isDebugEnabled()) {
185 LOG.debug("No context chain found for device: {}, creating new.", deviceInfo.getLOGValue());
187 contextChainMap.put(deviceInfo, createContextChain(connectionContext));
190 return ConnectionStatus.MAY_CONTINUE;
194 public void addSingletonServicesProvider(final ClusterSingletonServiceProvider singletonServicesProvider) {
195 this.singletonServicesProvider = singletonServicesProvider;
199 public void onNotAbleToStartMastership(final DeviceInfo deviceInfo, @Nonnull final String reason) {
200 this.withoutRoleChains.remove(deviceInfo);
201 LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo.getLOGValue(), reason);
202 if (contextChainMap.containsKey(deviceInfo)) {
203 destroyContextChain(deviceInfo);
208 public void onMasterRoleAcquired(final DeviceInfo deviceInfo,
209 @Nonnull final ContextChainMastershipState mastershipState) {
210 this.withoutRoleChains.remove(deviceInfo);
211 ContextChain contextChain = contextChainMap.get(deviceInfo);
212 if (contextChain != null) {
213 if (contextChain.isMastered(mastershipState)) {
214 LOG.info("Role MASTER was granted to device {}", deviceInfo.getLOGValue());
215 this.sendNotificationNodeAdded(deviceInfo);
221 public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
222 this.withoutRoleChains.remove(deviceInfo);
223 ContextChain contextChain = contextChainMap.get(deviceInfo);
224 if (contextChain != null) {
225 contextChain.makeContextChainStateSlave();
230 public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) {
231 this.withoutRoleChains.remove(deviceInfo);
232 ContextChain contextChain = contextChainMap.get(deviceInfo);
233 if (contextChain != null) {
234 destroyContextChain(deviceInfo);
239 public void onDeviceDisconnected(final ConnectionContext connectionContext) {
241 final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
242 if (deviceInfo != null) {
243 ContextChain chain = contextChainMap.get(deviceInfo);
245 if (chain.auxiliaryConnectionDropped(connectionContext)) {
246 LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo.getLOGValue());
248 LOG.info("Device {} disconnected.", deviceInfo.getLOGValue());
249 Futures.transform(chain.connectionDropped(), new Function<Void, Object>() {
252 public Object apply(@Nullable Void aVoid) {
253 destroyContextChain(deviceInfo);
263 public void changeEntityOwnershipService(final EntityOwnershipService entityOwnershipService) {
264 if (Objects.nonNull(this.eosListenerRegistration)) {
265 LOG.warn("EOS Listener already registered.");
267 this.eosListenerRegistration = Verify.verifyNotNull(entityOwnershipService.registerListener
268 (ASYNC_SERVICE_ENTITY_TYPE, this));
272 private void startTimerRole() {
273 this.timerIsRunningRole = true;
274 if (LOG.isDebugEnabled()) {
275 LOG.debug("There is a context chain without role, starting timer.");
277 timer.newTimeout(new RoleTimerTask(), this.checkRoleMaster, TimeUnit.MILLISECONDS);
280 private void stopTimerRole() {
281 this.timerIsRunningRole = false;
282 if (LOG.isDebugEnabled()) {
283 LOG.debug("There are no context chains, stopping timer.");
287 private void timerTickRole() {
288 if (!withoutRoleChains.isEmpty()) {
289 this.withoutRoleChains.forEach((deviceInfo, contextChain) -> contextChain.makeDeviceSlave());
290 timer.newTimeout(new RoleTimerTask(), this.checkRoleMaster, TimeUnit.MILLISECONDS);
292 final Set<DeviceInfo> setOfClosedChains = new ConcurrentSet<>();
293 if (!this.contextChainMap.isEmpty()) {
294 this.contextChainMap.forEach((deviceInfo, contextChain) -> {
295 if (!contextChain.hasState()) {
296 LOG.warn("Context chain {} is long time without state. Closing.", deviceInfo);
297 setOfClosedChains.add(deviceInfo);
298 contextChain.close();
301 setOfClosedChains.forEach(this.contextChainMap::remove);
303 if (this.contextChainMap.isEmpty()) {
304 this.stopTimerRole();
306 timer.newTimeout(new RoleTimerTask(), this.checkRoleMaster, TimeUnit.MILLISECONDS);
312 boolean checkAllManagers() {
313 return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager);
317 public void close() throws Exception {
318 this.contextChainMap.forEach((deviceInfo, contextChain) -> {
319 if (contextChain.isMastered(ContextChainMastershipState.CHECK)) {
320 contextChain.stopChain(true);
322 contextChain.close();
324 if (Objects.nonNull(eosListenerRegistration)) {
325 eosListenerRegistration.close();
330 public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) {
331 if (!entityOwnershipChange.hasOwner()) {
332 final YangInstanceIdentifier yii = entityOwnershipChange.getEntity().getId();
333 final YangInstanceIdentifier.NodeIdentifierWithPredicates niiwp =
334 (YangInstanceIdentifier.NodeIdentifierWithPredicates) yii.getLastPathArgument();
335 String entityName = niiwp.getKeyValues().values().iterator().next().toString();
336 if (LOG.isDebugEnabled()) {
337 LOG.debug("Entity {} has no owner", entityName);
340 if (entityName != null ){
341 final NodeId nodeId = new NodeId(entityName);
342 DeviceInfo inMap = null;
343 for (Map.Entry<DeviceInfo, ContextChain> entry : contextChainMap.entrySet()) {
344 if (entry.getKey().getNodeId().equals(nodeId)) {
345 inMap = entry.getKey();
349 if (Objects.nonNull(inMap)) {
350 markToBeRemoved.add(inMap);
353 LOG.info("Removing device: {} from DS", nodeId);
355 .removeDeviceFromOperationalDS(DeviceStateUtil.createNodeInstanceIdentifier(nodeId))
356 .checkedGet(5L, TimeUnit.SECONDS);
357 } catch (TimeoutException | TransactionCommitFailedException e) {
358 LOG.info("Not able to remove device {} from DS. Probably removed by another cluster node.",
366 private void sendNotificationNodeAdded(final DeviceInfo deviceInfo) {
367 this.deviceManager.sendNodeAddedNotification(deviceInfo);
370 private class RoleTimerTask implements TimerTask {
373 public void run(Timeout timeout) throws Exception {