/*
 * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.openflowplugin.applications.reconciliation.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeException;
import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
import org.opendaylight.openflowplugin.api.openflow.mastership.ReconciliationFrameworkEvent;
import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

35 public class ReconciliationManagerImpl implements ReconciliationManager, ReconciliationFrameworkEvent {
36 private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManagerImpl.class);
38 private final MastershipChangeServiceManager mastershipChangeServiceManager;
39 private final Map<Integer, List<ReconciliationNotificationListener>> registeredServices =
40 new ConcurrentSkipListMap<>();
41 private final Map<DeviceInfo, ListenableFuture<ResultState>> futureMap = new ConcurrentHashMap<>();
42 private final Map<ResultState, Integer> resultStateMap = new ConcurrentHashMap<>();
43 private final AtomicReference<ResultState> decidedResultState = new AtomicReference<>(ResultState.DONOTHING);
45 public ReconciliationManagerImpl(MastershipChangeServiceManager mastershipChangeServiceManager) {
46 this.mastershipChangeServiceManager = Preconditions
47 .checkNotNull(mastershipChangeServiceManager, "MastershipChangeServiceManager can not be null!");
50 public void start() throws MastershipChangeException {
51 mastershipChangeServiceManager.reconciliationFrameworkRegistration(this);
52 LOG.info("ReconciliationManager has started successfully.");
56 public NotificationRegistration registerService(ReconciliationNotificationListener reconciliationTask) {
57 LOG.debug("Registered service {} with priority {} and intent {}", reconciliationTask.getName(),
58 reconciliationTask.getPriority(), reconciliationTask.getResultState());
59 registeredServices.computeIfAbsent(reconciliationTask.getPriority(), services -> new ArrayList<>())
60 .add(reconciliationTask);
61 ReconciliationServiceDelegate registration = new ReconciliationServiceDelegate(() -> {
62 LOG.debug("Service un-registered from Reconciliation framework {}", reconciliationTask.getName());
63 registeredServices.computeIfPresent(reconciliationTask.getPriority(), (priority, services) -> services)
64 .remove(reconciliationTask);
65 decideResultState(reconciliationTask.getResultState());
67 decideResultState(reconciliationTask.getResultState());
71 private void decideResultState(ResultState resultState) {
72 Integer count = resultStateMap.get(resultState);
73 resultStateMap.put(resultState, count = (count == null ? 1 : count + 1));
74 Map.Entry<ResultState, Integer> maxEntry = null;
75 for (Map.Entry<ResultState, Integer> entry : resultStateMap.entrySet()) {
76 if (maxEntry == null || entry.getValue() > maxEntry.getValue()) {
80 decidedResultState.set(maxEntry.getKey());
84 public Map<Integer, List<ReconciliationNotificationListener>> getRegisteredServices() {
85 ImmutableMap.Builder<Integer, List<ReconciliationNotificationListener>> builder = ImmutableMap.builder();
86 builder.putAll(registeredServices);
87 return builder.build();
91 public void close() throws Exception {
95 public ListenableFuture<ResultState> onDevicePrepared(@Nonnull DeviceInfo node) {
96 LOG.debug("Triggering reconciliation for node : {}", node.getNodeId());
97 return futureMap.computeIfAbsent(node, value -> reconcileNode(node));
101 public ListenableFuture<Void> onDeviceDisconnected(@Nonnull DeviceInfo node) {
102 LOG.info("Stopping reconciliation for node {}", node.getNodeId());
103 if (futureMap.containsKey(node)) {
104 return cancelNodeReconciliation(node);
106 return Futures.immediateFuture(null);
109 private ListenableFuture<ResultState> reconcileNode(DeviceInfo node) {
110 ListenableFuture<ResultState> lastFuture = Futures.immediateFuture(null);
111 for (List<ReconciliationNotificationListener> services : registeredServices.values()) {
112 lastFuture = reconcileServices(lastFuture, services, node);
117 private ListenableFuture<ResultState> reconcileServices(ListenableFuture<ResultState> prevFuture,
118 List<ReconciliationNotificationListener>
121 return Futures.transformAsync(prevFuture, prevResult -> Futures.transform(Futures.allAsList(
122 servicesForPriority.stream().map(service -> service.startReconciliation(node))
123 .collect(Collectors.toList())), results -> decidedResultState.get(),
124 MoreExecutors.directExecutor()),
125 MoreExecutors.directExecutor());
128 private ListenableFuture<Void> cancelNodeReconciliation(DeviceInfo node) {
129 ListenableFuture<Void> lastFuture = Futures.immediateFuture(null);
130 futureMap.get(node).cancel(true);
131 futureMap.remove(node);
132 for (List<ReconciliationNotificationListener> services : registeredServices.values()) {
133 lastFuture = cancelServiceReconciliation(lastFuture, services, node);
138 private ListenableFuture<Void> cancelServiceReconciliation(ListenableFuture<Void> prevFuture,
139 List<ReconciliationNotificationListener>
142 return Futures.transformAsync(prevFuture, prevResult -> Futures.transform(Futures.allAsList(
143 servicesForPriority.stream().map(service -> service.endReconciliation(node))
144 .collect(Collectors.toList())), results -> null, MoreExecutors.directExecutor()),
145 MoreExecutors.directExecutor());