2 * Copyright (c) 2011,2012 Big Switch Networks, Inc.
4 * Licensed under the Eclipse Public License, Version 1.0 (the
5 * "License"); you may not use this file except in compliance with the
6 * License. You may obtain a copy of the License at
8 * http://www.eclipse.org/legal/epl-v10.html
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 * implied. See the License for the specific language governing
14 * permissions and limitations under the License.
16 * This file incorporates work covered by the following copyright and
19 * Originally created by David Erickson, Stanford University
21 * Licensed under the Apache License, Version 2.0 (the "License");
22 * you may not use this file except in compliance with the
23 * License. You may obtain a copy of the License at
25 * http://www.apache.org/licenses/LICENSE-2.0
27 * Unless required by applicable law or agreed to in writing,
28 * software distributed under the License is distributed on an "AS
29 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
30 * express or implied. See the License for the specific language
31 * governing permissions and limitations under the License.
34 package org.opendaylight.controller.hosttracker.internal;
36 import static org.opendaylight.controller.hosttracker.internal.DeviceManagerImpl.DeviceUpdate.Change.ADD;
37 import static org.opendaylight.controller.hosttracker.internal.DeviceManagerImpl.DeviceUpdate.Change.CHANGE;
38 import static org.opendaylight.controller.hosttracker.internal.DeviceManagerImpl.DeviceUpdate.Change.DELETE;
40 import java.net.InetAddress;
41 import java.net.UnknownHostException;
42 import java.util.ArrayList;
43 import java.util.Calendar;
44 import java.util.Collection;
45 import java.util.Collections;
46 import java.util.Comparator;
47 import java.util.Date;
48 import java.util.EnumSet;
49 import java.util.HashMap;
50 import java.util.HashSet;
51 import java.util.Iterator;
52 import java.util.LinkedList;
53 import java.util.List;
55 import java.util.Queue;
57 import java.util.concurrent.ConcurrentHashMap;
58 import java.util.concurrent.Executors;
59 import java.util.concurrent.Future;
60 import java.util.concurrent.ScheduledExecutorService;
61 import java.util.concurrent.TimeUnit;
63 import org.opendaylight.controller.hosttracker.Entity;
64 import org.opendaylight.controller.hosttracker.IDevice;
65 import org.opendaylight.controller.hosttracker.IDeviceListener;
66 import org.opendaylight.controller.hosttracker.IDeviceService;
67 import org.opendaylight.controller.hosttracker.IEntityClass;
68 import org.opendaylight.controller.hosttracker.IEntityClassListener;
69 import org.opendaylight.controller.hosttracker.IEntityClassifierService;
70 import org.opendaylight.controller.hosttracker.IfIptoHost;
71 import org.opendaylight.controller.hosttracker.IfNewHostNotify;
72 import org.opendaylight.controller.hosttracker.SwitchPort;
73 import org.opendaylight.controller.hosttracker.hostAware.HostNodeConnector;
74 import org.opendaylight.controller.sal.core.Edge;
75 import org.opendaylight.controller.sal.core.NodeConnector;
76 import org.opendaylight.controller.sal.core.NodeConnector.NodeConnectorIDType;
77 import org.opendaylight.controller.sal.packet.ARP;
78 import org.opendaylight.controller.sal.packet.Ethernet;
79 import org.opendaylight.controller.sal.packet.IDataPacketService;
80 import org.opendaylight.controller.sal.packet.IListenDataPacket;
81 import org.opendaylight.controller.sal.packet.Packet;
82 import org.opendaylight.controller.sal.packet.PacketResult;
83 import org.opendaylight.controller.sal.packet.RawPacket;
84 import org.opendaylight.controller.sal.topology.TopoEdgeUpdate;
85 import org.opendaylight.controller.sal.utils.HexEncode;
86 import org.opendaylight.controller.sal.utils.ListenerDispatcher;
87 import org.opendaylight.controller.sal.utils.MultiIterator;
88 import org.opendaylight.controller.sal.utils.SingletonTask;
89 import org.opendaylight.controller.sal.utils.Status;
90 import org.opendaylight.controller.sal.utils.StatusCode;
91 import org.opendaylight.controller.switchmanager.ISwitchManager;
92 import org.opendaylight.controller.topologymanager.ITopologyManager;
93 import org.opendaylight.controller.topologymanager.ITopologyManagerAware;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
98 * DeviceManager creates Devices based upon MAC addresses seen in the network.
99 * It tracks any network addresses mapped to the Device, and its location within
104 public class DeviceManagerImpl implements IDeviceService, IEntityClassListener,
105 IListenDataPacket, ITopologyManagerAware, IfIptoHost {
106 protected static Logger logger = LoggerFactory
107 .getLogger(DeviceManagerImpl.class);
109 public static final String MODULE_NAME = "devicemanager";
111 // protected ITopologyService topology;
112 // protected IStorageSourceService storageSource;
113 // protected IRestApiService restApi;
114 // protected IThreadPoolService threadPool;
115 // protected IFlowReconcileService flowReconcileMgr;
116 // protected IFlowReconcileEngineService flowReconcileEngine;
117 // protected IDebugCounterService debugCounters;
118 // private ISyncService syncService;
119 // private IStoreClient<String,DeviceSyncRepresentation> storeClient;
120 // private DeviceSyncManager deviceSyncManager;
122 private ITopologyManager topology;
123 private ISwitchManager switchManager = null;
124 private IDataPacketService dataPacketService = null;
126 public static final String CNT_INCOMING = MODULE_NAME + "-incoming";
127 public static final String CNT_RECONCILE_REQUEST = MODULE_NAME
128 + "-reconcileRequest";
129 public static final String CNT_RECONCILE_NO_SOURCE = MODULE_NAME
130 + "-reconcileNoSourceDevice";
131 public static final String CNT_RECONCILE_NO_DEST = MODULE_NAME
132 + "-reconcileNoDestDevice";
133 public static final String CNT_BROADCAST_SOURCE = MODULE_NAME
134 + "-broadcastSource";
135 public static final String CNT_NO_SOURCE = MODULE_NAME + "-noSourceDevice";
136 public static final String CNT_NO_DEST = MODULE_NAME + "-noDestDevice";
137 public static final String CNT_DHCP_CLIENT_NAME_SNOOPED = MODULE_NAME
138 + "-dhcpClientNameSnooped";
139 public static final String CNT_DEVICE_ON_INTERAL_PORT_NOT_LEARNED = MODULE_NAME
140 + "-deviceOnInternalPortNotLearned";
141 public static final String CNT_PACKET_NOT_ALLOWED = MODULE_NAME
142 + "-packetNotAllowed";
143 public static final String CNT_NEW_DEVICE = MODULE_NAME + "-newDevice";
144 public static final String CNT_PACKET_ON_INTERNAL_PORT_FOR_KNOWN_DEVICE = MODULE_NAME
145 + "-packetOnInternalPortForKnownDevice";
146 public static final String CNT_NEW_ENTITY = MODULE_NAME + "-newEntity";
147 public static final String CNT_DEVICE_CHANGED = MODULE_NAME
149 public static final String CNT_DEVICE_MOVED = MODULE_NAME + "-deviceMoved";
150 public static final String CNT_CLEANUP_ENTITIES_RUNS = MODULE_NAME
151 + "-cleanupEntitiesRuns";
152 public static final String CNT_ENTITY_REMOVED_TIMEOUT = MODULE_NAME
153 + "-entityRemovedTimeout";
154 public static final String CNT_DEVICE_DELETED = MODULE_NAME
156 public static final String CNT_DEVICE_RECLASSIFY_DELETE = MODULE_NAME
157 + "-deviceReclassifyDelete";
158 public static final String CNT_DEVICE_STORED = MODULE_NAME
160 public static final String CNT_DEVICE_STORE_THROTTLED = MODULE_NAME
161 + "-deviceStoreThrottled";
162 public static final String CNT_DEVICE_REMOVED_FROM_STORE = MODULE_NAME
163 + "-deviceRemovedFromStore";
164 public static final String CNT_SYNC_EXCEPTION = MODULE_NAME
166 public static final String CNT_DEVICES_FROM_STORE = MODULE_NAME
167 + "-devicesFromStore";
168 public static final String CNT_CONSOLIDATE_STORE_RUNS = MODULE_NAME
169 + "-consolidateStoreRuns";
170 public static final String CNT_CONSOLIDATE_STORE_DEVICES_REMOVED = MODULE_NAME
171 + "-consolidateStoreDevicesRemoved";
173 static final String DEVICE_SYNC_STORE_NAME = DeviceManagerImpl.class
174 .getCanonicalName() + ".stateStore";
177 * Time interval between writes of entries for the same device to the sync
180 // static final int DEFAULT_SYNC_STORE_WRITE_INTERVAL_MS =
181 // 5*60*1000; // 5 min
182 // private int syncStoreWriteIntervalMs =
183 // DEFAULT_SYNC_STORE_WRITE_INTERVAL_MS;
186 * Time after SLAVE->MASTER until we run the consolidate store code.
188 // static final int DEFAULT_INITIAL_SYNC_STORE_CONSOLIDATE_MS =
189 // 15*1000; // 15 sec
190 // private int initialSyncStoreConsolidateMs =
191 // DEFAULT_INITIAL_SYNC_STORE_CONSOLIDATE_MS;
194 * Time interval between consolidate store runs.
196 // static final int DEFAULT_SYNC_STORE_CONSOLIDATE_INTERVAL_MS =
197 // 75*60*1000; // 75 min
198 // private final int syncStoreConsolidateIntervalMs =
199 // DEFAULT_SYNC_STORE_CONSOLIDATE_INTERVAL_MS;
202 * Time in milliseconds before entities will expire
204 protected static final int ENTITY_TIMEOUT = 60 * 60 * 1000;
207 * Time in seconds between cleaning up old entities/devices
209 protected static final int ENTITY_CLEANUP_INTERVAL = 60 * 60;
212 * This is the master device map that maps device IDs to {@link Device}
215 protected ConcurrentHashMap<Long, Device> deviceMap;
218 * Counter used to generate device keys
220 protected long deviceKeyCounter = 0;
223 * Lock for incrementing the device key counter
225 protected Object deviceKeyLock = new Object();
228 * This is the primary entity index that contains all entities
230 protected DeviceUniqueIndex primaryIndex;
233 * This stores secondary indices over the fields in the devices
235 protected Map<EnumSet<DeviceField>, DeviceIndex> secondaryIndexMap;
* This map contains state for each {@link IEntityClass} that exists
240 protected ConcurrentHashMap<String, ClassState> classStateMap;
243 * This is the list of indices we want on a per-class basis
245 protected Set<EnumSet<DeviceField>> perClassIndices;
248 * The entity classifier currently in use
250 protected IEntityClassifierService entityClassifier;
253 * Used to cache state about specific entity classes
255 protected class ClassState {
260 protected DeviceUniqueIndex classIndex;
263 * This stores secondary indices over the fields in the device for the
266 protected Map<EnumSet<DeviceField>, DeviceIndex> secondaryIndexMap;
269 * Allocate a new {@link ClassState} object for the class
272 * the class to use for the state
274 public ClassState(IEntityClass clazz) {
275 EnumSet<DeviceField> keyFields = clazz.getKeyFields();
276 EnumSet<DeviceField> primaryKeyFields = entityClassifier
278 boolean keyFieldsMatchPrimary = primaryKeyFields.equals(keyFields);
280 if (!keyFieldsMatchPrimary)
281 classIndex = new DeviceUniqueIndex(keyFields);
283 secondaryIndexMap = new HashMap<EnumSet<DeviceField>, DeviceIndex>();
284 for (EnumSet<DeviceField> fields : perClassIndices) {
285 secondaryIndexMap.put(fields, new DeviceMultiIndex(fields));
291 * Device manager event listeners reclassifyDeviceListeners are notified
292 * first before reconcileDeviceListeners. This is to make sure devices are
293 * correctly reclassified before reconciliation.
295 protected ListenerDispatcher<String, IDeviceListener> deviceListeners;
298 * Using the IfNewHostNotify to notify listeners of host changes.
300 private Set<IfNewHostNotify> newHostNotify = Collections.synchronizedSet(new HashSet<IfNewHostNotify>());
302 * A device update event to be dispatched
304 protected static class DeviceUpdate {
310 * The affected device
312 protected Device device;
315 * The change that was made
317 protected Change change;
320 * If not added, then this is the list of fields changed
322 protected EnumSet<DeviceField> fieldsChanged;
324 public DeviceUpdate(Device device, Change change,
325 EnumSet<DeviceField> fieldsChanged) {
327 this.device = device;
328 this.change = change;
329 this.fieldsChanged = fieldsChanged;
333 public String toString() {
334 String devIdStr = device.getEntityClass().getName() + "::"
335 + device.getMACAddressString();
336 return "DeviceUpdate [device=" + devIdStr + ", change=" + change
337 + ", fieldsChanged=" + fieldsChanged + "]";
343 * AttachmentPointComparator
345 * Compares two attachment points and returns the latest one. It is assumed
346 * that the two attachment points are in the same L2 domain.
350 protected class AttachmentPointComparator implements
351 Comparator<AttachmentPoint> {
352 public AttachmentPointComparator() {
357 public int compare(AttachmentPoint oldAP, AttachmentPoint newAP) {
358 // First compare based on L2 domain ID;
360 // XXX - missing functionality -- need topology
361 // long oldDomain = topology.getL2DomainId(oldSw);
362 // boolean oldBD = topology.isBroadcastDomainPort(oldSw, oldPort);
364 boolean oldBD = false;
366 // XXX - missing functionality -- need topology
367 // long newDomain = topology.getL2DomainId(newSw);
368 // boolean newBD = topology.isBroadcastDomainPort(newSw, newPort);
370 boolean newBD = false;
372 if (oldDomain < newDomain)
374 else if (oldDomain > newDomain)
377 // Give preference to OFPP_LOCAL always
378 if (!oldAP.getPort().getType().equals(NodeConnectorIDType.SWSTACK)
379 && newAP.getPort().getType()
380 .equals(NodeConnectorIDType.SWSTACK)) {
382 } else if (oldAP.getPort().getType()
383 .equals(NodeConnectorIDType.SWSTACK)
384 && !newAP.getPort().getType()
385 .equals(NodeConnectorIDType.SWSTACK)) {
389 // We expect that the last seen of the new AP is higher than
390 // old AP, if it is not, just reverse and send the negative
392 if (oldAP.getActiveSince() > newAP.getActiveSince())
393 return -compare(newAP, oldAP);
395 long activeOffset = 0;
396 // XXX - missing functionality -- need topology
397 // if (!topology.isConsistent(oldSw, oldPort, newSw, newPort)) {
398 if (!newBD && oldBD) {
401 if (newBD && oldBD) {
402 activeOffset = AttachmentPoint.EXTERNAL_TO_EXTERNAL_TIMEOUT;
403 } else if (newBD && !oldBD) {
404 activeOffset = AttachmentPoint.OPENFLOW_TO_EXTERNAL_TIMEOUT;
408 // // The attachment point is consistent.
409 // activeOffset = AttachmentPoint.CONSISTENT_TIMEOUT;
412 if ((newAP.getActiveSince() > oldAP.getLastSeen() + activeOffset)
413 || (newAP.getLastSeen() > oldAP.getLastSeen()
414 + AttachmentPoint.INACTIVITY_INTERVAL)) {
422 * Comparator for sorting by cluster ID
424 public AttachmentPointComparator apComparator;
427 * Switch ports where attachment points shouldn't be learned
429 private Set<SwitchPort> suppressAPs;
432 * Periodic task to clean up expired entities
434 public SingletonTask entityCleanupTask;
436 // ********************
437 // Dependency injection
438 // ********************
440 void setNewHostNotify(IfNewHostNotify obj){
441 this.newHostNotify.add(obj);
444 void unsetNewHostNotify(IfNewHostNotify obj){
445 this.newHostNotify.remove(obj);
448 void setDataPacketService(IDataPacketService s) {
449 this.dataPacketService = s;
452 void unsetDataPacketService(IDataPacketService s) {
453 if (this.dataPacketService == s) {
454 this.dataPacketService = null;
458 public void setTopologyManager(ITopologyManager s) {
462 public void unsetTopologyManager(ITopologyManager s) {
463 if (this.topology == s) {
464 logger.debug("Topology Manager Service removed!");
465 this.topology = null;
469 private volatile boolean stopped = true;
470 private ScheduledExecutorService ses;
478 public void start() {
479 this.perClassIndices = new HashSet<EnumSet<DeviceField>>();
481 // XXX - TODO need to make it possible to register a non-default
483 entityClassifier = new DefaultEntityClassifier();
484 this.deviceListeners = new ListenerDispatcher<String, IDeviceListener>();
485 this.suppressAPs = Collections
486 .newSetFromMap(new ConcurrentHashMap<SwitchPort, Boolean>());
487 primaryIndex = new DeviceUniqueIndex(entityClassifier.getKeyFields());
488 secondaryIndexMap = new HashMap<EnumSet<DeviceField>, DeviceIndex>();
490 deviceMap = new ConcurrentHashMap<Long, Device>();
491 classStateMap = new ConcurrentHashMap<String, ClassState>();
492 apComparator = new AttachmentPointComparator();
494 addIndex(true, EnumSet.of(DeviceField.IPV4));
496 // floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
497 // floodlightProvider.addHAListener(this.haListenerDelegate);
498 // if (topology != null)
499 // topology.addListener(this);
500 // flowReconcileMgr.addFlowReconcileListener(this);
501 // entityClassifier.addListener(this);
504 // XXX - Should use a common threadpool but this doesn't currently exist
505 ses = Executors.newScheduledThreadPool(1);
506 Runnable ecr = new Runnable() {
511 entityCleanupTask.reschedule(ENTITY_CLEANUP_INTERVAL,
515 entityCleanupTask = new SingletonTask(ses, ecr);
516 entityCleanupTask.reschedule(ENTITY_CLEANUP_INTERVAL, TimeUnit.SECONDS);
519 * XXX Missing functionality if (restApi != null) {
520 * restApi.addRestletRoutable(new DeviceRoutable()); } else {
521 * logger.debug("Could not instantiate REST API"); }
524 registerDeviceManagerDebugCounters();
527 * XXX Missing functionality try {
528 * this.syncService.registerStore(DEVICE_SYNC_STORE_NAME, Scope.LOCAL);
529 * this.storeClient = this.syncService
530 * .getStoreClient(DEVICE_SYNC_STORE_NAME, String.class,
531 * DeviceSyncRepresentation.class); } catch (SyncException e) { throw
532 * new FloodlightModuleException("Error while setting up sync service",
535 * Runnable consolidateStoreRunner = new Runnable() {
537 * @Override public void run() { deviceSyncManager.consolidateStore();
538 * storeConsolidateTask.reschedule(syncStoreConsolidateIntervalMs,
539 * TimeUnit.MILLISECONDS); debugCounters.flushCounters(); } };
540 * storeConsolidateTask = new SingletonTask(ses,
541 * consolidateStoreRunner); if (isMaster)
542 * storeConsolidateTask.reschedule(syncStoreConsolidateIntervalMs,
543 * TimeUnit.MILLISECONDS);
548 * Periodic task to consolidate entries in the store. I.e., delete entries
549 * in the store that are not known to DeviceManager
551 // XXX - Missing functionality
552 // private SingletonTask storeConsolidateTask;
554 // *********************
555 // IDeviceManagerService
556 // *********************
558 void setSwitchManager(ISwitchManager s) {
559 logger.debug("SwitchManager set");
560 this.switchManager = s;
563 void unsetSwitchManager(ISwitchManager s) {
564 if (this.switchManager == s) {
565 logger.debug("SwitchManager removed!");
566 this.switchManager = null;
571 public IDevice getDevice(Long deviceKey) {
572 return deviceMap.get(deviceKey);
576 public IDevice findDevice(long macAddress, Short vlan, Integer ipv4Address,
577 NodeConnector port) throws IllegalArgumentException {
578 if (vlan != null && vlan.shortValue() <= 0)
580 if (ipv4Address != null && ipv4Address == 0)
582 Entity e = new Entity(macAddress, vlan, ipv4Address, port, null);
583 if (!allKeyFieldsPresent(e, entityClassifier.getKeyFields())) {
584 throw new IllegalArgumentException("Not all key fields specified."
585 + " Required fields: " + entityClassifier.getKeyFields());
587 return findDeviceByEntity(e);
591 public IDevice findClassDevice(IEntityClass entityClass, long macAddress,
592 Short vlan, Integer ipv4Address) throws IllegalArgumentException {
593 if (vlan != null && vlan.shortValue() <= 0)
595 if (ipv4Address != null && ipv4Address == 0)
597 Entity e = new Entity(macAddress, vlan, ipv4Address, null, null);
598 if (entityClass == null
599 || !allKeyFieldsPresent(e, entityClass.getKeyFields())) {
600 throw new IllegalArgumentException("Not all key fields and/or "
601 + " no source device specified. Required fields: "
602 + entityClassifier.getKeyFields());
604 return findDestByEntity(entityClass, e);
608 public Collection<? extends IDevice> getAllDevices() {
609 return Collections.unmodifiableCollection(deviceMap.values());
613 public void addIndex(boolean perClass, EnumSet<DeviceField> keyFields) {
615 perClassIndices.add(keyFields);
617 secondaryIndexMap.put(keyFields, new DeviceMultiIndex(keyFields));
622 public Iterator<? extends IDevice> queryDevices(Long macAddress,
623 Short vlan, Integer ipv4Address, NodeConnector port) {
624 DeviceIndex index = null;
625 if (secondaryIndexMap.size() > 0) {
626 EnumSet<DeviceField> keys = getEntityKeys(macAddress, vlan,
628 index = secondaryIndexMap.get(keys);
631 Iterator<Device> deviceIterator = null;
633 // Do a full table scan
634 deviceIterator = deviceMap.values().iterator();
637 Entity entity = new Entity((macAddress == null ? 0 : macAddress),
638 vlan, ipv4Address, port, null);
639 deviceIterator = new DeviceIndexInterator(this,
640 index.queryByEntity(entity));
643 DeviceIterator di = new DeviceIterator(deviceIterator, null,
644 macAddress, vlan, ipv4Address, port);
649 public Iterator<? extends IDevice> queryClassDevices(
650 IEntityClass entityClass, Long macAddress, Short vlan,
651 Integer ipv4Address, NodeConnector port) {
652 ArrayList<Iterator<Device>> iterators = new ArrayList<Iterator<Device>>();
653 ClassState classState = getClassState(entityClass);
655 DeviceIndex index = null;
656 if (classState.secondaryIndexMap.size() > 0) {
657 EnumSet<DeviceField> keys = getEntityKeys(macAddress, vlan,
659 index = classState.secondaryIndexMap.get(keys);
662 Iterator<Device> iter;
664 index = classState.classIndex;
667 return new DeviceIterator(deviceMap.values().iterator(),
668 new IEntityClass[] { entityClass }, macAddress, vlan,
671 // scan the entire class
672 iter = new DeviceIndexInterator(this, index.getAll());
676 Entity entity = new Entity((macAddress == null ? 0 : macAddress),
677 vlan, ipv4Address, port, null);
678 iter = new DeviceIndexInterator(this, index.queryByEntity(entity));
682 return new MultiIterator<Device>(iterators.iterator());
685 protected Iterator<Device> getDeviceIteratorForQuery(Long macAddress,
686 Short vlan, Integer ipv4Address, NodeConnector port) {
687 DeviceIndex index = null;
688 if (secondaryIndexMap.size() > 0) {
689 EnumSet<DeviceField> keys = getEntityKeys(macAddress, vlan,
691 index = secondaryIndexMap.get(keys);
694 Iterator<Device> deviceIterator = null;
696 // Do a full table scan
697 deviceIterator = deviceMap.values().iterator();
700 Entity entity = new Entity((macAddress == null ? 0 : macAddress),
701 vlan, ipv4Address, port, null);
702 deviceIterator = new DeviceIndexInterator(this,
703 index.queryByEntity(entity));
706 DeviceIterator di = new DeviceIterator(deviceIterator, null,
707 macAddress, vlan, ipv4Address, port);
712 public void addListener(IDeviceListener listener) {
713 deviceListeners.addListener("device", listener);
718 public void addSuppressAPs(NodeConnector port) {
719 suppressAPs.add(new SwitchPort(port));
723 public void removeSuppressAPs(NodeConnector port) {
724 suppressAPs.remove(new SwitchPort(port));
728 public Set<SwitchPort> getSuppressAPs() {
729 return Collections.unmodifiableSet(suppressAPs);
732 private void logListeners() {
733 List<IDeviceListener> listeners = deviceListeners.getOrderedListeners();
734 if (listeners != null) {
735 StringBuffer sb = new StringBuffer();
736 sb.append("DeviceListeners: ");
737 for (IDeviceListener l : listeners) {
738 sb.append(l.getName());
741 logger.debug(sb.toString());
746 // IFlowReconcileListener
749 * XXX - Missing functionality
751 * @Override public Command reconcileFlows(ArrayList<OFMatchReconcile>
752 * ofmRcList) { ListIterator<OFMatchReconcile> iter =
753 * ofmRcList.listIterator(); while (iter.hasNext()) { OFMatchReconcile ofm =
756 * // Remove the STOPPed flow. if (Command.STOP == reconcileFlow(ofm)) {
759 * if (ofmRcList.size() > 0) { return Command.CONTINUE; } else { return
762 * protected Command reconcileFlow(OFMatchReconcile ofm) {
763 * debugCounters.updateCounter(CNT_RECONCILE_REQUEST); // Extract source
764 * entity information Entity srcEntity =
765 * getEntityFromFlowMod(ofm.ofmWithSwDpid, true); if (srcEntity == null) {
766 * debugCounters.updateCounter(CNT_RECONCILE_NO_SOURCE); return
769 * // Find the device by source entity Device srcDevice =
770 * findDeviceByEntity(srcEntity); if (srcDevice == null) {
771 * debugCounters.updateCounter(CNT_RECONCILE_NO_SOURCE); return
772 * Command.STOP; } // Store the source device in the context
773 * fcStore.put(ofm.cntx, CONTEXT_SRC_DEVICE, srcDevice);
775 * // Find the device matching the destination from the entity // classes of
776 * the source. Entity dstEntity = getEntityFromFlowMod(ofm.ofmWithSwDpid,
777 * false); Device dstDevice = null; if (dstEntity != null) { dstDevice =
778 * findDestByEntity(srcDevice.getEntityClass(), dstEntity); if (dstDevice !=
779 * null) fcStore.put(ofm.cntx, CONTEXT_DST_DEVICE, dstDevice); else
780 * debugCounters.updateCounter(CNT_RECONCILE_NO_DEST); } else {
781 * debugCounters.updateCounter(CNT_RECONCILE_NO_DEST); } if
782 * (logger.isTraceEnabled()) {
783 * logger.trace("Reconciling flow: match={}, srcEntity={}, srcDev={}, " +
784 * "dstEntity={}, dstDev={}", new Object[] {ofm.ofmWithSwDpid.getOfMatch(),
785 * srcEntity, srcDevice, dstEntity, dstDevice } ); } return
786 * Command.CONTINUE; }
794 public PacketResult receiveDataPacket(RawPacket inPkt) {
795 // XXX - Can this really pass in null? Why would you ever want that?
797 return PacketResult.IGNORED;
800 // throw new Exception("Sample");
801 // } catch (Exception e) {
802 // logger.error("Sample stack trace", e);
805 Packet formattedPak = this.dataPacketService.decodeDataPacket(inPkt);
807 if (formattedPak instanceof Ethernet) {
808 eth = (Ethernet) formattedPak;
810 return PacketResult.IGNORED;
813 // Extract source entity information
814 NodeConnector inPort = inPkt.getIncomingNodeConnector();
815 Entity srcEntity = getSourceEntityFromPacket(eth, inPort);
816 if (srcEntity == null) {
817 // debugCounters.updateCounter(CNT_BROADCAST_SOURCE);
818 return PacketResult.CONSUME;
821 // Learn from ARP packet for special VRRP settings.
822 // In VRRP settings, the source MAC address and sender MAC
823 // addresses can be different. In such cases, we need to learn
824 // the IP to MAC mapping of the VRRP IP address. The source
825 // entity will not have that information. Hence, a separate call
826 // to learn devices in such cases.
827 learnDeviceFromArpResponseData(eth, inPort);
829 // Learn/lookup device information
830 Device srcDevice = learnDeviceByEntity(srcEntity);
831 if (srcDevice == null) {
832 // debugCounters.updateCounter(CNT_NO_SOURCE);
833 return PacketResult.CONSUME;
835 logger.trace("Saw packet from device {}", srcDevice);
837 // // Store the source device in the context
838 // fcStore.put(cntx, CONTEXT_SRC_DEVICE, srcDevice);
840 // // Find the device matching the destination from the entity
841 // // classes of the source.
842 // Entity dstEntity = getDestEntityFromPacket(eth);
843 // Device dstDevice = null;
844 // if (dstEntity != null) {
846 // findDestByEntity(srcDevice.getEntityClass(), dstEntity);
847 // if (dstDevice != null)
848 // fcStore.put(cntx, CONTEXT_DST_DEVICE, dstDevice);
850 // //debugCounters.updateCounter(CNT_NO_DEST);
852 // //debugCounters.updateCounter(CNT_NO_DEST);
855 // if (logger.isTraceEnabled()) {
856 // logger.trace("Received PI: {} on switch {}, port {} *** eth={}" +
857 // " *** srcDev={} *** dstDev={} *** ",
858 // new Object[] { pi, sw.getStringId(), pi.getInPort(), eth,
859 // srcDevice, dstDevice });
862 // snoopDHCPClientName(eth, srcDevice);
864 return PacketResult.KEEP_PROCESSING;
872 * Snoop and record client-provided host name from DHCP requests
877 // private void snoopDHCPClientName(Ethernet eth, Device srcDevice) {
878 // if (! (eth.getPayload() instanceof IPv4) )
880 // IPv4 ipv4 = (IPv4) eth.getPayload();
881 // if (! (ipv4.getPayload() instanceof UDP) )
883 // UDP udp = (UDP) ipv4.getPayload();
884 // if (!(udp.getPayload() instanceof DHCP))
886 // DHCP dhcp = (DHCP) udp.getPayload();
887 // byte opcode = dhcp.getOpCode();
888 // if (opcode == DHCP.OPCODE_REQUEST) {
889 // DHCPOption dhcpOption = dhcp.getOption(
890 // DHCPOptionCode.OptionCode_Hostname);
891 // if (dhcpOption != null) {
892 // debugCounters.updateCounter(CNT_DHCP_CLIENT_NAME_SNOOPED);
893 // srcDevice.dhcpClientName = new String(dhcpOption.getData());
899 * Check whether the given attachment point is valid given the current
906 * @return true if it's a valid attachment point
908 public boolean isValidAttachmentPoint(NodeConnector port) {
909 // XXX - missing functionality -- need topology module
910 // if (topology.isAttachmentPointPort(port) == false)
912 if (topology.isInternal(port))
914 if (!switchManager.isNodeConnectorEnabled(port))
916 if (suppressAPs.contains(new SwitchPort(port)))
923 * Get sender IP address from packet if the packet is either an ARP packet.
929 private int getSrcNwAddr(Ethernet eth, long dlAddr) {
930 if (eth.getPayload() instanceof ARP) {
931 ARP arp = (ARP) eth.getPayload();
932 if ((arp.getProtocolType() == ARP.PROTO_TYPE_IP)
933 && (toLong(arp.getSenderHardwareAddress()) == dlAddr)) {
934 return toIPv4Address(arp.getSenderProtocolAddress());
941 * Parse an entity from an {@link Ethernet} packet.
944 * the packet to parse
946 * the switch on which the packet arrived
948 * the original packetin
949 * @return the entity from the packet
951 protected Entity getSourceEntityFromPacket(Ethernet eth, NodeConnector port) {
952 byte[] dlAddrArr = eth.getSourceMACAddress();
953 long dlAddr = toLong(dlAddrArr);
955 // Ignore broadcast/multicast source
956 if ((dlAddrArr[0] & 0x1) != 0)
959 // XXX missing functionality
961 int nwSrc = getSrcNwAddr(eth, dlAddr);
962 return new Entity(dlAddr, null, ((nwSrc != 0) ? nwSrc : null), port,
967 * Learn device from ARP data in scenarios where the Ethernet source MAC is
968 * different from the sender hardware address in ARP data.
970 protected void learnDeviceFromArpResponseData(Ethernet eth,
971 NodeConnector port) {
973 if (!(eth.getPayload() instanceof ARP))
975 ARP arp = (ARP) eth.getPayload();
977 byte[] dlAddrArr = eth.getSourceMACAddress();
978 long dlAddr = toLong(dlAddrArr);
980 byte[] senderHardwareAddr = arp.getSenderHardwareAddress();
981 long senderAddr = toLong(senderHardwareAddr);
983 if (dlAddr == senderAddr)
986 // Ignore broadcast/multicast source
987 if ((senderHardwareAddr[0] & 0x1) != 0)
990 // short vlan = eth.getVlanID();
991 int nwSrc = toIPv4Address(arp.getSenderProtocolAddress());
993 Entity e = new Entity(senderAddr, null, ((nwSrc != 0) ? nwSrc : null),
996 learnDeviceByEntity(e);
1000 * Get a (partial) entity for the destination from the packet.
1005 // protected Entity getDestEntityFromPacket(Ethernet eth) {
1006 // byte[] dlAddrArr = eth.getDestinationMACAddress();
1007 // long dlAddr = Ethernet.toLong(dlAddrArr);
1008 // short vlan = eth.getVlanID();
1011 // // Ignore broadcast/multicast destination
1012 // if ((dlAddrArr[0] & 0x1) != 0)
1015 // if (eth.getPayload() instanceof IPv4) {
1016 // IPv4 ipv4 = (IPv4) eth.getPayload();
1017 // nwDst = ipv4.getDestinationAddress();
1020 // return new Entity(dlAddr,
1021 // ((vlan >= 0) ? vlan : null),
1022 // ((nwDst != 0) ? nwDst : null),
1029 * Parse an entity from an OFMatchWithSwDpid.
1031 * @param ofmWithSwDpid
1032 * @return the entity from the packet
1034 // private Entity getEntityFromFlowMod(OFMatchWithSwDpid ofmWithSwDpid,
1035 // boolean isSource) {
1036 // byte[] dlAddrArr = ofmWithSwDpid.getOfMatch().getDataLayerSource();
1037 // int nwSrc = ofmWithSwDpid.getOfMatch().getNetworkSource();
1039 // dlAddrArr = ofmWithSwDpid.getOfMatch().getDataLayerDestination();
1040 // nwSrc = ofmWithSwDpid.getOfMatch().getNetworkDestination();
1043 // long dlAddr = Ethernet.toLong(dlAddrArr);
1045 // // Ignore broadcast/multicast source
1046 // if ((dlAddrArr[0] & 0x1) != 0)
1049 // Long swDpid = null;
1050 // Short inPort = null;
1053 // swDpid = ofmWithSwDpid.getSwitchDataPathId();
1054 // inPort = ofmWithSwDpid.getOfMatch().getInputPort();
1057 // /**for the new flow cache design, the flow mods retrived are not always
1058 // from the source, learn AP should be disabled --meiyang*/
1059 // boolean learnap = false;
1061 // * if (swDpid == null ||
1062 // inPort == null ||
1063 // !isValidAttachmentPoint(swDpid, inPort)) {
1064 // // If this is an internal port or we otherwise don't want
1065 // // to learn on these ports. In the future, we should
1066 // // handle this case by labeling flows with something that
1067 // // will give us the entity class. For now, we'll do our
1068 // // best assuming attachment point information isn't used
1069 // // as a key field.
1074 // short vlan = ofmWithSwDpid.getOfMatch().getDataLayerVirtualLan();
1075 // return new Entity(dlAddr,
1076 // ((vlan >= 0) ? vlan : null),
1077 // ((nwSrc != 0) ? nwSrc : null),
1078 // (learnap ? swDpid : null),
1079 // (learnap ? (int)inPort : null),
1084 * Look up a {@link Device} based on the provided {@link Entity}. We first
1085 * check the primary index. If we do not find an entry there we classify the
1086 * device into its IEntityClass and query the classIndex. This implies that
1087 * all key fields of the current IEntityClassifier must be present in the
1088 * entity for the lookup to succeed!
1091 * the entity to search for
1092 * @return The {@link Device} object if found
// Finds the device that owns the given entity. The primary index is
// consulted first; on a miss the entity is classified and, when the
// resulting class has a class index, that index is consulted.
// NOTE(review): this is a numbered listing with lines missing; the
// statements that follow the two null checks below (and the closing
// braces) are not visible here.
1094 protected Device findDeviceByEntity(Entity entity) {
1095 // Look up the fully-qualified entity to see if it already
1096 // exists in the primary entity index.
1097 Long deviceKey = primaryIndex.findByEntity(entity);
1098 IEntityClass entityClass = null;
1100 if (deviceKey == null) {
1101 // If the entity does not exist in the primary entity index,
1102 // use the entity classifier to find the class for the
1103 // entity. Look up the entity in the returned class'
1104 // class entity index.
1105 entityClass = entityClassifier.classifyEntity(entity);
1106 if (entityClass == null) {
1109 ClassState classState = getClassState(entityClass);
1111 if (classState.classIndex != null) {
1112 deviceKey = classState.classIndex.findByEntity(entity);
1115 if (deviceKey == null)
// Only reached with a device key from one of the two indices.
1117 return deviceMap.get(deviceKey);
1121 * Get a destination device using entity fields that corresponds with the
1122 * given source device. The source device is important since there could be
1123 * ambiguity in the destination device without the attachment point
1127 * the source device's entity class. The returned destination
1128 * will be in the same entity class as the source.
1130 * the entity to look up
1131 * @return an {@link Device} or null if no device is found.
// Finds the destination device for dstEntity. The primary index is tried
// first; on a miss the class index of the supplied reference class is
// used instead of classifying dstEntity itself.
// NOTE(review): the statements following the classIndex == null and
// deviceKey == null checks are not visible in this listing.
1133 protected Device findDestByEntity(IEntityClass reference, Entity dstEntity) {
1135 // Look up the fully-qualified entity to see if it
1136 // exists in the primary entity index
1137 Long deviceKey = primaryIndex.findByEntity(dstEntity);
1139 if (deviceKey == null) {
1140 // This could happen because:
1141 // 1) no destination known, or a broadcast destination
1142 // 2) if we have attachment point key fields since
1143 // attachment point information isn't available for
1144 // destination devices.
1145 // For the second case, we'll need to match up the
1146 // destination device with the class of the source
1148 ClassState classState = getClassState(reference);
1149 if (classState.classIndex == null) {
1152 deviceKey = classState.classIndex.findByEntity(dstEntity);
1154 if (deviceKey == null)
1156 return deviceMap.get(deviceKey);
1160 * Look up a {@link Device} within a particular entity class based on the
1161 * provided {@link Entity}.
1164 * the entity class to search for the entity
1166 * the entity to search for
1167 * @return The {@link Device} object if found private Device
1168 * findDeviceInClassByEntity(IEntityClass clazz, Entity entity) { //
1169 * XXX - TODO throw new UnsupportedOperationException(); }
1173 * Look up a {@link Device} based on the provided {@link Entity}. Also
1174 * learns based on the new entity, and will update existing devices as
1178 * the {@link Entity}
1179 * @return The {@link Device} object if found
// Learns the given entity. The entity is resolved to an existing device via
// the primary index or the class index of its entity class; if none is
// found a new Device is allocated under a fresh device key. For an existing
// device, either the matching entity's last-seen timestamp is refreshed or
// a new Device that also contains the entity is swapped into deviceMap.
// Collected DeviceUpdates are dispatched through processUpdates() at the end.
// NOTE(review): this is a numbered listing with lines missing (the loop
// header, several return/continue/break statements and closing braces), so
// the exact control flow between the visible statements cannot be confirmed
// from here.
1181 protected Device learnDeviceByEntity(Entity entity) {
// NOTE(review): logs the whole primary index at INFO on every call.
1182 logger.info("Primary index {}", primaryIndex);
1183 ArrayList<Long> deleteQueue = null;
1184 LinkedList<DeviceUpdate> deviceUpdates = null;
// NOTE(review): no assignment to oldDevice is visible in this listing, yet
// it is passed to sendDeviceMovedNotification() below - confirm it is set
// before that call.
1185 Device oldDevice = null;
1186 Device device = null;
1188 // we may need to restart the learning process if we detect
1189 // concurrent modification. Note that we ensure that at least
1190 // one thread should always succeed so we don't get into infinite
1193 deviceUpdates = null;
1195 // Look up the fully-qualified entity to see if it already
1196 // exists in the primary entity index.
1197 Long deviceKey = primaryIndex.findByEntity(entity);
1198 IEntityClass entityClass = null;
1200 if (deviceKey == null) {
1201 // If the entity does not exist in the primary entity index,
1202 // use the entity classifier to find the class for the
1203 // entity. Look up the entity in the returned class'
1204 // class entity index.
1205 entityClass = entityClassifier.classifyEntity(entity);
1206 if (entityClass == null) {
1207 // could not classify entity. No device
1211 ClassState classState = getClassState(entityClass);
1213 if (classState.classIndex != null) {
1214 deviceKey = classState.classIndex.findByEntity(entity);
1217 if (deviceKey != null) {
1218 // If the primary or secondary index contains the entity
1219 // use resulting device key to look up the device in the
1220 // device map, and use the referenced Device below.
1221 device = deviceMap.get(deviceKey);
1222 if (device == null) {
1223 // This can happen due to concurrent modification
1224 if (logger.isDebugEnabled()) {
1225 logger.debug("No device for deviceKey {} while "
1226 + "while processing entity {}", deviceKey,
1229 // if so, then try again till we don't even get the device
1231 // and so we recreate the device
1235 // If the secondary index does not contain the entity,
1236 // create a new Device object containing the entity, and
1237 // generate a new device ID if the entity is on an
1238 // attachment point port. Otherwise ignore.
1239 if (entity.hasSwitchPort()
1240 && !isValidAttachmentPoint(entity.getPort())) {
1241 // debugCounters.updateCounter(CNT_DEVICE_ON_INTERAL_PORT_NOT_LEARNED);
1242 if (logger.isDebugEnabled()) {
1243 logger.debug("Not learning new device on internal"
1244 + " link: {}", entity);
1249 // Before we create the new device also check if
1250 // the entity is allowed (e.g., for spoofing protection)
1251 if (!isEntityAllowed(entity, entityClass)) {
1252 // debugCounters.updateCounter(CNT_PACKET_NOT_ALLOWED);
1253 if (logger.isDebugEnabled()) {
1254 logger.debug("PacketIn is not allowed {} {}",
1255 entityClass.getName(), entity);
// The key counter is read and incremented while holding deviceKeyLock.
1260 synchronized (deviceKeyLock) {
1261 deviceKey = Long.valueOf(deviceKeyCounter++);
1263 device = allocateDevice(deviceKey, entity, entityClass);
1265 // Add the new device to the primary map with a simple put
1266 deviceMap.put(deviceKey, device);
// A failed index update queues the just-created key for deletion; the queue
// is drained near the end of this method.
1269 if (!updateIndices(device, deviceKey)) {
1270 if (deleteQueue == null)
1271 deleteQueue = new ArrayList<Long>();
1272 deleteQueue.add(deviceKey);
1276 updateSecondaryIndices(entity, entityClass, deviceKey);
1278 // We need to count and log here. If we log earlier we could
1279 // hit a concurrent modification and restart the dev creation
1280 // and potentially count the device twice.
1281 // debugCounters.updateCounter(CNT_NEW_DEVICE);
1282 if (logger.isDebugEnabled()) {
1284 "New device created: {} deviceKey={}, entity={}",
1285 new Object[] { device, deviceKey, entity });
1287 // generate new device update
1288 deviceUpdates = updateUpdates(deviceUpdates, new DeviceUpdate(
1289 device, ADD, null));
1293 // if it gets here, we have a pre-existing Device for this Entity
1294 if (!isEntityAllowed(entity, device.getEntityClass())) {
1295 // debugCounters.updateCounter(CNT_PACKET_NOT_ALLOWED);
// NOTE(review): guarded by isDebugEnabled() but emitted at INFO.
1296 if (logger.isDebugEnabled()) {
1297 logger.info("PacketIn is not allowed {} {}", device
1298 .getEntityClass().getName(), entity);
1302 // If this is not an attachment point port we don't learn the new
1304 // and don't update indexes. But we do allow the device to continue
1307 if (entity.hasSwitchPort()
1308 && !isValidAttachmentPoint(entity.getPort())) {
1309 // debugCounters.updateCounter(CNT_PACKET_ON_INTERNAL_PORT_FOR_KNOWN_DEVICE);
// A non-negative index means the device already holds this entity; a
// negative value encodes the insertion point (decoded further down).
1312 int entityindex = -1;
1313 if ((entityindex = device.entityIndex(entity)) >= 0) {
1314 // Entity already exists
1315 // update timestamp on the found entity
1316 Date lastSeen = entity.getLastSeenTimestamp();
1317 if (lastSeen == null) {
1318 lastSeen = new Date();
1319 entity.setLastSeenTimestamp(lastSeen);
1321 device.entities[entityindex].setLastSeenTimestamp(lastSeen);
1322 // we break the loop after checking for changes to the AP
1324 // New entity for this device
1325 // compute the insertion point for the entity.
1326 // see Arrays.binarySearch()
1327 entityindex = -(entityindex + 1);
1328 Device newDevice = allocateDevice(device, entity, entityindex);
1331 EnumSet<DeviceField> changedFields = findChangedFields(device,
1334 // update the device map with a replace call
1335 boolean res = deviceMap.replace(deviceKey, device, newDevice);
1336 // If replace returns false, restart the process from the
1337 // beginning (this implies another thread concurrently
1338 // modified this Device).
// NOTE(review): the indices are updated through the variable `device`;
// whether it has been re-pointed at newDevice by this line is not visible
// in this listing.
1344 if (!updateIndices(device, deviceKey)) {
1347 updateSecondaryIndices(entity, device.getEntityClass(),
1350 // We need to count here after all the possible "continue"
1351 // statements in this branch
1352 // debugCounters.updateCounter(CNT_NEW_ENTITY);
1353 if (changedFields.size() > 0) {
1354 // debugCounters.updateCounter(CNT_DEVICE_CHANGED);
1355 deviceUpdates = updateUpdates(deviceUpdates,
1356 new DeviceUpdate(newDevice, CHANGE, changedFields));
1358 // we break the loop after checking for changed AP
1360 // Update attachment point (will only be hit if the device
1361 // already existed and no concurrent modification)
1362 if (entity.hasSwitchPort()) {
// NOTE(review): getLastSeenTimestamp() is dereferenced without a null check
// here; the null-defaulting above is only visible on the
// "entity already exists" path - confirm for the new-entity path.
1363 boolean moved = device.updateAttachmentPoint(entity.getPort(),
1364 entity.getLastSeenTimestamp().getTime());
1365 // TODO: use update mechanism instead of sending the
1366 // notification directly
1368 // we count device moved events in
1369 // sendDeviceMovedNotification()
1370 sendDeviceMovedNotification(device, oldDevice);
1371 if (logger.isTraceEnabled()) {
1372 logger.trace("Device moved: attachment points {},"
1373 + "entities {}", device.attachmentPoints,
1377 if (logger.isTraceEnabled()) {
1378 logger.trace("Device attachment point updated: "
1379 + "attachment points {}," + "entities {}",
1380 device.attachmentPoints, device.entities);
// Delete the devices whose index update failed above.
// NOTE(review): deviceMap.get(l) can return null if the device is already
// gone; deleteDevice() is called without checking for that.
1387 if (deleteQueue != null) {
1388 for (Long l : deleteQueue) {
1389 Device dev = deviceMap.get(l);
1390 this.deleteDevice(dev);
1394 processUpdates(deviceUpdates);
1395 // deviceSyncManager.storeDeviceThrottled(device);
// Hook consulted by learnDeviceByEntity() before an entity is learned for
// a new or an existing device (e.g. for spoofing protection).
// NOTE(review): the method body is not visible in this listing.
1400 protected boolean isEntityAllowed(Entity entity, IEntityClass entityClass) {
// Computes which of IPV4, VLAN and SWITCHPORT the new entity changes on the
// device. All three start out as "changed"; a field is dropped from the set
// when the new entity has no value for it, or when some entity already on
// the device carries an equal value.
// NOTE(review): the second parameter (used below as newEntity) is declared
// on a line that is not visible in this listing.
1404 protected EnumSet<DeviceField> findChangedFields(Device device,
1406 EnumSet<DeviceField> changedFields = EnumSet.of(DeviceField.IPV4,
1407 DeviceField.VLAN, DeviceField.SWITCHPORT);
// Fields the new entity does not set cannot have changed.
1409 if (newEntity.getIpv4Address() == null)
1410 changedFields.remove(DeviceField.IPV4);
1411 if (newEntity.getVlan() == null)
1412 changedFields.remove(DeviceField.VLAN);
1413 if (newEntity.getPort() == null)
1414 changedFields.remove(DeviceField.SWITCHPORT);
1416 if (changedFields.size() == 0)
1417 return changedFields;
// The null tests on newEntity below repeat the checks made above; only the
// equality comparison against each existing entity adds information here.
1419 for (Entity entity : device.getEntities()) {
1420 if (newEntity.getIpv4Address() == null
1421 || (entity.getIpv4Address() != null && entity
1423 .equals(newEntity.getIpv4Address())))
1424 changedFields.remove(DeviceField.IPV4);
1425 if (newEntity.getVlan() == null
1426 || (entity.getVlan() != null && entity.getVlan().equals(
1427 newEntity.getVlan())))
1428 changedFields.remove(DeviceField.VLAN);
1429 if (newEntity.getPort() == null
1430 || (entity.getPort() != null && entity.getPort().equals(
1431 newEntity.getPort())))
1432 changedFields.remove(DeviceField.SWITCHPORT);
1435 return changedFields;
1439 * Send update notifications to listeners
1442 * the updates to process.
// Drains the queue of device updates, passing each one to notifyListeners()
// together with the ordered listener list fetched for that update.
// NOTE(review): the statement following the updates == null check is not
// visible in this listing.
1444 protected void processUpdates(Queue<DeviceUpdate> updates) {
1445 if (updates == null)
1447 DeviceUpdate update = null;
// poll() removes the head of the queue, so the queue is consumed here.
1448 while (null != (update = updates.poll())) {
1449 if (logger.isTraceEnabled()) {
1450 logger.trace("Dispatching device update: {}", update);
1452 // if (update.change == DeviceUpdate.Change.DELETE)
1453 // deviceSyncManager.removeDevice(update.device);
1455 // deviceSyncManager.storeDevice(update.device);
1456 List<IDeviceListener> listeners = deviceListeners
1457 .getOrderedListeners();
1458 notifyListeners(listeners, update);
1462 protected void notifyListeners(List<IDeviceListener> listeners,
1463 DeviceUpdate update) {
1464 if (listeners == null && newHostNotify.isEmpty()) {
1468 * TODO: IfNewHostNotify is needed for current controller API.
1469 * Adding logic so that existing apps (like SimpleForwardingManager)
1470 * work. IDeviceListener adds additional methods and uses IListener's
1471 * callback ordering. The two interfaces need to be merged.
1474 for (IfNewHostNotify notify : newHostNotify){
1475 switch (update.change) {
1477 notify.notifyHTClient(update.device.toHostNodeConnector());
1480 notify.notifyHTClientHostRemoved(update.device.toHostNodeConnector());
1487 * TODO: Remove this section as IDeviceListener functionality gets
1488 * merged with IfNewHostNotify
1490 for (IDeviceListener listener : listeners) {
1491 switch (update.change) {
1493 listener.deviceAdded(update.device);
1496 listener.deviceRemoved(update.device);
1499 for (DeviceField field : update.fieldsChanged) {
1502 listener.deviceIPV4AddrChanged(update.device);
1505 // listener.deviceMoved(update.device);
1508 listener.deviceVlanChanged(update.device);
1511 logger.debug("Unknown device field changed {}",
1512 update.fieldsChanged.toString());
1522 * Check if the entity e has all the keyFields set. Returns false if not
1527 * the key fields to check e against
// Checks whether entity e carries a value for each of the given key fields.
// The visible checks cover the IPv4 address and the port; the MAC address
// is treated as always present and the VLAN check is commented out.
// NOTE(review): the switch header, the case labels and the return
// statements are not visible in this listing.
1530 protected boolean allKeyFieldsPresent(Entity e,
1531 EnumSet<DeviceField> keyFields) {
1532 for (DeviceField f : keyFields) {
1535 // MAC address is always present
1538 if (e.getIpv4Address() == null)
1542 if (e.getPort() == null)
1546 // FIXME: vlan==null is ambiguous: it can mean: not present
1548 // if (e.vlan == null) return false;
1551 // we should never get here. unless somebody extended
1553 throw new IllegalStateException();
// Helper used by learnDeviceByEntity() to collect a DeviceUpdate into the
// pending-updates list; the caller passes in a list that may still be null
// and stores the returned list.
// NOTE(review): only the list allocation is visible in this listing; the
// conditions around it and the return statement are not.
1559 private LinkedList<DeviceUpdate> updateUpdates(
1560 LinkedList<DeviceUpdate> list, DeviceUpdate update) {
1564 list = new LinkedList<DeviceUpdate>();
1571 * Get the secondary index for a class. Will return null if the secondary
1572 * index was created concurrently in another thread.
1575 * the class for the index
// Returns the ClassState registered under the entity class name, creating
// and registering a new one with putIfAbsent() when none exists yet.
// NOTE(review): the return statements (including how the putIfAbsent()
// result r is used) are not visible in this listing.
1578 private ClassState getClassState(IEntityClass clazz) {
1579 ClassState classState = classStateMap.get(clazz.getName());
1580 if (classState != null)
1583 classState = new ClassState(clazz);
1584 ClassState r = classStateMap.putIfAbsent(clazz.getName(), classState);
1593 * Update both the primary and class indices for the provided device. If the
1594 * update fails because of a concurrent update, will return false.
1597 * the device to update
1599 * the device key for the device
1600 * @return true if the update succeeded, false otherwise.
// Updates the primary index and, when the device's entity class has a
// class index, that class index as well, for the given device and key.
// NOTE(review): the return statements that follow the failed-update checks
// are not visible in this listing.
1602 private boolean updateIndices(Device device, Long deviceKey) {
1603 if (!primaryIndex.updateIndex(device, deviceKey)) {
1606 IEntityClass entityClass = device.getEntityClass();
1607 ClassState classState = getClassState(entityClass);
1609 if (classState.classIndex != null) {
1610 if (!classState.classIndex.updateIndex(device, deviceKey))
1617 * Update the secondary indices for the given entity and associated entity
1621 * the entity to update
1622 * @param entityClass
1623 * the entity class for the entity
1625 * the device key to set up
// Records the entity under deviceKey in every global secondary index and
// in every secondary index of the entity class's ClassState. The results
// of the individual updateIndex() calls are ignored.
1627 private void updateSecondaryIndices(Entity entity,
1628 IEntityClass entityClass, Long deviceKey) {
1629 for (DeviceIndex index : secondaryIndexMap.values()) {
1630 index.updateIndex(entity, deviceKey);
1632 ClassState state = getClassState(entityClass);
1633 for (DeviceIndex index : state.secondaryIndexMap.values()) {
1634 index.updateIndex(entity, deviceKey);
1639 * Clean up expired entities/devices
// Expires stale entities. For every device in deviceMap, entities whose
// last-seen timestamp lies before (now - ENTITY_TIMEOUT milliseconds) are
// removed from the indices; the device is then replaced by one holding the
// remaining entities, or removed when nothing remains, and the resulting
// updates are handed to processUpdates().
// NOTE(review): this is a numbered listing with lines missing (the retry
// loop header, the toRemove/toKeep bookkeeping, continue/break statements
// and closing braces), so the exact retry flow cannot be confirmed here.
1641 protected void cleanupEntities() {
1642 // debugCounters.updateCounter(CNT_CLEANUP_ENTITIES_RUNS);
// cutoff = current time minus ENTITY_TIMEOUT milliseconds
1644 Calendar c = Calendar.getInstance();
1645 c.add(Calendar.MILLISECOND, -ENTITY_TIMEOUT);
1646 Date cutoff = c.getTime();
1648 ArrayList<Entity> toRemove = new ArrayList<Entity>();
1649 ArrayList<Entity> toKeep = new ArrayList<Entity>();
1651 Iterator<Device> diter = deviceMap.values().iterator();
1652 LinkedList<DeviceUpdate> deviceUpdates = new LinkedList<DeviceUpdate>();
1654 while (diter.hasNext()) {
1655 Device d = diter.next();
1658 deviceUpdates.clear();
// Entities without a last-seen timestamp never match this expiry test.
1661 for (Entity e : d.getEntities()) {
1662 if (e.getLastSeenTimestamp() != null
1663 && 0 > e.getLastSeenTimestamp().compareTo(cutoff)) {
1664 // individual entity needs to be removed
1670 if (toRemove.size() == 0) {
1674 // debugCounters.updateCounter(CNT_ENTITY_REMOVED_TIMEOUT);
1675 for (Entity e : toRemove) {
1676 removeEntity(e, d.getEntityClass(), d.getDeviceKey(),
// Some entities survive: build a replacement device from them.
1680 if (toKeep.size() > 0) {
1681 Device newDevice = allocateDevice(d.getDeviceKey(),
1682 d.getDHCPClientName(), d.oldAPs,
1683 d.attachmentPoints, toKeep, d.getEntityClass());
// A field counts as changed when no kept entity still carries the removed
// entity's value for it (see findChangedFields()).
1685 EnumSet<DeviceField> changedFields = EnumSet
1686 .noneOf(DeviceField.class);
1687 for (Entity e : toRemove) {
1688 changedFields.addAll(findChangedFields(newDevice, e));
1690 DeviceUpdate update = null;
// NOTE(review): the CHANGE update carries the old device d, not newDevice.
1691 if (changedFields.size() > 0) {
1692 update = new DeviceUpdate(d, CHANGE, changedFields);
1695 if (!deviceMap.replace(newDevice.getDeviceKey(), d,
1697 // concurrent modification; try again
1698 // need to use device that is the map now for the next
1700 d = deviceMap.get(d.getDeviceKey());
1704 if (update != null) {
1705 // need to count after all possibly continue stmts in
1707 // debugCounters.updateCounter(CNT_DEVICE_CHANGED);
1708 deviceUpdates.add(update);
// No entity survives: remove the device itself.
1711 DeviceUpdate update = new DeviceUpdate(d, DELETE, null);
1712 if (!deviceMap.remove(d.getDeviceKey(), d)) {
1713 // concurrent modification; try again
1714 // need to use device that is the map now for the next
1716 d = deviceMap.get(d.getDeviceKey());
1719 // debugCounters.updateCounter(CNT_DEVICE_DELETED);
1721 deviceUpdates.add(update);
1723 processUpdates(deviceUpdates);
// Removes one entity of a device from every index: the global secondary
// indices, the secondary indices of the entity class, the primary index
// and, when present, the class index. Each index receives the entities
// that remain on the device (others) via removeEntityIfNeeded().
1729 protected void removeEntity(Entity removed, IEntityClass entityClass,
1730 Long deviceKey, Collection<Entity> others) {
1731 // Don't count in this method. This method CAN BE called to clean-up
1732 // after concurrent device adds/updates and thus counting here
1734 for (DeviceIndex index : secondaryIndexMap.values()) {
1735 index.removeEntityIfNeeded(removed, deviceKey, others);
1737 ClassState classState = getClassState(entityClass);
1738 for (DeviceIndex index : classState.secondaryIndexMap.values()) {
1739 index.removeEntityIfNeeded(removed, deviceKey, others);
1742 primaryIndex.removeEntityIfNeeded(removed, deviceKey, others);
1744 if (classState.classIndex != null) {
1745 classState.classIndex.removeEntityIfNeeded(removed, deviceKey,
1751 * method to delete a given device, remove all entities first and then
1752 * finally delete the device itself.
1756 protected void deleteDevice(Device device) {
1757 // Don't count in this method. This method CAN BE called to clean-up
1758 // after concurrent device adds/updates and thus counting here
1760 ArrayList<Entity> emptyToKeep = new ArrayList<Entity>();
1761 for (Entity entity : device.getEntities()) {
1762 this.removeEntity(entity, device.getEntityClass(),
1763 device.getDeviceKey(), emptyToKeep);
1765 if (!deviceMap.remove(device.getDeviceKey(), device)) {
1766 if (logger.isDebugEnabled())
1767 logger.debug("device map does not have this device -"
1768 + device.toString());
// Builds the set of DeviceFields for which a search value was supplied.
// NOTE(review): the conditions guarding the VLAN and SWITCHPORT additions
// and the return statement are not visible in this listing.
1772 private EnumSet<DeviceField> getEntityKeys(Long macAddress, Short vlan,
1773 Integer ipv4Address, NodeConnector port) {
1774 // FIXME: vlan==null is a valid search. Need to handle this
1775 // case correctly. Note that the code will still work correctly.
1776 // But we might do a full device search instead of using an index.
1777 EnumSet<DeviceField> keys = EnumSet.noneOf(DeviceField.class);
1778 if (macAddress != null)
1779 keys.add(DeviceField.MAC);
1781 keys.add(DeviceField.VLAN);
1782 if (ipv4Address != null)
1783 keys.add(DeviceField.IPV4);
1785 keys.add(DeviceField.SWITCHPORT);
// Queries the secondary index that the given entity class keeps for
// exactly this key-field set and returns an iterator over the matching
// devices.
// NOTE(review): the condition guarding the empty-iterator return is not
// visible in this listing.
1789 protected Iterator<Device> queryClassByEntity(IEntityClass clazz,
1790 EnumSet<DeviceField> keyFields, Entity entity) {
1791 ClassState classState = getClassState(clazz);
1792 DeviceIndex index = classState.secondaryIndexMap.get(keyFields);
1794 return Collections.<Device> emptySet().iterator();
1795 return new DeviceIndexInterator(this, index.queryByEntity(entity));
// Factory hook: creates a new Device for a single entity under the given
// device key and entity class.
1798 protected Device allocateDevice(Long deviceKey, Entity entity,
1799 IEntityClass entityClass) {
1800 return new Device(this, deviceKey, entity, entityClass);
// Factory hook: creates a Device from explicit state (DHCP client name,
// two attachment point lists and an entity collection). All arguments are
// passed to the Device constructor unchanged.
1804 protected Device allocateDevice(Long deviceKey, String dhcpClientName,
1805 List<AttachmentPoint> aps, List<AttachmentPoint> trueAPs,
1806 Collection<Entity> entities, IEntityClass entityClass) {
1807 return new Device(this, deviceKey, dhcpClientName, aps, trueAPs,
1808 entities, entityClass);
// Factory hook: creates a Device from an existing device plus one
// additional entity placed at the given insertion point.
1811 protected Device allocateDevice(Device device, Entity entity,
1812 int insertionpoint) {
1813 return new Device(device, entity, insertionpoint);
// Factory hook: creates a Device with the key, DHCP client name and entity
// class of an existing device but with the given entity set. Candidate
// attachment points are derived from the ports of the new entities and
// compared against the old device's attachment points.
// NOTE(review): the statements inside the contains() check and after the
// isEmpty() check, and the return statement, are not visible in this
// listing.
1817 protected Device allocateDevice(Device device, Set<Entity> entities) {
1818 List<AttachmentPoint> newPossibleAPs = new ArrayList<AttachmentPoint>();
1819 List<AttachmentPoint> newAPs = new ArrayList<AttachmentPoint>();
1820 for (Entity entity : entities) {
1821 if (entity.getPort() != null) {
1822 AttachmentPoint aP = new AttachmentPoint(entity.getPort(), 0);
1823 newPossibleAPs.add(aP);
1826 if (device.attachmentPoints != null) {
1827 for (AttachmentPoint oldAP : device.attachmentPoints) {
1828 if (newPossibleAPs.contains(oldAP)) {
1833 if (newAPs.isEmpty())
1835 Device d = new Device(this, device.getDeviceKey(),
1836 device.getDHCPClientName(), newAPs, null, entities,
1837 device.getEntityClass());
1838 d.updateAttachmentPoint();
1842 // *********************
1843 // ITopologyManagerAware
1844 // *********************
// Topology callback: re-evaluates the attachment point of every known
// device and sends a device-moved notification for each device whose
// updateAttachmentPoint() reports a change. The edge update list itself is
// not inspected in the visible lines.
1847 public void edgeUpdate(List<TopoEdgeUpdate> topoedgeupdateList) {
1848 Iterator<Device> diter = deviceMap.values().iterator();
1850 while (diter.hasNext()) {
1851 Device d = diter.next();
1852 if (d.updateAttachmentPoint()) {
1853 if (logger.isDebugEnabled()) {
1854 logger.debug("Attachment point changed for device: {}", d);
1856 sendDeviceMovedNotification(d);
// Topology callback for an over-utilized edge.
// NOTE(review): the method body is not visible in this listing.
1862 public void edgeOverUtilized(Edge edge) {
// Topology callback for an edge whose utilization is back to normal.
// NOTE(review): the method body is not visible in this listing.
1867 public void edgeUtilBackToNormal(Edge edge) {
1871 // *********************
1872 // IEntityClassListener
1873 // *********************
// Entity-class callback: reclassifies every device that has no entity
// class or whose entity class name is contained in entityClassNames.
1876 public void entityClassChanged(Set<String> entityClassNames) {
1878 * iterate through the devices, reclassify the devices that belong to
1879 * these entity class names
1881 Iterator<Device> diter = deviceMap.values().iterator();
1882 while (diter.hasNext()) {
1883 Device d = diter.next();
1884 if (d.getEntityClass() == null
1885 || entityClassNames.contains(d.getEntityClass().getName()))
1886 reclassifyDevice(d);
1894 * Send update notifications to listeners
1897 * the updates to process.
// Calls deviceMoved(d) on every ordered IDeviceListener; does nothing when
// the listener list is null.
1899 protected void sendDeviceMovedNotification(Device d) {
1900 // debugCounters.updateCounter(CNT_DEVICE_MOVED);
1901 // deviceSyncManager.storeDevice(d);
1902 List<IDeviceListener> listeners = deviceListeners.getOrderedListeners();
1903 if (listeners != null) {
1904 for (IDeviceListener listener : listeners) {
1905 listener.deviceMoved(d);
1910 * Send update notifications to listeners.
1911 * IfNewHostNotify listeners need to remove old device and add new device.
1915 protected void sendDeviceMovedNotification(Device device, Device oldDevice){
1916 for (IfNewHostNotify notify : newHostNotify){
1917 notify.notifyHTClientHostRemoved(oldDevice.toHostNodeConnector());
1918 notify.notifyHTClient(device.toHostNodeConnector());
1920 sendDeviceMovedNotification(device);
1924 * this method will reclassify and reconcile a device - possibilities are -
1925 * create new device(s), remove entities from this device. If the device
1926 * entity class did not change then it returns false else true.
// Re-runs entity classification for a device. If any entity cannot be
// classified, the device has no entity class, or an entity's class name
// differs from the device's class name, the device is deleted, a DELETE
// update is dispatched and every entity is learned again through
// learnDeviceByEntity().
// NOTE(review): the return statements, the loop exits after
// needToReclassify is set, and the end of the DeviceUpdate constructor
// call are not visible in this listing.
1930 protected boolean reclassifyDevice(Device device) {
1931 // first classify all entities of this device
1932 if (device == null) {
1933 logger.debug("In reclassify for null device");
1936 boolean needToReclassify = false;
1937 for (Entity entity : device.entities) {
1938 IEntityClass entityClass = this.entityClassifier
1939 .classifyEntity(entity);
1940 if (entityClass == null || device.getEntityClass() == null) {
1941 needToReclassify = true;
1944 if (!entityClass.getName()
1945 .equals(device.getEntityClass().getName())) {
1946 needToReclassify = true;
1950 if (needToReclassify == false) {
1954 // debugCounters.updateCounter(CNT_DEVICE_RECLASSIFY_DELETE);
1955 LinkedList<DeviceUpdate> deviceUpdates = new LinkedList<DeviceUpdate>();
1956 // delete this device and then re-learn all the entities
1957 this.deleteDevice(device);
1958 deviceUpdates.add(new DeviceUpdate(device, DeviceUpdate.Change.DELETE,
// The list always holds the DELETE update added just above at this point.
1960 if (!deviceUpdates.isEmpty())
1961 processUpdates(deviceUpdates);
// The entities are read from the just-deleted Device object.
1962 for (Entity entity : device.entities) {
1963 this.learnDeviceByEntity(entity);
1969 * For testing: sets the interval between writes of the same device to the
1974 // void setSyncStoreWriteInterval(int intervalMs) {
1975 // this.syncStoreWriteIntervalMs = intervalMs;
1979 * For testing: sets the time between transition to MASTER and consolidate
1984 // void setInitialSyncStoreConsolidateMs(int intervalMs) {
1985 // this.initialSyncStoreConsolidateMs = intervalMs;
// Packs the first six bytes of address into a long, most significant byte
// first: byte i is masked to 0..255 and shifted left by (5 - i) * 8 bits.
// NOTE(review): the accumulator declaration, the combining statement and
// the return are not visible in this listing.
1988 private long toLong(byte[] address) {
1990 for (int i = 0; i < 6; i++) {
1991 long t = (address[i] & 0xffL) << ((5 - i) * 8);
1998 * Accepts an IPv4 address in a byte array and returns the corresponding
1999 * 32-bit integer value.
// Packs the first four bytes of ipAddress into an int, most significant
// byte first: byte i is masked to 0..255 and shifted left by (3 - i) * 8
// bits. Bytes beyond the fourth are never read, and no length check is
// visible.
// NOTE(review): the accumulator declaration, the combining statement and
// the return are not visible in this listing.
2004 private static int toIPv4Address(byte[] ipAddress) {
2006 for (int i = 0; i < 4; i++) {
2007 int t = (ipAddress[i] & 0xff) << ((3 - i) * 8);
// Placeholder for registering this module's debug counters. The lines that
// follow are the text of a block comment ("XXX Missing functionality")
// holding the original registerCounter() calls; no executable statement is
// visible in this listing.
2013 private void registerDeviceManagerDebugCounters() {
2015 * XXX Missing functionality if (debugCounters == null) {
2016 * logger.error("Debug Counter Service not found."); debugCounters = new
2017 * NullDebugCounter(); return; }
2018 * debugCounters.registerCounter(CNT_INCOMING,
2019 * "All incoming packets seen by this module",
2020 * CounterType.ALWAYS_COUNT);
2021 * debugCounters.registerCounter(CNT_RECONCILE_REQUEST,
2022 * "Number of flows that have been received for reconciliation by " +
2023 * "this module", CounterType.ALWAYS_COUNT);
2024 * debugCounters.registerCounter(CNT_RECONCILE_NO_SOURCE,
2025 * "Number of flow reconcile events that failed because no source " +
2026 * "device could be identified", CounterType.WARN); // is this really a
2027 * warning debugCounters.registerCounter(CNT_RECONCILE_NO_DEST,
2028 * "Number of flow reconcile events that failed because no " +
2029 * "destination device could be identified", CounterType.WARN); // is
2030 * this really a warning
2031 * debugCounters.registerCounter(CNT_BROADCAST_SOURCE,
2032 * "Number of packetIns that were discarded because the source " +
2033 * "MAC was broadcast or multicast", CounterType.WARN);
2034 * debugCounters.registerCounter(CNT_NO_SOURCE,
2035 * "Number of packetIns that were discarded because the " +
2036 * "could not identify a source device. This can happen if a " +
2037 * "packet is not allowed, appears on an illegal port, does not " +
2038 * "have a valid address space, etc.", CounterType.WARN);
2039 * debugCounters.registerCounter(CNT_NO_DEST,
2040 * "Number of packetIns that did not have an associated " +
2041 * "destination device. E.g., because the destination MAC is " +
2042 * "broadcast/multicast or is not yet known to the controller.",
2043 * CounterType.ALWAYS_COUNT);
2044 * debugCounters.registerCounter(CNT_DHCP_CLIENT_NAME_SNOOPED,
2045 * "Number of times a DHCP client name was snooped from a " +
2046 * "packetIn.", CounterType.ALWAYS_COUNT);
2047 * debugCounters.registerCounter(CNT_DEVICE_ON_INTERAL_PORT_NOT_LEARNED,
2048 * "Number of times packetIn was received on an internal port and" +
2049 * "no source device is known for the source MAC. The packetIn is " +
2050 * "discarded.", CounterType.WARN);
2051 * debugCounters.registerCounter(CNT_PACKET_NOT_ALLOWED,
2052 * "Number of times a packetIn was not allowed due to spoofing " +
2053 * "protection configuration.", CounterType.WARN); // is this really a
2054 * warning? debugCounters.registerCounter(CNT_NEW_DEVICE,
2055 * "Number of times a new device was learned",
2056 * CounterType.ALWAYS_COUNT); debugCounters.registerCounter(
2057 * CNT_PACKET_ON_INTERNAL_PORT_FOR_KNOWN_DEVICE,
2058 * "Number of times a packetIn was received on an internal port " +
2059 * "for a known device.", CounterType.ALWAYS_COUNT);
2060 * debugCounters.registerCounter(CNT_NEW_ENTITY,
2061 * "Number of times a new entity was learned for an existing device",
2062 * CounterType.ALWAYS_COUNT);
2063 * debugCounters.registerCounter(CNT_DEVICE_CHANGED,
2064 * "Number of times device properties have changed",
2065 * CounterType.ALWAYS_COUNT);
2066 * debugCounters.registerCounter(CNT_DEVICE_MOVED,
2067 * "Number of times devices have moved", CounterType.ALWAYS_COUNT);
2068 * debugCounters.registerCounter(CNT_CLEANUP_ENTITIES_RUNS,
2069 * "Number of times the entity cleanup task has been run",
2070 * CounterType.ALWAYS_COUNT);
2071 * debugCounters.registerCounter(CNT_ENTITY_REMOVED_TIMEOUT,
2072 * "Number of times entities have been removed due to timeout " +
2073 * "(entity has been inactive for " + ENTITY_TIMEOUT/1000 + "s)",
2074 * CounterType.ALWAYS_COUNT);
2075 * debugCounters.registerCounter(CNT_DEVICE_DELETED,
2076 * "Number of devices that have been removed due to inactivity",
2077 * CounterType.ALWAYS_COUNT);
2078 * debugCounters.registerCounter(CNT_DEVICE_RECLASSIFY_DELETE,
2079 * "Number of devices that required reclassification and have been " +
2080 * "temporarily delete for reclassification", CounterType.ALWAYS_COUNT);
2081 * debugCounters.registerCounter(CNT_DEVICE_STORED,
2082 * "Number of device entries written or updated to the sync store",
2083 * CounterType.ALWAYS_COUNT);
2084 * debugCounters.registerCounter(CNT_DEVICE_STORE_THROTTLED,
2085 * "Number of times a device update to the sync store was " +
2086 * "requested but not performed because the same device entities " +
2087 * "have recently been updated already", CounterType.ALWAYS_COUNT);
2088 * debugCounters.registerCounter(CNT_DEVICE_REMOVED_FROM_STORE,
2089 * "Number of devices that were removed from the sync store " +
2090 * "because the local controller removed the device due to " +
2091 * "inactivity", CounterType.ALWAYS_COUNT);
2092 * debugCounters.registerCounter(CNT_SYNC_EXCEPTION,
2093 * "Number of times an operation on the sync store resulted in " +
2094 * "sync exception", CounterType.WARN); // it this an error?
2095 * debugCounters.registerCounter(CNT_DEVICES_FROM_STORE,
2096 * "Number of devices that were read from the sync store after " +
2097 * "the local controller transitioned from SLAVE to MASTER",
2098 * CounterType.ALWAYS_COUNT);
2099 * debugCounters.registerCounter(CNT_CONSOLIDATE_STORE_RUNS,
2100 * "Number of times the task to consolidate entries in the " +
2101 * "store witch live known devices has been run",
2102 * CounterType.ALWAYS_COUNT);
2103 * debugCounters.registerCounter(CNT_CONSOLIDATE_STORE_DEVICES_REMOVED,
2104 * "Number of times a device has been removed from the sync " +
2105 * "store because no corresponding live device is known. " +
2106 * "This indicates a remote controller still writing device " +
2107 * "entries despite the local controller being MASTER or an " +
2108 * "incosistent store update from the local controller.",
2109 * CounterType.WARN);
2110 * debugCounters.registerCounter(CNT_TRANSITION_TO_MASTER,
2111 * "Number of times this controller has transitioned from SLAVE " +
2112 * "to MASTER role. Will be 0 or 1.", CounterType.ALWAYS_COUNT);
2117 public HostNodeConnector hostFind(InetAddress networkAddress) {
2118 // TODO Auto-generated method stub
2123 public HostNodeConnector hostQuery(InetAddress networkAddress) {
2124 // TODO Auto-generated method stub
2129 public Future<HostNodeConnector> discoverHost(InetAddress networkAddress) {
2130 // TODO Auto-generated method stub
2135 public List<List<String>> getHostNetworkHierarchy(InetAddress hostAddress) {
2136 // TODO Auto-generated method stub
2141 public Set<HostNodeConnector> getAllHosts() {
2142 Collection<Device> devices = Collections
2143 .unmodifiableCollection(deviceMap.values());
2144 Iterator<Device> i = devices.iterator();
2145 Set<HostNodeConnector> nc = new HashSet<HostNodeConnector>();
2146 while (i.hasNext()) {
2147 Device device = i.next();
2148 nc.add(device.toHostNodeConnector());
2154 public Set<HostNodeConnector> getActiveStaticHosts() {
2155 Collection<Device> devices = Collections
2156 .unmodifiableCollection(deviceMap.values());
2157 Iterator<Device> i = devices.iterator();
2158 Set<HostNodeConnector> nc = new HashSet<HostNodeConnector>();
2159 while (i.hasNext()) {
2160 Device device = i.next();
2161 if(device.isStaticHost())
2162 nc.add(device.toHostNodeConnector());
2168 public Set<HostNodeConnector> getInactiveStaticHosts() {
2169 // TODO Auto-generated method stub
2174 public Status addStaticHost(String networkAddress, String dataLayerAddress,
2175 NodeConnector nc, String vlan) {
2176 Long mac = HexEncode.stringToLong(dataLayerAddress);
2178 InetAddress addr = InetAddress.getByName(networkAddress);
2179 int ip = toIPv4Address(addr.getAddress());
2180 Entity e = new Entity(mac, Short.valueOf(vlan), ip, nc, new Date());
2181 Device d = this.learnDeviceByEntity(e);
2182 d.setStaticHost(true);
2183 return new Status(StatusCode.SUCCESS);
2184 }catch(UnknownHostException e){
2185 return new Status(StatusCode.INTERNALERROR);
2190 public Status removeStaticHost(String networkAddress) {
2193 addr = toIPv4Address(InetAddress.getByName(networkAddress).getAddress());
2194 } catch (UnknownHostException e) {
2195 return new Status(StatusCode.NOTFOUND, "Host does not exist");
2197 Iterator<Device> di = this.getDeviceIteratorForQuery(null, null, addr, null);
2198 List<IDeviceListener> listeners = deviceListeners
2199 .getOrderedListeners();
2200 while(di.hasNext()){
2201 Device d = di.next();
2202 if(d.isStaticHost()){
2204 for (IfNewHostNotify notify : newHostNotify)
2205 notify.notifyHTClientHostRemoved(d.toHostNodeConnector());
2206 for (IDeviceListener listener : listeners)
2207 listener.deviceRemoved(d);
2210 return new Status(StatusCode.SUCCESS);
2214 * For testing: consolidate the store NOW
2216 // void scheduleConsolidateStoreNow() {
2217 // this.storeConsolidateTask.reschedule(0, TimeUnit.MILLISECONDS);
2220 // private class DeviceSyncManager {
2221 // // maps (opaque) deviceKey to the time in System.nanoTime() when we
2222 // // last wrote the device to the sync store
2223 // private ConcurrentMap<Long, Long> lastWriteTimes =
2224 // new ConcurrentHashMap<Long, Long>();
2227 // * Write the given device to storage if we are MASTER.
2228 // * Use this method if the device has significantly changed (e.g.,
2229 // * new AP, new IP, entities removed).
2230 // * @param d the device to store
2232 // public void storeDevice(Device d) {
2237 // long now = System.nanoTime();
2238 // writeUpdatedDeviceToStorage(d);
2239 // lastWriteTimes.put(d.getDeviceKey(), now);
2243 // * Write the given device to storage if we are MASTER and if the
2244 // * last write for the device was more than this.syncStoreIntervalNs
// * Use this method to update last active times in the store.
2247 // * @param d the device to store
2249 // public void storeDeviceThrottled(Device d) {
2250 // long intervalNs = syncStoreWriteIntervalMs*1000L*1000L;
2255 // long now = System.nanoTime();
2256 // Long last = lastWriteTimes.get(d.getDeviceKey());
2257 // if (last == null ||
2258 // now - last > intervalNs) {
2259 // writeUpdatedDeviceToStorage(d);
2260 // lastWriteTimes.put(d.getDeviceKey(), now);
2262 // debugCounters.updateCounter(CNT_DEVICE_STORE_THROTTLED);
2267 // * Remove the given device from the store. If only some entities have
2268 // * been removed the updated device should be written using
2269 // * {@link #storeDevice(Device)}
2272 // public void removeDevice(Device d) {
2275 // // FIXME: could we have a problem with concurrent put to the
2276 // // hashMap? I.e., we write a stale entry to the map after the
2277 // // delete and now are left with an entry we'll never clean up
2278 // lastWriteTimes.remove(d.getDeviceKey());
2280 // // TODO: should probably do versioned delete. OTOH, even
2281 // // if we accidentally delete, we'll write it again after
2282 // // the next entity ....
2283 // debugCounters.updateCounter(CNT_DEVICE_REMOVED_FROM_STORE);
2284 // storeClient.delete(DeviceSyncRepresentation.computeKey(d));
2285 // } catch(ObsoleteVersionException e) {
2287 // } catch (SyncException e) {
2288 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2289 // logger.error("Could not remove device " + d + " from store", e);
2294 // * Remove the given Versioned device from the store. If the device
2295 // * was locally modified ignore the delete request.
2296 // * @param syncedDeviceKey
2298 // private void removeDevice(Versioned<DeviceSyncRepresentation> dev) {
2300 // debugCounters.updateCounter(CNT_DEVICE_REMOVED_FROM_STORE);
2301 // storeClient.delete(dev.getValue().getKey(),
2302 // dev.getVersion());
2303 // } catch(ObsoleteVersionException e) {
2304 // // Key was locally modified by another thread.
2305 // // Do not delete and ignore.
2306 // } catch(SyncException e) {
2307 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2308 // logger.error("Failed to remove device entry for " +
2309 // dev.toString() + " from store.", e);
2314 // * Synchronously transition from SLAVE to MASTER. By iterating through
2315 // * the store and learning all devices from the store
2317 // private void goToMaster() {
2318 // if (logger.isDebugEnabled()) {
2319 // logger.debug("Transitioning to MASTER role");
2321 // debugCounters.updateCounter(CNT_TRANSITION_TO_MASTER);
2322 // IClosableIterator<Map.Entry<String,Versioned<DeviceSyncRepresentation>>>
2325 // iter = storeClient.entries();
2326 // } catch (SyncException e) {
2327 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2328 // logger.error("Failed to read devices from sync store", e);
2332 // while(iter.hasNext()) {
2333 // Versioned<DeviceSyncRepresentation> versionedDevice =
2334 // iter.next().getValue();
2335 // DeviceSyncRepresentation storedDevice =
2336 // versionedDevice.getValue();
2337 // if (storedDevice == null)
2339 // debugCounters.updateCounter(CNT_DEVICES_FROM_STORE);
2340 // for(SyncEntity se: storedDevice.getEntities()) {
2341 // learnDeviceByEntity(se.asEntity());
2345 // if (iter != null)
2348 // storeConsolidateTask.reschedule(initialSyncStoreConsolidateMs,
2349 // TimeUnit.MILLISECONDS);
2353 // * Actually perform the write of the device to the store
2354 // * FIXME: concurrent modification behavior
2355 // * @param device The device to write
2357 // private void writeUpdatedDeviceToStorage(Device device) {
2359 // debugCounters.updateCounter(CNT_DEVICE_STORED);
2360 // // FIXME: use a versioned put
2361 // DeviceSyncRepresentation storeDevice =
2362 // new DeviceSyncRepresentation(device);
2363 // storeClient.put(storeDevice.getKey(), storeDevice);
2364 // } catch (ObsoleteVersionException e) {
2365 // // FIXME: what's the right behavior here. Can the store client
2366 // // even throw this error?
2367 // } catch (SyncException e) {
2368 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2369 // logger.error("Could not write device " + device +
2370 // " to sync store:", e);
2375 // * Iterate through all entries in the sync store. For each device
2376 // * in the store check if any stored entity matches a live device. If
2377 // * no entities match a live device we remove the entry from the store.
2379 // * Note: we do not check if all devices known to device manager are
2380 // * in the store. We rely on regular packetIns for that.
2381 // * Note: it's possible that multiple entries in the store map to the
2382 // * same device. We don't check or handle this case.
2384 // * We need to perform this check after a SLAVE->MASTER transition to
2385 // * get rid of all entries the old master might have written to the
2386 // * store after we took over. We also run it regularly in MASTER
2387 // * state to ensure we don't have stale entries in the store
2389 // private void consolidateStore() {
2392 // debugCounters.updateCounter(CNT_CONSOLIDATE_STORE_RUNS);
2393 // if (logger.isDebugEnabled()) {
2394 // logger.debug("Running consolidateStore.");
2396 // IClosableIterator<Map.Entry<String,Versioned<DeviceSyncRepresentation>>>
2399 // iter = storeClient.entries();
2400 // } catch (SyncException e) {
2401 // debugCounters.updateCounter(CNT_SYNC_EXCEPTION);
2402 // logger.error("Failed to read devices from sync store", e);
2406 // while(iter.hasNext()) {
2407 // boolean found = false;
2408 // Versioned<DeviceSyncRepresentation> versionedDevice =
2409 // iter.next().getValue();
2410 // DeviceSyncRepresentation storedDevice =
2411 // versionedDevice.getValue();
2412 // if (storedDevice == null)
2414 // for(SyncEntity se: storedDevice.getEntities()) {
2416 // // Do we have a device for this entity??
2417 // IDevice d = findDevice(se.macAddress, se.vlan,
2425 // } catch (IllegalArgumentException e) {
2426 // // not all key fields provided. Skip entity
2430 // // We currently DO NOT have a live device that
2431 // // matches the current device from the store.
2432 // // Delete device from store.
2433 // if (logger.isDebugEnabled()) {
2434 // logger.debug("Removing device {} from store. No "
2435 // + "corresponding live device",
2436 // storedDevice.getKey());
2438 // debugCounters.updateCounter(CNT_CONSOLIDATE_STORE_DEVICES_REMOVED);
2439 // removeDevice(versionedDevice);
2443 // if (iter != null)
2451 // * For testing. Sets the syncService. Only call after init but before
2452 // * startUp. Used by MockDeviceManager
2453 // * @param syncService
2455 // protected void setSyncServiceIfNotSet(ISyncService syncService) {
2456 // if (this.syncService == null)
2457 // this.syncService = syncService;