2 * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.reconciliation.impl;
10 import com.google.common.collect.ImmutableMap;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.ArrayList;
15 import java.util.List;
17 import java.util.Map.Entry;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.ConcurrentSkipListMap;
21 import java.util.concurrent.atomic.AtomicReference;
22 import java.util.stream.Collectors;
23 import javax.annotation.PreDestroy;
24 import javax.inject.Inject;
25 import javax.inject.Singleton;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
28 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeException;
29 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
30 import org.opendaylight.openflowplugin.api.openflow.mastership.ReconciliationFrameworkEvent;
31 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
32 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
33 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
35 import org.opendaylight.yangtools.concepts.Registration;
36 import org.osgi.service.component.annotations.Activate;
37 import org.osgi.service.component.annotations.Component;
38 import org.osgi.service.component.annotations.Deactivate;
39 import org.osgi.service.component.annotations.Reference;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
44 @Component(service = ReconciliationManager.class, immediate = true)
45 public final class ReconciliationManagerImpl
46 implements ReconciliationManager, ReconciliationFrameworkEvent, AutoCloseable {
47 private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManagerImpl.class);
49 private final AtomicReference<ResultState> decidedResultState = new AtomicReference<>(ResultState.DONOTHING);
50 private final ConcurrentMap<Integer, List<ReconciliationNotificationListener>> registeredServices =
51 new ConcurrentSkipListMap<>();
52 private final ConcurrentMap<DeviceInfo, ListenableFuture<ResultState>> futureMap = new ConcurrentHashMap<>();
53 private final ConcurrentMap<ResultState, Integer> resultStateMap = new ConcurrentHashMap<>();
54 private final Registration reg;
58 public ReconciliationManagerImpl(@Reference final MastershipChangeServiceManager mastershipChangeServiceManager)
59 throws MastershipChangeException {
60 reg = mastershipChangeServiceManager.reconciliationFrameworkRegistration(this);
61 LOG.info("ReconciliationManager started");
69 LOG.info("ReconciliationManager stopped");
73 public NotificationRegistration registerService(final ReconciliationNotificationListener reconciliationTask) {
74 LOG.debug("Registered service {} with priority {} and intent {}", reconciliationTask.getName(),
75 reconciliationTask.getPriority(), reconciliationTask.getResultState());
76 registeredServices.computeIfAbsent(reconciliationTask.getPriority(), services -> new ArrayList<>())
77 .add(reconciliationTask);
78 final var registration = new ReconciliationServiceDelegate(() -> {
79 LOG.debug("Service un-registered from Reconciliation framework {}", reconciliationTask.getName());
80 registeredServices.computeIfPresent(reconciliationTask.getPriority(), (priority, services) -> services)
81 .remove(reconciliationTask);
82 decideResultState(reconciliationTask.getResultState());
84 decideResultState(reconciliationTask.getResultState());
88 private void decideResultState(final ResultState resultState) {
89 resultStateMap.compute(resultState, (unused, count) -> count == null ? 1 : count + 1);
91 // FIXME: can we do this more efficiently?
92 Entry<ResultState, Integer> maxEntry = null;
93 for (var entry : resultStateMap.entrySet()) {
94 if (maxEntry == null || entry.getValue() > maxEntry.getValue()) {
98 decidedResultState.set(maxEntry.getKey());
102 public Map<Integer, List<ReconciliationNotificationListener>> getRegisteredServices() {
103 return ImmutableMap.copyOf(registeredServices);
107 public ListenableFuture<ResultState> onDevicePrepared(@NonNull final DeviceInfo node) {
108 LOG.debug("Triggering reconciliation for node : {}", node.getNodeId());
109 return futureMap.computeIfAbsent(node, value -> reconcileNode(node));
113 public ListenableFuture<Void> onDeviceDisconnected(@NonNull final DeviceInfo node) {
114 LOG.info("Stopping reconciliation for node {}", node.getNodeId());
115 return futureMap.containsKey(node) ? cancelNodeReconciliation(node) : Futures.immediateVoidFuture();
118 private ListenableFuture<ResultState> reconcileNode(final DeviceInfo node) {
119 var lastFuture = Futures.<ResultState>immediateFuture(null);
120 for (var services : registeredServices.values()) {
121 lastFuture = reconcileServices(lastFuture, services, node);
126 private ListenableFuture<ResultState> reconcileServices(final ListenableFuture<ResultState> prevFuture,
127 final List<ReconciliationNotificationListener> servicesForPriority, final DeviceInfo node) {
128 return Futures.transformAsync(prevFuture,
129 prevResult -> Futures.transform(Futures.allAsList(servicesForPriority.stream()
130 .map(service -> service.startReconciliation(node)).collect(Collectors.toList())),
131 results -> decidedResultState.get(),
132 MoreExecutors.directExecutor()),
133 MoreExecutors.directExecutor());
136 private ListenableFuture<Void> cancelNodeReconciliation(final DeviceInfo node) {
137 var lastFuture = Futures.immediateVoidFuture();
138 futureMap.get(node).cancel(true);
139 futureMap.remove(node);
140 for (var services : registeredServices.values()) {
141 lastFuture = cancelServiceReconciliation(lastFuture, services, node);
146 private static ListenableFuture<Void> cancelServiceReconciliation(final ListenableFuture<Void> prevFuture,
147 final List<ReconciliationNotificationListener> servicesForPriority, final DeviceInfo node) {
148 return Futures.transformAsync(prevFuture,
149 prevResult -> Futures.transform(Futures.allAsList(servicesForPriority.stream()
150 .map(service -> service.endReconciliation(node))
151 .collect(Collectors.toList())),
153 MoreExecutors.directExecutor()),
154 MoreExecutors.directExecutor());