2 * Copyright (c) 2011,2012 Big Switch Networks, Inc.
4 * Licensed under the Eclipse Public License, Version 1.0 (the
5 * "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at
8 * http://www.eclipse.org/legal/epl-v10.html
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 * implied. See the License for the specific language governing
14 * permissions and limitations under the License.
16 * This file incorporates work covered by the following copyright and
19 * Originally created by David Erickson, Stanford University
21 * Licensed under the Apache License, Version 2.0 (the "License");
22 * you may not use this file except in compliance with the
23 * License. You may obtain a copy of the License at
25 * http://www.apache.org/licenses/LICENSE-2.0
27 * Unless required by applicable law or agreed to in writing,
28 * software distributed under the License is distributed on an "AS
29 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
30 * express or implied. See the License for the specific language
31 * governing permissions and limitations under the License.
34 package org.opendaylight.controller.hosttracker.internal;
36 import static org.opendaylight.controller.hosttracker.internal.DeviceManagerImpl.DeviceUpdate.Change.ADD;
37 import static org.opendaylight.controller.hosttracker.internal.DeviceManagerImpl.DeviceUpdate.Change.CHANGE;
38 import static org.opendaylight.controller.hosttracker.internal.DeviceManagerImpl.DeviceUpdate.Change.DELETE;
40 import java.net.InetAddress;
41 import java.util.ArrayList;
42 import java.util.Calendar;
43 import java.util.Collection;
44 import java.util.Collections;
45 import java.util.Comparator;
46 import java.util.Date;
47 import java.util.EnumSet;
48 import java.util.HashMap;
49 import java.util.HashSet;
50 import java.util.Iterator;
51 import java.util.LinkedList;
52 import java.util.List;
54 import java.util.Queue;
56 import java.util.concurrent.ConcurrentHashMap;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.Future;
59 import java.util.concurrent.ScheduledExecutorService;
60 import java.util.concurrent.TimeUnit;
62 import org.opendaylight.controller.hosttracker.Entity;
63 import org.opendaylight.controller.hosttracker.IDevice;
64 import org.opendaylight.controller.hosttracker.IDeviceListener;
65 import org.opendaylight.controller.hosttracker.IDeviceService;
66 import org.opendaylight.controller.hosttracker.IEntityClass;
67 import org.opendaylight.controller.hosttracker.IEntityClassListener;
68 import org.opendaylight.controller.hosttracker.IEntityClassifierService;
69 import org.opendaylight.controller.hosttracker.IfIptoHost;
70 import org.opendaylight.controller.hosttracker.IfNewHostNotify;
71 import org.opendaylight.controller.hosttracker.SwitchPort;
72 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
73 import org.opendaylight.controller.sal.core.Edge;
74 import org.opendaylight.controller.sal.core.NodeConnector;
75 import org.opendaylight.controller.sal.core.NodeConnector.NodeConnectorIDType;
76 import org.opendaylight.controller.sal.packet.ARP;
77 import org.opendaylight.controller.sal.packet.Ethernet;
78 import org.opendaylight.controller.sal.packet.IDataPacketService;
79 import org.opendaylight.controller.sal.packet.IListenDataPacket;
80 import org.opendaylight.controller.sal.packet.Packet;
81 import org.opendaylight.controller.sal.packet.PacketResult;
82 import org.opendaylight.controller.sal.packet.RawPacket;
83 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
84 import org.opendaylight.controller.sal.utils.ListenerDispatcher;
85 import org.opendaylight.controller.sal.utils.MultiIterator;
86 import org.opendaylight.controller.sal.utils.SingletonTask;
87 import org.opendaylight.controller.sal.utils.Status;
88 import org.opendaylight.controller.switchmanager.ISwitchManager;
89 import org.opendaylight.controller.topologymanager.ITopologyManager;
90 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
91 import org.slf4j.Logger;
92 import org.slf4j.LoggerFactory;
95 * DeviceManager creates Devices based upon MAC addresses seen in the network.
96 * It tracks any network addresses mapped to the Device, and its location within
101 public class DeviceManagerImpl implements IDeviceService, IEntityClassListener,
102 IListenDataPacket, ITopologyManagerAware, IfIptoHost {
103 protected static Logger logger = LoggerFactory
104 .getLogger(DeviceManagerImpl.class);
106 public static final String MODULE_NAME = "devicemanager";
108 // protected ITopologyService topology;
109 // protected IStorageSourceService storageSource;
110 // protected IRestApiService restApi;
111 // protected IThreadPoolService threadPool;
112 // protected IFlowReconcileService flowReconcileMgr;
113 // protected IFlowReconcileEngineService flowReconcileEngine;
114 // protected IDebugCounterService debugCounters;
115 // private ISyncService syncService;
116 // private IStoreClient<String,DeviceSyncRepresentation> storeClient;
117 // private DeviceSyncManager deviceSyncManager;
119 private ITopologyManager topology;
120 private ISwitchManager switchManager = null;
121 private IDataPacketService dataPacketService = null;
123 public static final String CNT_INCOMING = MODULE_NAME + "-incoming";
124 public static final String CNT_RECONCILE_REQUEST = MODULE_NAME
125 + "-reconcileRequest";
126 public static final String CNT_RECONCILE_NO_SOURCE = MODULE_NAME
127 + "-reconcileNoSourceDevice";
128 public static final String CNT_RECONCILE_NO_DEST = MODULE_NAME
129 + "-reconcileNoDestDevice";
130 public static final String CNT_BROADCAST_SOURCE = MODULE_NAME
131 + "-broadcastSource";
132 public static final String CNT_NO_SOURCE = MODULE_NAME + "-noSourceDevice";
133 public static final String CNT_NO_DEST = MODULE_NAME + "-noDestDevice";
134 public static final String CNT_DHCP_CLIENT_NAME_SNOOPED = MODULE_NAME
135 + "-dhcpClientNameSnooped";
136 public static final String CNT_DEVICE_ON_INTERAL_PORT_NOT_LEARNED = MODULE_NAME
137 + "-deviceOnInternalPortNotLearned";
138 public static final String CNT_PACKET_NOT_ALLOWED = MODULE_NAME
139 + "-packetNotAllowed";
140 public static final String CNT_NEW_DEVICE = MODULE_NAME + "-newDevice";
141 public static final String CNT_PACKET_ON_INTERNAL_PORT_FOR_KNOWN_DEVICE = MODULE_NAME
142 + "-packetOnInternalPortForKnownDevice";
143 public static final String CNT_NEW_ENTITY = MODULE_NAME + "-newEntity";
144 public static final String CNT_DEVICE_CHANGED = MODULE_NAME
146 public static final String CNT_DEVICE_MOVED = MODULE_NAME + "-deviceMoved";
147 public static final String CNT_CLEANUP_ENTITIES_RUNS = MODULE_NAME
148 + "-cleanupEntitiesRuns";
149 public static final String CNT_ENTITY_REMOVED_TIMEOUT = MODULE_NAME
150 + "-entityRemovedTimeout";
151 public static final String CNT_DEVICE_DELETED = MODULE_NAME
153 public static final String CNT_DEVICE_RECLASSIFY_DELETE = MODULE_NAME
154 + "-deviceReclassifyDelete";
155 public static final String CNT_DEVICE_STORED = MODULE_NAME
157 public static final String CNT_DEVICE_STORE_THROTTLED = MODULE_NAME
158 + "-deviceStoreThrottled";
159 public static final String CNT_DEVICE_REMOVED_FROM_STORE = MODULE_NAME
160 + "-deviceRemovedFromStore";
161 public static final String CNT_SYNC_EXCEPTION = MODULE_NAME
163 public static final String CNT_DEVICES_FROM_STORE = MODULE_NAME
164 + "-devicesFromStore";
165 public static final String CNT_CONSOLIDATE_STORE_RUNS = MODULE_NAME
166 + "-consolidateStoreRuns";
167 public static final String CNT_CONSOLIDATE_STORE_DEVICES_REMOVED = MODULE_NAME
168 + "-consolidateStoreDevicesRemoved";
170 static final String DEVICE_SYNC_STORE_NAME = DeviceManagerImpl.class
171 .getCanonicalName() + ".stateStore";
174 * Time interval between writes of entries for the same device to the sync
177 // static final int DEFAULT_SYNC_STORE_WRITE_INTERVAL_MS =
178 // 5*60*1000; // 5 min
179 // private int syncStoreWriteIntervalMs =
180 // DEFAULT_SYNC_STORE_WRITE_INTERVAL_MS;
183 * Time after SLAVE->MASTER until we run the consolidate store code.
185 // static final int DEFAULT_INITIAL_SYNC_STORE_CONSOLIDATE_MS =
186 // 15*1000; // 15 sec
187 // private int initialSyncStoreConsolidateMs =
188 // DEFAULT_INITIAL_SYNC_STORE_CONSOLIDATE_MS;
191 * Time interval between consolidate store runs.
193 // static final int DEFAULT_SYNC_STORE_CONSOLIDATE_INTERVAL_MS =
194 // 75*60*1000; // 75 min
195 // private final int syncStoreConsolidateIntervalMs =
196 // DEFAULT_SYNC_STORE_CONSOLIDATE_INTERVAL_MS;
199 * Time in milliseconds before entities will expire
201 protected static final int ENTITY_TIMEOUT = 60 * 60 * 1000;
204 * Time in seconds between cleaning up old entities/devices
206 protected static final int ENTITY_CLEANUP_INTERVAL = 60 * 60;
209 * This is the master device map that maps device IDs to {@link Device}
212 protected ConcurrentHashMap<Long, Device> deviceMap;
215 * Counter used to generate device keys
217 protected long deviceKeyCounter = 0;
220 * Lock for incrementing the device key counter
222 protected Object deviceKeyLock = new Object();
225 * This is the primary entity index that contains all entities
227 protected DeviceUniqueIndex primaryIndex;
230 * This stores secondary indices over the fields in the devices
232 protected Map<EnumSet<DeviceField>, DeviceIndex> secondaryIndexMap;
235 * This map contains state for each {@link IEntityClass} that exists
237 protected ConcurrentHashMap<String, ClassState> classStateMap;
240 * This is the list of indices we want on a per-class basis
242 protected Set<EnumSet<DeviceField>> perClassIndices;
245 * The entity classifier currently in use
247 protected IEntityClassifierService entityClassifier;
250 * Used to cache state about specific entity classes
252 protected class ClassState {
257 protected DeviceUniqueIndex classIndex;
260 * This stores secondary indices over the fields in the device for the
263 protected Map<EnumSet<DeviceField>, DeviceIndex> secondaryIndexMap;
266 * Allocate a new {@link ClassState} object for the class
269 * the class to use for the state
271 public ClassState(IEntityClass clazz) {
272 EnumSet<DeviceField> keyFields = clazz.getKeyFields();
273 EnumSet<DeviceField> primaryKeyFields = entityClassifier
275 boolean keyFieldsMatchPrimary = primaryKeyFields.equals(keyFields);
277 if (!keyFieldsMatchPrimary)
278 classIndex = new DeviceUniqueIndex(keyFields);
280 secondaryIndexMap = new HashMap<EnumSet<DeviceField>, DeviceIndex>();
281 for (EnumSet<DeviceField> fields : perClassIndices) {
282 secondaryIndexMap.put(fields, new DeviceMultiIndex(fields));
288 * Device manager event listeners reclassifyDeviceListeners are notified
289 * first before reconcileDeviceListeners. This is to make sure devices are
290 * correctly reclassified before reconciliation.
292 protected ListenerDispatcher<String, IDeviceListener> deviceListeners;
295 * Using the IfNewHostNotify to notify listeners of host changes.
297 private Set<IfNewHostNotify> newHostNotify = Collections.synchronizedSet(new HashSet<IfNewHostNotify>());
299 * A device update event to be dispatched
301 protected static class DeviceUpdate {
307 * The affected device
309 protected Device device;
312 * The change that was made
314 protected Change change;
317 * If not added, then this is the list of fields changed
319 protected EnumSet<DeviceField> fieldsChanged;
321 public DeviceUpdate(Device device, Change change,
322 EnumSet<DeviceField> fieldsChanged) {
324 this.device = device;
325 this.change = change;
326 this.fieldsChanged = fieldsChanged;
330 public String toString() {
331 String devIdStr = device.getEntityClass().getName() + "::"
332 + device.getMACAddressString();
333 return "DeviceUpdate [device=" + devIdStr + ", change=" + change
334 + ", fieldsChanged=" + fieldsChanged + "]";
340 * AttachmentPointComparator
342 * Compares two attachment points and returns the latest one. It is assumed
343 * that the two attachment points are in the same L2 domain.
347 protected class AttachmentPointComparator implements
348 Comparator<AttachmentPoint> {
349 public AttachmentPointComparator() {
354 public int compare(AttachmentPoint oldAP, AttachmentPoint newAP) {
355 // First compare based on L2 domain ID;
357 // XXX - missing functionality -- need topology
358 // long oldDomain = topology.getL2DomainId(oldSw);
359 // boolean oldBD = topology.isBroadcastDomainPort(oldSw, oldPort);
361 boolean oldBD = false;
363 // XXX - missing functionality -- need topology
364 // long newDomain = topology.getL2DomainId(newSw);
365 // boolean newBD = topology.isBroadcastDomainPort(newSw, newPort);
367 boolean newBD = false;
369 if (oldDomain < newDomain)
371 else if (oldDomain > newDomain)
374 // Give preference to OFPP_LOCAL always
375 if (!oldAP.getPort().getType().equals(NodeConnectorIDType.SWSTACK)
376 && newAP.getPort().getType()
377 .equals(NodeConnectorIDType.SWSTACK)) {
379 } else if (oldAP.getPort().getType()
380 .equals(NodeConnectorIDType.SWSTACK)
381 && !newAP.getPort().getType()
382 .equals(NodeConnectorIDType.SWSTACK)) {
386 // We expect that the last seen of the new AP is higher than
387 // old AP, if it is not, just reverse and send the negative
389 if (oldAP.getActiveSince() > newAP.getActiveSince())
390 return -compare(newAP, oldAP);
392 long activeOffset = 0;
393 // XXX - missing functionality -- need topology
394 // if (!topology.isConsistent(oldSw, oldPort, newSw, newPort)) {
395 if (!newBD && oldBD) {
398 if (newBD && oldBD) {
399 activeOffset = AttachmentPoint.EXTERNAL_TO_EXTERNAL_TIMEOUT;
400 } else if (newBD && !oldBD) {
401 activeOffset = AttachmentPoint.OPENFLOW_TO_EXTERNAL_TIMEOUT;
405 // // The attachment point is consistent.
406 // activeOffset = AttachmentPoint.CONSISTENT_TIMEOUT;
409 if ((newAP.getActiveSince() > oldAP.getLastSeen() + activeOffset)
410 || (newAP.getLastSeen() > oldAP.getLastSeen()
411 + AttachmentPoint.INACTIVITY_INTERVAL)) {
419 * Comparator for sorting by cluster ID
421 public AttachmentPointComparator apComparator;
424 * Switch ports where attachment points shouldn't be learned
426 private Set<SwitchPort> suppressAPs;
429 * Periodic task to clean up expired entities
431 public SingletonTask entityCleanupTask;
433 // ********************
434 // Dependency injection
435 // ********************
437 void setNewHostNotify(IfNewHostNotify obj){
438 this.newHostNotify.add(obj);
441 void unsetNewHostNotify(IfNewHostNotify obj){
442 this.newHostNotify.remove(obj);
445 void setDataPacketService(IDataPacketService s) {
446 this.dataPacketService = s;
449 void unsetDataPacketService(IDataPacketService s) {
450 if (this.dataPacketService == s) {
451 this.dataPacketService = null;
455 public void setTopologyManager(ITopologyManager s) {
459 public void unsetTopologyManager(ITopologyManager s) {
460 if (this.topology == s) {
461 logger.debug("Topology Manager Service removed!");
462 this.topology = null;
466 private volatile boolean stopped = true;
467 private ScheduledExecutorService ses;
475 public void start() {
476 this.perClassIndices = new HashSet<EnumSet<DeviceField>>();
478 // XXX - TODO need to make it possible to register a non-default
480 entityClassifier = new DefaultEntityClassifier();
481 this.deviceListeners = new ListenerDispatcher<String, IDeviceListener>();
482 this.suppressAPs = Collections
483 .newSetFromMap(new ConcurrentHashMap<SwitchPort, Boolean>());
484 primaryIndex = new DeviceUniqueIndex(entityClassifier.getKeyFields());
485 secondaryIndexMap = new HashMap<EnumSet<DeviceField>, DeviceIndex>();
487 deviceMap = new ConcurrentHashMap<Long, Device>();
488 classStateMap = new ConcurrentHashMap<String, ClassState>();
489 apComparator = new AttachmentPointComparator();
491 addIndex(true, EnumSet.of(DeviceField.IPV4));
493 // floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
494 // floodlightProvider.addHAListener(this.haListenerDelegate);
495 // if (topology != null)
496 // topology.addListener(this);
497 // flowReconcileMgr.addFlowReconcileListener(this);
498 // entityClassifier.addListener(this);
501 // XXX - Should use a common threadpool but this doesn't currently exist
502 ses = Executors.newScheduledThreadPool(1);
503 Runnable ecr = new Runnable() {
508 entityCleanupTask.reschedule(ENTITY_CLEANUP_INTERVAL,
512 entityCleanupTask = new SingletonTask(ses, ecr);
513 entityCleanupTask.reschedule(ENTITY_CLEANUP_INTERVAL, TimeUnit.SECONDS);
516 * XXX Missing functionality if (restApi != null) {
517 * restApi.addRestletRoutable(new DeviceRoutable()); } else {
518 * logger.debug("Could not instantiate REST API"); }
521 registerDeviceManagerDebugCounters();
524 * XXX Missing functionality try {
525 * this.syncService.registerStore(DEVICE_SYNC_STORE_NAME, Scope.LOCAL);
526 * this.storeClient = this.syncService
527 * .getStoreClient(DEVICE_SYNC_STORE_NAME, String.class,
528 * DeviceSyncRepresentation.class); } catch (SyncException e) { throw
529 * new FloodlightModuleException("Error while setting up sync service",
532 * Runnable consolidateStoreRunner = new Runnable() {
534 * @Override public void run() { deviceSyncManager.consolidateStore();
535 * storeConsolidateTask.reschedule(syncStoreConsolidateIntervalMs,
536 * TimeUnit.MILLISECONDS); debugCounters.flushCounters(); } };
537 * storeConsolidateTask = new SingletonTask(ses,
538 * consolidateStoreRunner); if (isMaster)
539 * storeConsolidateTask.reschedule(syncStoreConsolidateIntervalMs,
540 * TimeUnit.MILLISECONDS);
545 * Periodic task to consolidate entries in the store. I.e., delete entries
546 * in the store that are not known to DeviceManager
548 // XXX - Missing functionality
549 // private SingletonTask storeConsolidateTask;
551 // *********************
552 // IDeviceManagerService
553 // *********************
555 void setSwitchManager(ISwitchManager s) {
556 logger.debug("SwitchManager set");
557 this.switchManager = s;
560 void unsetSwitchManager(ISwitchManager s) {
561 if (this.switchManager == s) {
562 logger.debug("SwitchManager removed!");
563 this.switchManager = null;
568 public IDevice getDevice(Long deviceKey) {
569 return deviceMap.get(deviceKey);
573 public IDevice findDevice(long macAddress, Short vlan, Integer ipv4Address,
574 NodeConnector port) throws IllegalArgumentException {
575 if (vlan != null && vlan.shortValue() <= 0)
577 if (ipv4Address != null && ipv4Address == 0)
579 Entity e = new Entity(macAddress, vlan, ipv4Address, port, null);
580 if (!allKeyFieldsPresent(e, entityClassifier.getKeyFields())) {
581 throw new IllegalArgumentException("Not all key fields specified."
582 + " Required fields: " + entityClassifier.getKeyFields());
584 return findDeviceByEntity(e);
588 public IDevice findClassDevice(IEntityClass entityClass, long macAddress,
589 Short vlan, Integer ipv4Address) throws IllegalArgumentException {
590 if (vlan != null && vlan.shortValue() <= 0)
592 if (ipv4Address != null && ipv4Address == 0)
594 Entity e = new Entity(macAddress, vlan, ipv4Address, null, null);
595 if (entityClass == null
596 || !allKeyFieldsPresent(e, entityClass.getKeyFields())) {
597 throw new IllegalArgumentException("Not all key fields and/or "
598 + " no source device specified. Required fields: "
599 + entityClassifier.getKeyFields());
601 return findDestByEntity(entityClass, e);
605 public Collection<? extends IDevice> getAllDevices() {
606 return Collections.unmodifiableCollection(deviceMap.values());
610 public void addIndex(boolean perClass, EnumSet<DeviceField> keyFields) {
612 perClassIndices.add(keyFields);
614 secondaryIndexMap.put(keyFields, new DeviceMultiIndex(keyFields));
619 public Iterator<? extends IDevice> queryDevices(Long macAddress,
620 Short vlan, Integer ipv4Address, NodeConnector port) {
621 DeviceIndex index = null;
622 if (secondaryIndexMap.size() > 0) {
623 EnumSet<DeviceField> keys = getEntityKeys(macAddress, vlan,
625 index = secondaryIndexMap.get(keys);
628 Iterator<Device> deviceIterator = null;
630 // Do a full table scan
631 deviceIterator = deviceMap.values().iterator();
634 Entity entity = new Entity((macAddress == null ? 0 : macAddress),
635 vlan, ipv4Address, port, null);
636 deviceIterator = new DeviceIndexInterator(this,
637 index.queryByEntity(entity));
640 DeviceIterator di = new DeviceIterator(deviceIterator, null,
641 macAddress, vlan, ipv4Address, port);
646 public Iterator<? extends IDevice> queryClassDevices(
647 IEntityClass entityClass, Long macAddress, Short vlan,
648 Integer ipv4Address, NodeConnector port) {
649 ArrayList<Iterator<Device>> iterators = new ArrayList<Iterator<Device>>();
650 ClassState classState = getClassState(entityClass);
652 DeviceIndex index = null;
653 if (classState.secondaryIndexMap.size() > 0) {
654 EnumSet<DeviceField> keys = getEntityKeys(macAddress, vlan,
656 index = classState.secondaryIndexMap.get(keys);
659 Iterator<Device> iter;
661 index = classState.classIndex;
664 return new DeviceIterator(deviceMap.values().iterator(),
665 new IEntityClass[] { entityClass }, macAddress, vlan,
668 // scan the entire class
669 iter = new DeviceIndexInterator(this, index.getAll());
673 Entity entity = new Entity((macAddress == null ? 0 : macAddress),
674 vlan, ipv4Address, port, null);
675 iter = new DeviceIndexInterator(this, index.queryByEntity(entity));
679 return new MultiIterator<Device>(iterators.iterator());
682 protected Iterator<Device> getDeviceIteratorForQuery(Long macAddress,
683 Short vlan, Integer ipv4Address, NodeConnector port) {
684 DeviceIndex index = null;
685 if (secondaryIndexMap.size() > 0) {
686 EnumSet<DeviceField> keys = getEntityKeys(macAddress, vlan,
688 index = secondaryIndexMap.get(keys);
691 Iterator<Device> deviceIterator = null;
693 // Do a full table scan
694 deviceIterator = deviceMap.values().iterator();
697 Entity entity = new Entity((macAddress == null ? 0 : macAddress),
698 vlan, ipv4Address, port, null);
699 deviceIterator = new DeviceIndexInterator(this,
700 index.queryByEntity(entity));
703 DeviceIterator di = new DeviceIterator(deviceIterator, null,
704 macAddress, vlan, ipv4Address, port);
709 public void addListener(IDeviceListener listener) {
710 deviceListeners.addListener("device", listener);
715 public void addSuppressAPs(NodeConnector port) {
716 suppressAPs.add(new SwitchPort(port));
720 public void removeSuppressAPs(NodeConnector port) {
721 suppressAPs.remove(new SwitchPort(port));
725 public Set<SwitchPort> getSuppressAPs() {
726 return Collections.unmodifiableSet(suppressAPs);
729 private void logListeners() {
730 List<IDeviceListener> listeners = deviceListeners.getOrderedListeners();
731 if (listeners != null) {
732 StringBuffer sb = new StringBuffer();
733 sb.append("DeviceListeners: ");
734 for (IDeviceListener l : listeners) {
735 sb.append(l.getName());
738 logger.debug(sb.toString());
743 // IFlowReconcileListener
746 * XXX - Missing functionality
748 * @Override public Command reconcileFlows(ArrayList<OFMatchReconcile>
749 * ofmRcList) { ListIterator<OFMatchReconcile> iter =
750 * ofmRcList.listIterator(); while (iter.hasNext()) { OFMatchReconcile ofm =
753 * // Remove the STOPPed flow. if (Command.STOP == reconcileFlow(ofm)) {
756 * if (ofmRcList.size() > 0) { return Command.CONTINUE; } else { return
759 * protected Command reconcileFlow(OFMatchReconcile ofm) {
760 * debugCounters.updateCounter(CNT_RECONCILE_REQUEST); // Extract source
761 * entity information Entity srcEntity =
762 * getEntityFromFlowMod(ofm.ofmWithSwDpid, true); if (srcEntity == null) {
763 * debugCounters.updateCounter(CNT_RECONCILE_NO_SOURCE); return
766 * // Find the device by source entity Device srcDevice =
767 * findDeviceByEntity(srcEntity); if (srcDevice == null) {
768 * debugCounters.updateCounter(CNT_RECONCILE_NO_SOURCE); return
769 * Command.STOP; } // Store the source device in the context
770 * fcStore.put(ofm.cntx, CONTEXT_SRC_DEVICE, srcDevice);
772 * // Find the device matching the destination from the entity // classes of
773 * the source. Entity dstEntity = getEntityFromFlowMod(ofm.ofmWithSwDpid,
774 * false); Device dstDevice = null; if (dstEntity != null) { dstDevice =
775 * findDestByEntity(srcDevice.getEntityClass(), dstEntity); if (dstDevice !=
776 * null) fcStore.put(ofm.cntx, CONTEXT_DST_DEVICE, dstDevice); else
777 * debugCounters.updateCounter(CNT_RECONCILE_NO_DEST); } else {
778 * debugCounters.updateCounter(CNT_RECONCILE_NO_DEST); } if
779 * (logger.isTraceEnabled()) {
780 * logger.trace("Reconciling flow: match={}, srcEntity={}, srcDev={}, " +
781 * "dstEntity={}, dstDev={}", new Object[] {ofm.ofmWithSwDpid.getOfMatch(),
782 * srcEntity, srcDevice, dstEntity, dstDevice } ); } return
783 * Command.CONTINUE; }
791 public PacketResult receiveDataPacket(RawPacket inPkt) {
792 // XXX - Can this really pass in null? Why would you ever want that?
794 return PacketResult.IGNORED;
797 // throw new Exception("Sample");
798 // } catch (Exception e) {
799 // logger.error("Sample stack trace", e);
802 Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
804 if (formattedPak instanceof Ethernet) {
805 eth = (Ethernet) formattedPak;
807 return PacketResult.IGNORED;
810 // Extract source entity information
811 NodeConnector inPort = inPkt.getIncomingNodeConnector();
812 Entity srcEntity = getSourceEntityFromPacket(eth, inPort);
813 if (srcEntity == null) {
814 // debugCounters.updateCounter(CNT_BROADCAST_SOURCE);
815 return PacketResult.CONSUME;
818 // Learn from ARP packet for special VRRP settings.
819 // In VRRP settings, the source MAC address and sender MAC
820 // addresses can be different. In such cases, we need to learn
821 // the IP to MAC mapping of the VRRP IP address. The source
822 // entity will not have that information. Hence, a separate call
823 // to learn devices in such cases.
824 learnDeviceFromArpResponseData(eth, inPort);
826 // Learn/lookup device information
827 Device srcDevice = learnDeviceByEntity(srcEntity);
828 if (srcDevice == null) {
829 // debugCounters.updateCounter(CNT_NO_SOURCE);
830 return PacketResult.CONSUME;
832 logger.trace("Saw packet from device {}", srcDevice);
834 // // Store the source device in the context
835 // fcStore.put(cntx, CONTEXT_SRC_DEVICE, srcDevice);
837 // // Find the device matching the destination from the entity
838 // // classes of the source.
839 // Entity dstEntity = getDestEntityFromPacket(eth);
840 // Device dstDevice = null;
841 // if (dstEntity != null) {
843 // findDestByEntity(srcDevice.getEntityClass(), dstEntity);
844 // if (dstDevice != null)
845 // fcStore.put(cntx, CONTEXT_DST_DEVICE, dstDevice);
847 // //debugCounters.updateCounter(CNT_NO_DEST);
849 // //debugCounters.updateCounter(CNT_NO_DEST);
852 // if (logger.isTraceEnabled()) {
853 // logger.trace("Received PI: {} on switch {}, port {} *** eth={}" +
854 // " *** srcDev={} *** dstDev={} *** ",
855 // new Object[] { pi, sw.getStringId(), pi.getInPort(), eth,
856 // srcDevice, dstDevice });
859 // snoopDHCPClientName(eth, srcDevice);
861 return PacketResult.KEEP_PROCESSING;
869 * Snoop and record client-provided host name from DHCP requests
874 // private void snoopDHCPClientName(Ethernet eth, Device srcDevice) {
875 // if (! (eth.getPayload() instanceof IPv4) )
877 // IPv4 ipv4 = (IPv4) eth.getPayload();
878 // if (! (ipv4.getPayload() instanceof UDP) )
880 // UDP udp = (UDP) ipv4.getPayload();
881 // if (!(udp.getPayload() instanceof DHCP))
883 // DHCP dhcp = (DHCP) udp.getPayload();
884 // byte opcode = dhcp.getOpCode();
885 // if (opcode == DHCP.OPCODE_REQUEST) {
886 // DHCPOption dhcpOption = dhcp.getOption(
887 // DHCPOptionCode.OptionCode_Hostname);
888 // if (dhcpOption != null) {
889 // debugCounters.updateCounter(CNT_DHCP_CLIENT_NAME_SNOOPED);
890 // srcDevice.dhcpClientName = new String(dhcpOption.getData());
896 * Check whether the given attachment point is valid given the current
903 * @return true if it's a valid attachment point
905 public boolean isValidAttachmentPoint(NodeConnector port) {
906 // XXX - missing functionality -- need topology module
907 // if (topology.isAttachmentPointPort(port) == false)
909 if (topology.isInternal(port))
911 if (!switchManager.isNodeConnectorEnabled(port))
913 if (suppressAPs.contains(new SwitchPort(port)))
920 * Get the sender IP address from the packet if the packet is an ARP packet.
926 private int getSrcNwAddr(Ethernet eth, long dlAddr) {
927 if (eth.getPayload() instanceof ARP) {
928 ARP arp = (ARP) eth.getPayload();
929 if ((arp.getProtocolType() == ARP.PROTO_TYPE_IP)
930 && (toLong(arp.getSenderHardwareAddress()) == dlAddr)) {
931 return toIPv4Address(arp.getSenderProtocolAddress());
938 * Parse an entity from an {@link Ethernet} packet.
941 * the packet to parse
943 * the switch on which the packet arrived
945 * the original packetin
946 * @return the entity from the packet
948 protected Entity getSourceEntityFromPacket(Ethernet eth, NodeConnector port) {
949 byte[] dlAddrArr = eth.getSourceMACAddress();
950 long dlAddr = toLong(dlAddrArr);
952 // Ignore broadcast/multicast source
953 if ((dlAddrArr[0] & 0x1) != 0)
956 // XXX missing functionality
958 int nwSrc = getSrcNwAddr(eth, dlAddr);
959 return new Entity(dlAddr, null, ((nwSrc != 0) ? nwSrc : null), port,
964 * Learn device from ARP data in scenarios where the Ethernet source MAC is
965 * different from the sender hardware address in ARP data.
967 protected void learnDeviceFromArpResponseData(Ethernet eth,
968 NodeConnector port) {
970 if (!(eth.getPayload() instanceof ARP))
972 ARP arp = (ARP) eth.getPayload();
974 byte[] dlAddrArr = eth.getSourceMACAddress();
975 long dlAddr = toLong(dlAddrArr);
977 byte[] senderHardwareAddr = arp.getSenderHardwareAddress();
978 long senderAddr = toLong(senderHardwareAddr);
980 if (dlAddr == senderAddr)
983 // Ignore broadcast/multicast source
984 if ((senderHardwareAddr[0] & 0x1) != 0)
987 // short vlan = eth.getVlanID();
988 int nwSrc = toIPv4Address(arp.getSenderProtocolAddress());
990 Entity e = new Entity(senderAddr, null, ((nwSrc != 0) ? nwSrc : null),
993 learnDeviceByEntity(e);
997 * Get a (partial) entity for the destination from the packet.
1002 // protected Entity getDestEntityFromPacket(Ethernet eth) {
1003 // byte[] dlAddrArr = eth.getDestinationMACAddress();
1004 // long dlAddr = Ethernet.toLong(dlAddrArr);
1005 // short vlan = eth.getVlanID();
1008 // // Ignore broadcast/multicast destination
1009 // if ((dlAddrArr[0] & 0x1) != 0)
1012 // if (eth.getPayload() instanceof IPv4) {
1013 // IPv4 ipv4 = (IPv4) eth.getPayload();
1014 // nwDst = ipv4.getDestinationAddress();
1017 // return new Entity(dlAddr,
1018 // ((vlan >= 0) ? vlan : null),
1019 // ((nwDst != 0) ? nwDst : null),
1026 * Parse an entity from an OFMatchWithSwDpid.
1028 * @param ofmWithSwDpid
1029 * @return the entity from the packet
1031 // private Entity getEntityFromFlowMod(OFMatchWithSwDpid ofmWithSwDpid,
1032 // boolean isSource) {
1033 // byte[] dlAddrArr = ofmWithSwDpid.getOfMatch().getDataLayerSource();
1034 // int nwSrc = ofmWithSwDpid.getOfMatch().getNetworkSource();
1036 // dlAddrArr = ofmWithSwDpid.getOfMatch().getDataLayerDestination();
1037 // nwSrc = ofmWithSwDpid.getOfMatch().getNetworkDestination();
1040 // long dlAddr = Ethernet.toLong(dlAddrArr);
1042 // // Ignore broadcast/multicast source
1043 // if ((dlAddrArr[0] & 0x1) != 0)
1046 // Long swDpid = null;
1047 // Short inPort = null;
1050 // swDpid = ofmWithSwDpid.getSwitchDataPathId();
1051 // inPort = ofmWithSwDpid.getOfMatch().getInputPort();
1054 // /**for the new flow cache design, the flow mods retrived are not always
1055 // from the source, learn AP should be disabled --meiyang*/
1056 // boolean learnap = false;
1058 // * if (swDpid == null ||
1059 // inPort == null ||
1060 // !isValidAttachmentPoint(swDpid, inPort)) {
1061 // // If this is an internal port or we otherwise don't want
1062 // // to learn on these ports. In the future, we should
1063 // // handle this case by labeling flows with something that
1064 // // will give us the entity class. For now, we'll do our
1065 // // best assuming attachment point information isn't used
1066 // // as a key field.
1071 // short vlan = ofmWithSwDpid.getOfMatch().getDataLayerVirtualLan();
1072 // return new Entity(dlAddr,
1073 // ((vlan >= 0) ? vlan : null),
1074 // ((nwSrc != 0) ? nwSrc : null),
1075 // (learnap ? swDpid : null),
1076 // (learnap ? (int)inPort : null),
1081 * Look up a {@link Device} based on the provided {@link Entity}. We first
1082 * check the primary index. If we do not find an entry there we classify the
1083 * device into its IEntityClass and query the classIndex. This implies that
1084 * all key fields of the current IEntityClassifier must be present in the
1085 * entity for the lookup to succeed!
1088 * the entity to search for
1089 * @return The {@link Device} object if found
1091 protected Device findDeviceByEntity(Entity entity) {
1092 // Look up the fully-qualified entity to see if it already
1093 // exists in the primary entity index.
1094 Long deviceKey = primaryIndex.findByEntity(entity);
1095 IEntityClass entityClass = null;
1097 if (deviceKey == null) {
1098 // If the entity does not exist in the primary entity index,
1099 // use the entity classifier for find the classes for the
1100 // entity. Look up the entity in the returned class'
1101 // class entity index.
1102 entityClass = entityClassifier.classifyEntity(entity);
1103 if (entityClass == null) {
1106 ClassState classState = getClassState(entityClass);
1108 if (classState.classIndex != null) {
1109 deviceKey = classState.classIndex.findByEntity(entity);
1112 if (deviceKey == null)
1114 return deviceMap.get(deviceKey);
1118 * Get a destination device using entity fields that corresponds with the
1119 * given source device. The source device is important since there could be
1120 * ambiguity in the destination device without the attachment point
1124 * the source device's entity class. The returned destination
1125 * will be in the same entity class as the source.
1127 * the entity to look up
1128 * @return an {@link Device} or null if no device is found.
1130 protected Device findDestByEntity(IEntityClass reference, Entity dstEntity) {
1132 // Look up the fully-qualified entity to see if it
1133 // exists in the primary entity index
1134 Long deviceKey = primaryIndex.findByEntity(dstEntity);
1136 if (deviceKey == null) {
1137 // This could happen because:
1138 // 1) no destination known, or a broadcast destination
1139 // 2) if we have attachment point key fields since
1140 // attachment point information isn't available for
1141 // destination devices.
1142 // For the second case, we'll need to match up the
1143 // destination device with the class of the source
1145 ClassState classState = getClassState(reference);
1146 if (classState.classIndex == null) {
1149 deviceKey = classState.classIndex.findByEntity(dstEntity);
1151 if (deviceKey == null)
1153 return deviceMap.get(deviceKey);
1157 * Look up a {@link Device} within a particular entity class based on the
1158 * provided {@link Entity}.
1161 * the entity class to search for the entity
1163 * the entity to search for
1164 * @return The {@link Device} object if found private Device
1165 * findDeviceInClassByEntity(IEntityClass clazz, Entity entity) { //
1166 * XXX - TODO throw new UnsupportedOperationException(); }
1170 * Look up a {@link Device} based on the provided {@link Entity}. Also
1171 * learns based on the new entity, and will update existing devices as
1175 * the {@link Entity}
1176 * @return The {@link Device} object if found
// Looks up the Device for the given entity and, as a side effect, learns
// from it: a new Device is allocated when no index knows the entity, and an
// existing Device is replaced by a copy that includes the entity when the
// entity is new for that device. Queued DeviceUpdates are dispatched through
// processUpdates() at the end.
//
// NOTE(review): this listing carries embedded line numbers and the numbering
// skips (e.g. 1203 -> 1207, 1225 -> 1227, 1334 -> 1340), so some statements
// of this method are not visible here. Confirm against the canonical source
// before changing any control flow.
1178 protected Device learnDeviceByEntity(Entity entity) {
// NOTE(review): logged at INFO on every call - consider DEBUG/TRACE.
1179 logger.info("Primary index {}", primaryIndex);
1180 ArrayList<Long> deleteQueue = null;
1181 LinkedList<DeviceUpdate> deviceUpdates = null;
1182 Device device = null;
1184 // we may need to restart the learning process if we detect
1185 // concurrent modification. Note that we ensure that at least
1186 // one thread should always succeed so we don't get into infinite
1189 deviceUpdates = null;
1191 // Look up the fully-qualified entity to see if it already
1192 // exists in the primary entity index.
1193 Long deviceKey = primaryIndex.findByEntity(entity);
1194 IEntityClass entityClass = null;
1196 if (deviceKey == null) {
1197 // If the entity does not exist in the primary entity index,
1198 // use the entity classifier for find the classes for the
1199 // entity. Look up the entity in the returned class'
1200 // class entity index.
1201 entityClass = entityClassifier.classifyEntity(entity);
1202 if (entityClass == null) {
1203 // could not classify entity. No device
1207 ClassState classState = getClassState(entityClass);
1209 if (classState.classIndex != null) {
1210 deviceKey = classState.classIndex.findByEntity(entity);
1213 if (deviceKey != null) {
1214 // If the primary or secondary index contains the entity
1215 // use resulting device key to look up the device in the
1216 // device map, and use the referenced Device below.
1217 device = deviceMap.get(deviceKey);
1218 if (device == null) {
1219 // This can happen due to concurrent modification
1220 if (logger.isDebugEnabled()) {
// NOTE(review): the concatenated message reads "... while while processing
// entity" - the word "while" is duplicated.
1221 logger.debug("No device for deviceKey {} while "
1222 + "while processing entity {}", deviceKey,
1225 // if so, then try again till we don't even get the device
1227 // and so we recreate the device
1231 // If the secondary index does not contain the entity,
1232 // create a new Device object containing the entity, and
1233 // generate a new device ID if the the entity is on an
1234 // attachment point port. Otherwise ignore.
1235 if (entity.hasSwitchPort()
1236 && !isValidAttachmentPoint(entity.getPort())) {
1237 // debugCounters.updateCounter(CNT_DEVICE_ON_INTERAL_PORT_NOT_LEARNED);
1238 if (logger.isDebugEnabled()) {
1239 logger.debug("Not learning new device on internal"
1240 + " link: {}", entity);
1245 // Before we create the new device also check if
1246 // the entity is allowed (e.g., for spoofing protection)
1247 if (!isEntityAllowed(entity, entityClass)) {
1248 // debugCounters.updateCounter(CNT_PACKET_NOT_ALLOWED);
1249 if (logger.isDebugEnabled()) {
1250 logger.debug("PacketIn is not allowed {} {}",
1251 entityClass.getName(), entity);
// Device keys are handed out from a counter that is incremented while
// holding deviceKeyLock.
1256 synchronized (deviceKeyLock) {
1257 deviceKey = Long.valueOf(deviceKeyCounter++);
1259 device = allocateDevice(deviceKey, entity, entityClass);
1261 // Add the new device to the primary map with a simple put
1262 deviceMap.put(deviceKey, device);
// If the index update fails, the freshly inserted key is remembered in
// deleteQueue so the device can be deleted after the loop.
1265 if (!updateIndices(device, deviceKey)) {
1266 if (deleteQueue == null)
1267 deleteQueue = new ArrayList<Long>();
1268 deleteQueue.add(deviceKey);
1272 updateSecondaryIndices(entity, entityClass, deviceKey);
1274 // We need to count and log here. If we log earlier we could
1275 // hit a concurrent modification and restart the dev creation
1276 // and potentially count the device twice.
1277 // debugCounters.updateCounter(CNT_NEW_DEVICE);
1278 if (logger.isDebugEnabled()) {
1280 "New device created: {} deviceKey={}, entity={}",
1281 new Object[] { device, deviceKey, entity });
1283 // generate new device update
1284 deviceUpdates = updateUpdates(deviceUpdates, new DeviceUpdate(
1285 device, ADD, null));
1289 // if it gets here, we have a pre-existing Device for this Entity
1290 if (!isEntityAllowed(entity, device.getEntityClass())) {
1291 // debugCounters.updateCounter(CNT_PACKET_NOT_ALLOWED);
// NOTE(review): guarded by isDebugEnabled() but emitted with logger.info();
// the matching branch for new devices above uses logger.debug().
1292 if (logger.isDebugEnabled()) {
1293 logger.info("PacketIn is not allowed {} {}", device
1294 .getEntityClass().getName(), entity);
1298 // If this is not an attachment point port we don't learn the new
1300 // and don't update indexes. But we do allow the device to continue
1303 if (entity.hasSwitchPort()
1304 && !isValidAttachmentPoint(entity.getPort())) {
1305 // debugCounters.updateCounter(CNT_PACKET_ON_INTERNAL_PORT_FOR_KNOWN_DEVICE);
1308 int entityindex = -1;
1309 if ((entityindex = device.entityIndex(entity)) >= 0) {
1310 // Entity already exists
1311 // update timestamp on the found entity
1312 Date lastSeen = entity.getLastSeenTimestamp();
1313 if (lastSeen == null) {
1314 lastSeen = new Date();
1315 entity.setLastSeenTimestamp(lastSeen);
1317 device.entities[entityindex].setLastSeenTimestamp(lastSeen);
1318 // we break the loop after checking for changes to the AP
1320 // New entity for this device
1321 // compute the insertion point for the entity.
1322 // see Arrays.binarySearch()
1323 entityindex = -(entityindex + 1);
1324 Device newDevice = allocateDevice(device, entity, entityindex);
1327 EnumSet<DeviceField> changedFields = findChangedFields(device,
1330 // update the device map with a replace call
1331 boolean res = deviceMap.replace(deviceKey, device, newDevice);
1332 // If replace returns false, restart the process from the
1333 // beginning (this implies another thread concurrently
1334 // modified this Device).
1340 if (!updateIndices(device, deviceKey)) {
1343 updateSecondaryIndices(entity, device.getEntityClass(),
1346 // We need to count here after all the possible "continue"
1347 // statements in this branch
1348 // debugCounters.updateCounter(CNT_NEW_ENTITY);
1349 if (changedFields.size() > 0) {
1350 // debugCounters.updateCounter(CNT_DEVICE_CHANGED);
1351 deviceUpdates = updateUpdates(deviceUpdates,
1352 new DeviceUpdate(newDevice, CHANGE, changedFields));
1354 // we break the loop after checking for changed AP
1356 // Update attachment point (will only be hit if the device
1357 // already existed and no concurrent modification)
1358 if (entity.hasSwitchPort()) {
// NOTE(review): getLastSeenTimestamp() is only defaulted in the
// "entity already exists" branch above; confirm it cannot be null here
// for a newly added entity.
1359 boolean moved = device.updateAttachmentPoint(entity.getPort(),
1360 entity.getLastSeenTimestamp().getTime());
1361 // TODO: use update mechanism instead of sending the
1362 // notification directly
1364 // we count device moved events in
1365 // sendDeviceMovedNotification()
1366 sendDeviceMovedNotification(device);
1367 if (logger.isTraceEnabled()) {
1368 logger.trace("Device moved: attachment points {},"
1369 + "entities {}", device.attachmentPoints,
1373 if (logger.isTraceEnabled()) {
1374 logger.trace("Device attachment point updated: "
1375 + "attachment points {}," + "entities {}",
1376 device.attachmentPoints, device.entities);
// Delete the devices whose index update failed earlier.
// NOTE(review): deviceMap.get(l) can return null if the entry was removed
// in the meantime; deleteDevice() dereferences its argument.
1383 if (deleteQueue != null) {
1384 for (Long l : deleteQueue) {
1385 Device dev = deviceMap.get(l);
1386 this.deleteDevice(dev);
1390 processUpdates(deviceUpdates);
1391 // deviceSyncManager.storeDeviceThrottled(device);
1396 protected boolean isEntityAllowed(Entity entity, IEntityClass entityClass) {
1400 protected EnumSet<DeviceField> findChangedFields(Device device,
1402 EnumSet<DeviceField> changedFields = EnumSet.of(DeviceField.IPV4,
1403 DeviceField.VLAN, DeviceField.SWITCHPORT);
1405 if (newEntity.getIpv4Address() == null)
1406 changedFields.remove(DeviceField.IPV4);
1407 if (newEntity.getVlan() == null)
1408 changedFields.remove(DeviceField.VLAN);
1409 if (newEntity.getPort() == null)
1410 changedFields.remove(DeviceField.SWITCHPORT);
1412 if (changedFields.size() == 0)
1413 return changedFields;
1415 for (Entity entity : device.getEntities()) {
1416 if (newEntity.getIpv4Address() == null
1417 || (entity.getIpv4Address() != null && entity
1419 .equals(newEntity.getIpv4Address())))
1420 changedFields.remove(DeviceField.IPV4);
1421 if (newEntity.getVlan() == null
1422 || (entity.getVlan() != null && entity.getVlan().equals(
1423 newEntity.getVlan())))
1424 changedFields.remove(DeviceField.VLAN);
1425 if (newEntity.getPort() == null
1426 || (entity.getPort() != null && entity.getPort().equals(
1427 newEntity.getPort())))
1428 changedFields.remove(DeviceField.SWITCHPORT);
1431 return changedFields;
1435 * Send update notifications to listeners
1438 * the updates to process.
1440 protected void processUpdates(Queue<DeviceUpdate> updates) {
1441 if (updates == null)
1443 DeviceUpdate update = null;
1444 while (null != (update = updates.poll())) {
1445 if (logger.isTraceEnabled()) {
1446 logger.trace("Dispatching device update: {}", update);
1448 // if (update.change == DeviceUpdate.Change.DELETE)
1449 // deviceSyncManager.removeDevice(update.device);
1451 // deviceSyncManager.storeDevice(update.device);
1452 List<IDeviceListener> listeners = deviceListeners
1453 .getOrderedListeners();
1454 notifyListeners(listeners, update);
1458 protected void notifyListeners(List<IDeviceListener> listeners,
1459 DeviceUpdate update) {
1460 if (listeners == null && newHostNotify.isEmpty()) {
1464 * TODO: IfNewHostNotify is needed for current controller API.
1465 * Adding logic so that existing apps (like SimpleForwardingManager)
1466 * work. IDeviceListener adds additional methods and uses IListener's
1467 * callback ordering. The two interfaces need to be merged.
1470 for (IfNewHostNotify notify : newHostNotify){
1471 switch (update.change) {
1473 notify.notifyHTClient(update.device.toHostNodeConnector());
1479 * TODO: Remove this section as IDeviceListener functionality gets
1480 * merged with IfNewHostNotify
1482 for (IDeviceListener listener : listeners) {
1483 switch (update.change) {
1485 listener.deviceAdded(update.device);
1488 listener.deviceRemoved(update.device);
1491 for (DeviceField field : update.fieldsChanged) {
1494 listener.deviceIPV4AddrChanged(update.device);
1497 // listener.deviceMoved(update.device);
1500 listener.deviceVlanChanged(update.device);
1503 logger.debug("Unknown device field changed {}",
1504 update.fieldsChanged.toString());
1514 * Check if the entity e has all the keyFields set. Returns false if not
1519 * the key fields to check e against
1522 protected boolean allKeyFieldsPresent(Entity e,
1523 EnumSet<DeviceField> keyFields) {
1524 for (DeviceField f : keyFields) {
1527 // MAC address is always present
1530 if (e.getIpv4Address() == null)
1534 if (e.getPort() == null)
1538 // FIXME: vlan==null is ambiguous: it can mean: not present
1540 // if (e.vlan == null) return false;
1543 // we should never get here. unless somebody extended
1545 throw new IllegalStateException();
1551 private LinkedList<DeviceUpdate> updateUpdates(
1552 LinkedList<DeviceUpdate> list, DeviceUpdate update) {
1556 list = new LinkedList<DeviceUpdate>();
1563 * Get the secondary index for a class. Will return null if the secondary
1564 * index was created concurrently in another thread.
1567 * the class for the index
1570 private ClassState getClassState(IEntityClass clazz) {
1571 ClassState classState = classStateMap.get(clazz.getName());
1572 if (classState != null)
1575 classState = new ClassState(clazz);
1576 ClassState r = classStateMap.putIfAbsent(clazz.getName(), classState);
1585 * Update both the primary and class indices for the provided device. If the
1586 * update fails because of an concurrent update, will return false.
1589 * the device to update
1591 * the device key for the device
1592 * @return true if the update succeeded, false otherwise.
1594 private boolean updateIndices(Device device, Long deviceKey) {
1595 if (!primaryIndex.updateIndex(device, deviceKey)) {
1598 IEntityClass entityClass = device.getEntityClass();
1599 ClassState classState = getClassState(entityClass);
1601 if (classState.classIndex != null) {
1602 if (!classState.classIndex.updateIndex(device, deviceKey))
1609 * Update the secondary indices for the given entity and associated entity
1613 * the entity to update
1614 * @param entityClass
1615 * the entity class for the entity
1617 * the device key to set up
1619 private void updateSecondaryIndices(Entity entity,
1620 IEntityClass entityClass, Long deviceKey) {
1621 for (DeviceIndex index : secondaryIndexMap.values()) {
1622 index.updateIndex(entity, deviceKey);
1624 ClassState state = getClassState(entityClass);
1625 for (DeviceIndex index : state.secondaryIndexMap.values()) {
1626 index.updateIndex(entity, deviceKey);
1631 * Clean up expired entities/devices
// Cleans up expired entities/devices. For every device in deviceMap, the
// entities whose last-seen timestamp is older than (now - ENTITY_TIMEOUT
// milliseconds) are removed from the indices via removeEntity(). If some
// entities remain, the device is replaced in deviceMap by a new Device built
// from the kept entities; otherwise the device is removed from deviceMap.
// Resulting DeviceUpdates are dispatched through processUpdates().
//
// NOTE(review): this listing carries embedded line numbers and the numbering
// skips (e.g. 1656 -> 1662, 1692 -> 1696, 1708 -> 1711), so loop-control
// statements of this method are not visible here. Confirm against the
// canonical source before changing any control flow.
1633 protected void cleanupEntities() {
1634 // debugCounters.updateCounter(CNT_CLEANUP_ENTITIES_RUNS);
// cutoff = current time minus ENTITY_TIMEOUT milliseconds
1636 Calendar c = Calendar.getInstance();
1637 c.add(Calendar.MILLISECOND, -ENTITY_TIMEOUT);
1638 Date cutoff = c.getTime();
// Scratch lists, declared once outside the per-device loop.
1640 ArrayList<Entity> toRemove = new ArrayList<Entity>();
1641 ArrayList<Entity> toKeep = new ArrayList<Entity>();
1643 Iterator<Device> diter = deviceMap.values().iterator();
1644 LinkedList<DeviceUpdate> deviceUpdates = new LinkedList<DeviceUpdate>();
1646 while (diter.hasNext()) {
1647 Device d = diter.next();
1650 deviceUpdates.clear();
// Entities without a last-seen timestamp are never treated as expired.
1653 for (Entity e : d.getEntities()) {
1654 if (e.getLastSeenTimestamp() != null
1655 && 0 > e.getLastSeenTimestamp().compareTo(cutoff)) {
1656 // individual entity needs to be removed
1662 if (toRemove.size() == 0) {
1666 // debugCounters.updateCounter(CNT_ENTITY_REMOVED_TIMEOUT);
1667 for (Entity e : toRemove) {
1668 removeEntity(e, d.getEntityClass(), d.getDeviceKey(),
1672 if (toKeep.size() > 0) {
1673 Device newDevice = allocateDevice(d.getDeviceKey(),
1674 d.getDHCPClientName(), d.oldAPs,
1675 d.attachmentPoints, toKeep, d.getEntityClass());
// Union of the fields that change because of each removed entity,
// evaluated against the new (reduced) device.
1677 EnumSet<DeviceField> changedFields = EnumSet
1678 .noneOf(DeviceField.class);
1679 for (Entity e : toRemove) {
1680 changedFields.addAll(findChangedFields(newDevice, e));
1682 DeviceUpdate update = null;
1683 if (changedFields.size() > 0) {
// NOTE(review): the CHANGE update carries the old device d, not newDevice -
// confirm this is intended.
1684 update = new DeviceUpdate(d, CHANGE, changedFields);
1687 if (!deviceMap.replace(newDevice.getDeviceKey(), d,
1689 // concurrent modification; try again
1690 // need to use device that is the map now for the next
1692 d = deviceMap.get(d.getDeviceKey());
1696 if (update != null) {
1697 // need to count after all possibly continue stmts in
1699 // debugCounters.updateCounter(CNT_DEVICE_CHANGED);
1700 deviceUpdates.add(update);
1703 DeviceUpdate update = new DeviceUpdate(d, DELETE, null);
1704 if (!deviceMap.remove(d.getDeviceKey(), d)) {
1705 // concurrent modification; try again
1706 // need to use device that is the map now for the next
1708 d = deviceMap.get(d.getDeviceKey());
1711 // debugCounters.updateCounter(CNT_DEVICE_DELETED);
1713 deviceUpdates.add(update);
1715 processUpdates(deviceUpdates);
1721 protected void removeEntity(Entity removed, IEntityClass entityClass,
1722 Long deviceKey, Collection<Entity> others) {
1723 // Don't count in this method. This method CAN BE called to clean-up
1724 // after concurrent device adds/updates and thus counting here
1726 for (DeviceIndex index : secondaryIndexMap.values()) {
1727 index.removeEntityIfNeeded(removed, deviceKey, others);
1729 ClassState classState = getClassState(entityClass);
1730 for (DeviceIndex index : classState.secondaryIndexMap.values()) {
1731 index.removeEntityIfNeeded(removed, deviceKey, others);
1734 primaryIndex.removeEntityIfNeeded(removed, deviceKey, others);
1736 if (classState.classIndex != null) {
1737 classState.classIndex.removeEntityIfNeeded(removed, deviceKey,
1743 * method to delete a given device, remove all entities first and then
1744 * finally delete the device itself.
1748 protected void deleteDevice(Device device) {
1749 // Don't count in this method. This method CAN BE called to clean-up
1750 // after concurrent device adds/updates and thus counting here
1752 ArrayList<Entity> emptyToKeep = new ArrayList<Entity>();
1753 for (Entity entity : device.getEntities()) {
1754 this.removeEntity(entity, device.getEntityClass(),
1755 device.getDeviceKey(), emptyToKeep);
1757 if (!deviceMap.remove(device.getDeviceKey(), device)) {
1758 if (logger.isDebugEnabled())
1759 logger.debug("device map does not have this device -"
1760 + device.toString());
1764 private EnumSet<DeviceField> getEntityKeys(Long macAddress, Short vlan,
1765 Integer ipv4Address, NodeConnector port) {
1766 // FIXME: vlan==null is a valid search. Need to handle this
1767 // case correctly. Note that the code will still work correctly.
1768 // But we might do a full device search instead of using an index.
1769 EnumSet<DeviceField> keys = EnumSet.noneOf(DeviceField.class);
1770 if (macAddress != null)
1771 keys.add(DeviceField.MAC);
1773 keys.add(DeviceField.VLAN);
1774 if (ipv4Address != null)
1775 keys.add(DeviceField.IPV4);
1777 keys.add(DeviceField.SWITCHPORT);
1781 protected Iterator<Device> queryClassByEntity(IEntityClass clazz,
1782 EnumSet<DeviceField> keyFields, Entity entity) {
1783 ClassState classState = getClassState(clazz);
1784 DeviceIndex index = classState.secondaryIndexMap.get(keyFields);
1786 return Collections.<Device> emptySet().iterator();
1787 return new DeviceIndexInterator(this, index.queryByEntity(entity));
1790 protected Device allocateDevice(Long deviceKey, Entity entity,
1791 IEntityClass entityClass) {
1792 return new Device(this, deviceKey, entity, entityClass);
1796 protected Device allocateDevice(Long deviceKey, String dhcpClientName,
1797 List<AttachmentPoint> aps, List<AttachmentPoint> trueAPs,
1798 Collection<Entity> entities, IEntityClass entityClass) {
1799 return new Device(this, deviceKey, dhcpClientName, aps, trueAPs,
1800 entities, entityClass);
1803 protected Device allocateDevice(Device device, Entity entity,
1804 int insertionpoint) {
1805 return new Device(device, entity, insertionpoint);
1809 protected Device allocateDevice(Device device, Set<Entity> entities) {
1810 List<AttachmentPoint> newPossibleAPs = new ArrayList<AttachmentPoint>();
1811 List<AttachmentPoint> newAPs = new ArrayList<AttachmentPoint>();
1812 for (Entity entity : entities) {
1813 if (entity.getPort() != null) {
1814 AttachmentPoint aP = new AttachmentPoint(entity.getPort(), 0);
1815 newPossibleAPs.add(aP);
1818 if (device.attachmentPoints != null) {
1819 for (AttachmentPoint oldAP : device.attachmentPoints) {
1820 if (newPossibleAPs.contains(oldAP)) {
1825 if (newAPs.isEmpty())
1827 Device d = new Device(this, device.getDeviceKey(),
1828 device.getDHCPClientName(), newAPs, null, entities,
1829 device.getEntityClass());
1830 d.updateAttachmentPoint();
1834 // *********************
1835 // ITopologyManagerAware
1836 // *********************
1839 public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
1840 Iterator<Device> diter = deviceMap.values().iterator();
1842 while (diter.hasNext()) {
1843 Device d = diter.next();
1844 if (d.updateAttachmentPoint()) {
1845 if (logger.isDebugEnabled()) {
1846 logger.debug("Attachment point changed for device: {}", d);
1848 sendDeviceMovedNotification(d);
1854 public void edgeOverUtilized(Edge edge) {
1859 public void edgeUtilBackToNormal(Edge edge) {
1863 // *********************
1864 // IEntityClassListener
1865 // *********************
1868 public void entityClassChanged(Set<String> entityClassNames) {
1870 * iterate through the devices, reclassify the devices that belong to
1871 * these entity class names
1873 Iterator<Device> diter = deviceMap.values().iterator();
1874 while (diter.hasNext()) {
1875 Device d = diter.next();
1876 if (d.getEntityClass() == null
1877 || entityClassNames.contains(d.getEntityClass().getName()))
1878 reclassifyDevice(d);
1886 * Send update notifications to listeners
1889 * the updates to process.
1891 protected void sendDeviceMovedNotification(Device d) {
1892 // debugCounters.updateCounter(CNT_DEVICE_MOVED);
1893 // deviceSyncManager.storeDevice(d);
1894 List<IDeviceListener> listeners = deviceListeners.getOrderedListeners();
1895 if (listeners != null) {
1896 for (IDeviceListener listener : listeners) {
1897 listener.deviceMoved(d);
1903 * this method will reclassify and reconcile a device - possibilities are -
1904 * create new device(s), remove entities from this device. If the device
1905 * entity class did not change then it returns false else true.
1909 protected boolean reclassifyDevice(Device device) {
1910 // first classify all entities of this device
1911 if (device == null) {
1912 logger.debug("In reclassify for null device");
1915 boolean needToReclassify = false;
1916 for (Entity entity : device.entities) {
1917 IEntityClass entityClass = this.entityClassifier
1918 .classifyEntity(entity);
1919 if (entityClass == null || device.getEntityClass() == null) {
1920 needToReclassify = true;
1923 if (!entityClass.getName()
1924 .equals(device.getEntityClass().getName())) {
1925 needToReclassify = true;
1929 if (needToReclassify == false) {
1933 // debugCounters.updateCounter(CNT_DEVICE_RECLASSIFY_DELETE);
1934 LinkedList<DeviceUpdate> deviceUpdates = new LinkedList<DeviceUpdate>();
1935 // delete this device and then re-learn all the entities
1936 this.deleteDevice(device);
1937 deviceUpdates.add(new DeviceUpdate(device, DeviceUpdate.Change.DELETE,
1939 if (!deviceUpdates.isEmpty())
1940 processUpdates(deviceUpdates);
1941 for (Entity entity : device.entities) {
1942 this.learnDeviceByEntity(entity);
1948 * For testing: sets the interval between writes of the same device to the
1953 // void setSyncStoreWriteInterval(int intervalMs) {
1954 // this.syncStoreWriteIntervalMs = intervalMs;
1958 * For testing: sets the time between transition to MASTER and consolidate
1963 // void setInitialSyncStoreConsolidateMs(int intervalMs) {
1964 // this.initialSyncStoreConsolidateMs = intervalMs;
1967 private long toLong(byte[] address) {
1969 for (int i = 0; i < 6; i++) {
1970 long t = (address[i] & 0xffL) << ((5 - i) * 8);
1977 * Accepts an IPv4 address in a byte array and returns the corresponding
1978 * 32-bit integer value.
1983 private static int toIPv4Address(byte[] ipAddress) {
1985 for (int i = 0; i < 4; i++) {
1986 int t = (ipAddress[i] & 0xff) << ((3 - i) * 8);
// Placeholder for registering the device manager's debug counters. Every
// line below the signature is commented-out registration code marked
// "XXX Missing functionality", so no counter is registered by the visible
// body.
// NOTE(review): the comment delimiters are not visible in this listing (the
// embedded numbering skips 1993 and stops at 2091); confirm against the
// canonical source.
1992 private void registerDeviceManagerDebugCounters() {
1994 * XXX Missing functionality if (debugCounters == null) {
1995 * logger.error("Debug Counter Service not found."); debugCounters = new
1996 * NullDebugCounter(); return; }
1997 * debugCounters.registerCounter(CNT_INCOMING,
1998 * "All incoming packets seen by this module",
1999 * CounterType.ALWAYS_COUNT);
2000 * debugCounters.registerCounter(CNT_RECONCILE_REQUEST,
2001 * "Number of flows that have been received for reconciliation by " +
2002 * "this module", CounterType.ALWAYS_COUNT);
2003 * debugCounters.registerCounter(CNT_RECONCILE_NO_SOURCE,
2004 * "Number of flow reconcile events that failed because no source " +
2005 * "device could be identified", CounterType.WARN); // is this really a
2006 * warning debugCounters.registerCounter(CNT_RECONCILE_NO_DEST,
2007 * "Number of flow reconcile events that failed because no " +
2008 * "destination device could be identified", CounterType.WARN); // is
2009 * this really a warning
2010 * debugCounters.registerCounter(CNT_BROADCAST_SOURCE,
2011 * "Number of packetIns that were discarded because the source " +
2012 * "MAC was broadcast or multicast", CounterType.WARN);
2013 * debugCounters.registerCounter(CNT_NO_SOURCE,
2014 * "Number of packetIns that were discarded because the " +
2015 * "could not identify a source device. This can happen if a " +
2016 * "packet is not allowed, appears on an illegal port, does not " +
2017 * "have a valid address space, etc.", CounterType.WARN);
2018 * debugCounters.registerCounter(CNT_NO_DEST,
2019 * "Number of packetIns that did not have an associated " +
2020 * "destination device. E.g., because the destination MAC is " +
2021 * "broadcast/multicast or is not yet known to the controller.",
2022 * CounterType.ALWAYS_COUNT);
2023 * debugCounters.registerCounter(CNT_DHCP_CLIENT_NAME_SNOOPED,
2024 * "Number of times a DHCP client name was snooped from a " +
2025 * "packetIn.", CounterType.ALWAYS_COUNT);
2026 * debugCounters.registerCounter(CNT_DEVICE_ON_INTERAL_PORT_NOT_LEARNED,
2027 * "Number of times packetIn was received on an internal port and" +
2028 * "no source device is known for the source MAC. The packetIn is " +
2029 * "discarded.", CounterType.WARN);
2030 * debugCounters.registerCounter(CNT_PACKET_NOT_ALLOWED,
2031 * "Number of times a packetIn was not allowed due to spoofing " +
2032 * "protection configuration.", CounterType.WARN); // is this really a
2033 * warning? debugCounters.registerCounter(CNT_NEW_DEVICE,
2034 * "Number of times a new device was learned",
2035 * CounterType.ALWAYS_COUNT); debugCounters.registerCounter(
2036 * CNT_PACKET_ON_INTERNAL_PORT_FOR_KNOWN_DEVICE,
2037 * "Number of times a packetIn was received on an internal port " +
2038 * "for a known device.", CounterType.ALWAYS_COUNT);
2039 * debugCounters.registerCounter(CNT_NEW_ENTITY,
2040 * "Number of times a new entity was learned for an existing device",
2041 * CounterType.ALWAYS_COUNT);
2042 * debugCounters.registerCounter(CNT_DEVICE_CHANGED,
2043 * "Number of times device properties have changed",
2044 * CounterType.ALWAYS_COUNT);
2045 * debugCounters.registerCounter(CNT_DEVICE_MOVED,
2046 * "Number of times devices have moved", CounterType.ALWAYS_COUNT);
2047 * debugCounters.registerCounter(CNT_CLEANUP_ENTITIES_RUNS,
2048 * "Number of times the entity cleanup task has been run",
2049 * CounterType.ALWAYS_COUNT);
2050 * debugCounters.registerCounter(CNT_ENTITY_REMOVED_TIMEOUT,
2051 * "Number of times entities have been removed due to timeout " +
2052 * "(entity has been inactive for " + ENTITY_TIMEOUT/1000 + "s)",
2053 * CounterType.ALWAYS_COUNT);
2054 * debugCounters.registerCounter(CNT_DEVICE_DELETED,
2055 * "Number of devices that have been removed due to inactivity",
2056 * CounterType.ALWAYS_COUNT);
2057 * debugCounters.registerCounter(CNT_DEVICE_RECLASSIFY_DELETE,
2058 * "Number of devices that required reclassification and have been " +
2059 * "temporarily delete for reclassification", CounterType.ALWAYS_COUNT);
2060 * debugCounters.registerCounter(CNT_DEVICE_STORED,
2061 * "Number of device entries written or updated to the sync store",
2062 * CounterType.ALWAYS_COUNT);
2063 * debugCounters.registerCounter(CNT_DEVICE_STORE_THROTTLED,
2064 * "Number of times a device update to the sync store was " +
2065 * "requested but not performed because the same device entities " +
2066 * "have recently been updated already", CounterType.ALWAYS_COUNT);
2067 * debugCounters.registerCounter(CNT_DEVICE_REMOVED_FROM_STORE,
2068 * "Number of devices that were removed from the sync store " +
2069 * "because the local controller removed the device due to " +
2070 * "inactivity", CounterType.ALWAYS_COUNT);
2071 * debugCounters.registerCounter(CNT_SYNC_EXCEPTION,
2072 * "Number of times an operation on the sync store resulted in " +
2073 * "sync exception", CounterType.WARN); // it this an error?
2074 * debugCounters.registerCounter(CNT_DEVICES_FROM_STORE,
2075 * "Number of devices that were read from the sync store after " +
2076 * "the local controller transitioned from SLAVE to MASTER",
2077 * CounterType.ALWAYS_COUNT);
2078 * debugCounters.registerCounter(CNT_CONSOLIDATE_STORE_RUNS,
2079 * "Number of times the task to consolidate entries in the " +
2080 * "store witch live known devices has been run",
2081 * CounterType.ALWAYS_COUNT);
2082 * debugCounters.registerCounter(CNT_CONSOLIDATE_STORE_DEVICES_REMOVED,
2083 * "Number of times a device has been removed from the sync " +
2084 * "store because no corresponding live device is known. " +
2085 * "This indicates a remote controller still writing device " +
2086 * "entries despite the local controller being MASTER or an " +
2087 * "incosistent store update from the local controller.",
2088 * CounterType.WARN);
2089 * debugCounters.registerCounter(CNT_TRANSITION_TO_MASTER,
2090 * "Number of times this controller has transitioned from SLAVE " +
2091 * "to MASTER role. Will be 0 or 1.", CounterType.ALWAYS_COUNT);
2096 public HostNodeConnector hostFind(InetAddress networkAddress) {
2097 // TODO Auto-generated method stub
2102 public HostNodeConnector hostQuery(InetAddress networkAddress) {
2103 // TODO Auto-generated method stub
2108 public Future<HostNodeConnector> discoverHost(InetAddress networkAddress) {
2109 // TODO Auto-generated method stub
2114 public List<List<String>> getHostNetworkHierarchy(InetAddress hostAddress) {
2115 // TODO Auto-generated method stub
/**
 * Builds a set of {@link HostNodeConnector}s by converting every device
 * currently held in {@code deviceMap}.
 * NOTE(review): presumably the populated set is what the method returns —
 * confirm.
 */
2120 public Set<HostNodeConnector> getAllHosts() {
// Read-only view of the tracked devices; this method only iterates over it.
2121 Collection<Device> devices = Collections
2122 .unmodifiableCollection(deviceMap.values());
2123 Iterator<Device> i = devices.iterator();
// Collected into a HashSet, so connectors that are equal to each other
// end up as a single entry.
2124 Set<HostNodeConnector> nc = new HashSet<HostNodeConnector>();
2125 while (i.hasNext()) {
2126 Device device = i.next();
// NOTE(review): if toHostNodeConnector() can yield null, that null would be
// stored in the set as well — verify against Device.
2127 nc.add(device.toHostNodeConnector());
/**
 * Placeholder for listing the active static hosts.
 * NOTE(review): what makes a static host "active" is not established by
 * this code, and the body still carries the IDE-generated TODO marker —
 * the method appears to be unimplemented; confirm.
 */
2133 public Set<HostNodeConnector> getActiveStaticHosts() {
2134 // TODO Auto-generated method stub
/**
 * Placeholder for listing the inactive static hosts.
 * NOTE(review): what makes a static host "inactive" is not established by
 * this code, and the body still carries the IDE-generated TODO marker —
 * the method appears to be unimplemented; confirm.
 */
2139 public Set<HostNodeConnector> getInactiveStaticHosts() {
2140 // TODO Auto-generated method stub
/**
 * Placeholder for registering a statically configured host.
 * NOTE(review): the body still carries the IDE-generated TODO marker, so
 * nothing appears to be stored yet; the expected string formats of the
 * arguments are not established by this code — confirm against the
 * interface being implemented.
 *
 * @param networkAddress   network address of the host, as a string
 * @param dataLayerAddress data-layer address of the host, as a string
 *                         (presumably a MAC address — verify)
 * @param nc               node connector associated with the host
 * @param vlan             VLAN of the host, as a string
 */
2145 public Status addStaticHost(String networkAddress, String dataLayerAddress,
2146 NodeConnector nc, String vlan) {
2147 // TODO Auto-generated method stub
/**
 * Placeholder for removing a statically configured host.
 * NOTE(review): the body still carries the IDE-generated TODO marker, so
 * nothing appears to be removed yet — confirm what status callers receive.
 *
 * @param networkAddress network address of the host, as a string
 */
2152 public Status removeStaticHost(String networkAddress) {
2153 // TODO Auto-generated method stub
2158 * For testing: consolidate the store NOW
2160 // void scheduleConsolidateStoreNow() {
2161 // this.storeConsolidateTask.reschedule(0, TimeUnit.MILLISECONDS);
2164 // private class DeviceSyncManager {
2165 // // maps (opaque) deviceKey to the time in System.nanoTime() when we
2166 // // last wrote the device to the sync store
2167 // private ConcurrentMap<Long, Long> lastWriteTimes =
2168 // new ConcurrentHashMap<Long, Long>();
2171 // * Write the given device to storage if we are MASTER.
2172 // * Use this method if the device has significantly changed (e.g.,
2173 // * new AP, new IP, entities removed).
2174 // * @param d the device to store
2176 // public void storeDevice(Device d) {
2181 // long now = System.nanoTime();
2182 // writeUpdatedDeviceToStorage(d);
2183 // lastWriteTimes.put(d.getDeviceKey(), now);
2187 // * Write the given device to storage if we are MASTER and if the
2188 // * last write for the device was more than this.syncStoreIntervalNs
2190 // * Use this method to update last active times in the store.
2191 // * @param d the device to store
2193 // public void storeDeviceThrottled(Device d) {
2194 // long intervalNs = syncStoreWriteIntervalMs*1000L*1000L;
2199 // long now = System.nanoTime();
2200 // Long last = lastWriteTimes.get(d.getDeviceKey());
2201 // if (last == null ||
2202 // now - last > intervalNs) {
2203 // writeUpdatedDeviceToStorage(d);
2204 // lastWriteTimes.put(d.getDeviceKey(), now);
2206 // debugCounters.updateCounter(CNT_DEVICE_STORE_THROTTLED);
2211 // * Remove the given device from the store. If only some entities have
2212 // * been removed the updated device should be written using
2213 // * {@link #storeDevice(Device)}
2216 // public void removeDevice(Device d) {
2219 // // FIXME: could we have a problem with concurrent put to the
2220 // // hashMap? I.e., we write a stale entry to the map after the
2221 // // delete and now are left with an entry we'll never clean up
2222 // lastWriteTimes.remove(d.getDeviceKey());
2224 // // TODO: should probably do versioned delete. OTOH, even
2225 // // if we accidentally delete, we'll write it again after
2226 // // the next entity ....
2227 // debugCounters.updateCounter(CNT_DEVICE_REMOVED_FROM_STORE);
2228 // storeClient.delete(DeviceSyncRepresentation.computeKey(d));
2229 // } catch(ObsoleteVersionException e) {
2231 // } catch (SyncException e) {
2232 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2233 // logger.error("Could not remove device " + d + " from store", e);
2238 // * Remove the given Versioned device from the store. If the device
2239 // * was locally modified ignore the delete request.
2240 // * @param dev the versioned device entry to remove from the store
2242 // private void removeDevice(Versioned<DeviceSyncRepresentation> dev) {
2244 // debugCounters.updateCounter(CNT_DEVICE_REMOVED_FROM_STORE);
2245 // storeClient.delete(dev.getValue().getKey(),
2246 // dev.getVersion());
2247 // } catch(ObsoleteVersionException e) {
2248 // // Key was locally modified by another thread.
2249 // // Do not delete and ignore.
2250 // } catch(SyncException e) {
2251 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2252 // logger.error("Failed to remove device entry for " +
2253 // dev.toString() + " from store.", e);
2258 // * Synchronously transition from SLAVE to MASTER by iterating through
2259 // * the store and learning all devices found in it
2261 // private void goToMaster() {
2262 // if (logger.isDebugEnabled()) {
2263 // logger.debug("Transitioning to MASTER role");
2265 // debugCounters.updateCounter(CNT_TRANSITION_TO_MASTER);
2266 // IClosableIterator<Map.Entry<String,Versioned<DeviceSyncRepresentation>>>
2269 // iter = storeClient.entries();
2270 // } catch (SyncException e) {
2271 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2272 // logger.error("Failed to read devices from sync store", e);
2276 // while(iter.hasNext()) {
2277 // Versioned<DeviceSyncRepresentation> versionedDevice =
2278 // iter.next().getValue();
2279 // DeviceSyncRepresentation storedDevice =
2280 // versionedDevice.getValue();
2281 // if (storedDevice == null)
2283 // debugCounters.updateCounter(CNT_DEVICES_FROM_STORE);
2284 // for(SyncEntity se: storedDevice.getEntities()) {
2285 // learnDeviceByEntity(se.asEntity());
2289 // if (iter != null)
2292 // storeConsolidateTask.reschedule(initialSyncStoreConsolidateMs,
2293 // TimeUnit.MILLISECONDS);
2297 // * Actually perform the write of the device to the store
2298 // * FIXME: concurrent modification behavior
2299 // * @param device The device to write
2301 // private void writeUpdatedDeviceToStorage(Device device) {
2303 // debugCounters.updateCounter(CNT_DEVICE_STORED);
2304 // // FIXME: use a versioned put
2305 // DeviceSyncRepresentation storeDevice =
2306 // new DeviceSyncRepresentation(device);
2307 // storeClient.put(storeDevice.getKey(), storeDevice);
2308 // } catch (ObsoleteVersionException e) {
2309 // // FIXME: what's the right behavior here. Can the store client
2310 // // even throw this error?
2311 // } catch (SyncException e) {
2312 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2313 // logger.error("Could not write device " + device +
2314 // " to sync store:", e);
2319 // * Iterate through all entries in the sync store. For each device
2320 // * in the store check if any stored entity matches a live device. If
2321 // * no entities match a live device we remove the entry from the store.
2323 // * Note: we do not check if all devices known to device manager are
2324 // * in the store. We rely on regular packetIns for that.
2325 // * Note: it's possible that multiple entries in the store map to the
2326 // * same device. We don't check or handle this case.
2328 // * We need to perform this check after a SLAVE->MASTER transition to
2329 // * get rid of all entries the old master might have written to the
2330 // * store after we took over. We also run it regularly in MASTER
2331 // * state to ensure we don't have stale entries in the store
2333 // private void consolidateStore() {
2336 // debugCounters.updateCounter(CNT_CONSOLIDATE_STORE_RUNS);
2337 // if (logger.isDebugEnabled()) {
2338 // logger.debug("Running consolidateStore.");
2340 // IClosableIterator<Map.Entry<String,Versioned<DeviceSyncRepresentation>>>
2343 // iter = storeClient.entries();
2344 // } catch (SyncException e) {
2345 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2346 // logger.error("Failed to read devices from sync store", e);
2350 // while(iter.hasNext()) {
2351 // boolean found = false;
2352 // Versioned<DeviceSyncRepresentation> versionedDevice =
2353 // iter.next().getValue();
2354 // DeviceSyncRepresentation storedDevice =
2355 // versionedDevice.getValue();
2356 // if (storedDevice == null)
2358 // for(SyncEntity se: storedDevice.getEntities()) {
2360 // // Do we have a device for this entity??
2361 // IDevice d = findDevice(se.macAddress, se.vlan,
2369 // } catch (IllegalArgumentException e) {
2370 // // not all key fields provided. Skip entity
2374 // // We currently DO NOT have a live device that
2375 // // matches the current device from the store.
2376 // // Delete device from store.
2377 // if (logger.isDebugEnabled()) {
2378 // logger.debug("Removing device {} from store. No "
2379 // + "corresponding live device",
2380 // storedDevice.getKey());
2382 // debugCounters.updateCounter(CNT_CONSOLIDATE_STORE_DEVICES_REMOVED);
2383 // removeDevice(versionedDevice);
2387 // if (iter != null)
2395 // * For testing. Sets the syncService. Only call after init but before
2396 // * startUp. Used by MockDeviceManager
2397 // * @param syncService
2399 // protected void setSyncServiceIfNotSet(ISyncService syncService) {
2400 // if (this.syncService == null)
2401 // this.syncService = syncService;