# Copyright (C) 2015 Chintalagiri Shashank
#
# This file is part of Tendril.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Digi-Key Vendor Module (:mod:`tendril.sourcing.digikey`)
========================================================
'Standalone' Usage
------------------
This module can be imported by itself to provide limited but potentially
useful functionality.
>>> from tendril.sourcing import digikey
.. rubric:: Search
Search for Digi-Key part numbers given a Tendril-compatible ident string :
>>> digikey.dvobj.search_vpnos('IC SMD W5300 LQFP100-14')
(['1278-1009-ND'], 'EXACT_MATCH_FFP')
>>> digikey.dvobj.search_vpnos('IC SMD LM311 8-TSSOP')
(['296-13719-2-ND', '296-13719-1-ND'], 'NAIVE_FP_MATCH')
Note that only certain types of components are supported by the search. The
``device`` component of the ident string is used to determine whether or not
a search will even be attempted. See :data:`VendorDigiKey._devices` for a
list of supported device classes. For all device classes, search is
supported only for components whose ``value`` is a manufacturer part number,
or enough of one to sufficiently identify the part in question.
.. seealso::
:ref:`symbol-conventions`
For certain (very specific) device classes, the search can be done using
generic descriptions of the component. These 'jelly bean' components must
have a correctly formatted ``value``, as per the details listed in the
:ref:`symbol-conventions`.
>>> digikey.dvobj.search_vpnos('CAP CER SMD 22uF/35V 0805')
(['445-14428-2-ND', '445-14428-1-ND', '445-11527-2-ND', '445-11527-1-ND'],
'CONSENSUS_FP_MATCH')
>>> digikey.dvobj.search_vpnos('RES SMD 1.15E/0.125W 0805')
(['311-1.15CRTR-ND', '311-1.15CRCT-ND', '541-1.15CCTR-ND', '541-1.15CCCT-ND'],
'CONSENSUS_FP_MATCH')
>>> digikey.dvobj.search_vpnos('RES ARRAY SMD 102E/0.0625W 1206-4')
(['TC164-FR-07102RL-ND', 'AF164-FR-07102RL-ND'], 'NAIVE_FP_MATCH_ALLOW_NS')
>>> digikey.dvobj.search_vpnos('CRYSTAL AT 13MHz HC49')
(['887-2036-ND'], 'EXACT_MATCH')
.. seealso::
Implementation of :func:`VendorDigiKey._get_pas_vpnos` and the
functions that it uses to perform this search.
Even with supported device types, remember that this search is nowhere near
bulletproof. The more generic a device and/or its name is, the less likely
it is to work. The intended use for this type of search is in concert with
an organization's engineering policies and an instance administrator who
can make the necessary tweaks to the search algortihms and maintain a low
error rate for component types commonly used within the instance.
.. seealso::
:func:`VendorDigiKey._search_preprocess`
:func:`VendorDigiKey._get_device_catstrings`
:func:`VendorDigiKey._get_device_subcatstrings`
:func:`VendorDigiKey._get_device_subcatstrings`
Tweaks and improvements to the search process are welcome as pull requests
to the tendril github repository, though their inclusion will be predicated
on them causing minimal breakage to what already exists.
.. rubric:: Retrieve
Obtain information for a given Digi-Key part number :
>>> p = digikey.DigiKeyElnPart('800-2146-ND')
>>> p.manufacturer
'IDT, Integrated Device Technology Inc'
>>> p.mpartno
'72V2113L10PFI'
>>> p.vqtyavail
185
>>> p.datasheet
'http://www.idt.com/document/72v2103-72v2113-datasheet'
>>> p.package
'80-LQFP'
>>> p.vpartdesc
'IC FIFO SUPERSYNCII 10NS 80TQFP'
The pricing information is also populated by this time, and can be accessed
though the part object using the interfaces defined in those class definitions.
>>> for b in p._prices:
... print b
...
<VendorPrice INR 7,610.18 @1(1)>
<VendorPrice INR 6,801.37 @25(1)>
<VendorPrice INR 6,420.86 @100(1)>
<VendorPrice INR 6,183.05 @250(1)>
<VendorPrice INR 6,087.93 @500(1)>
<VendorPrice INR 5,802.56 @1000(1)>
<VendorPrice INR 5,707.43 @2500(1)>
.. hint::
By default, the :class:`.vendors.VendorPrice` instances are represented
by the unit price in the currency defined by the
:data:`tendril.utils.types.currency.native_currency_defn`.
Additionally, the contents of the Overview and Attributes tables are
available, though not parsed, in the form of BS4 trees. Note, however, that
these contents are only available when part details are obtained from the
soup itself, and not when reconstructed from the Tendril database.
>>> for k in p.attributes_table.keys():
... print(k)
...
Category
Operating Temperature
Memory Size
Package / Case
Function
Product Photos
Datasheets
Current - Supply (Max)
Data Rate
Programmable Flags Support
FWFT Support
Standard Package
Bus Directional
Access Time
Online Catalog
Expansion Type
Family
Mounting Type
Series
Supplier Device Package
Packaging
Voltage - Supply
Other Names
Retransmit Capability
>>> for k in p.overview_table.keys():
... print k
...
Description
Digi-Key Part Number
Manufacturer Part Number
Moisture Sensitivity Level (MSL)
Lead Free Status / RoHS Status
Quantity Available
Manufacturer
Price Break
"""
import codecs
import csv
import locale
import os
import re
import traceback
import urllib
import urlparse
from copy import copy
from decimal import Decimal
from urllib2 import HTTPError
from tendril.conventions.electronics import check_for_std_val
from tendril.conventions.electronics import parse_capacitor
from tendril.conventions.electronics import parse_crystal
from tendril.conventions.electronics import parse_ident
from tendril.conventions.electronics import parse_resistor
from tendril.utils import log
from tendril.utils import www
from tendril.utils.config import INSTANCE_ROOT
from tendril.utils.types import currency
from tendril.utils.types.electromagnetic import Capacitance
from tendril.utils.types.electromagnetic import Resistance
from tendril.utils.types.electromagnetic import Voltage
from tendril.utils.types.thermodynamic import ThermalDissipation
from tendril.utils.types.time import Frequency
from . import customs
from .vendors import SearchPart
from .vendors import SearchResult
from .vendors import VendorBase
from .vendors import VendorElnPartBase
from .vendors import VendorPrice
from .vendors import VendorPartRetrievalError
logger = log.get_logger(__name__, log.DEFAULT)
locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
[docs]class DigiKeyElnPart(VendorElnPartBase):
def __init__(self, dkpartno, ident=None, vendor=None, max_age=-1):
"""
This class acquires and contains information about a single DigiKey
part number, specified by the `dkpartno` parameter.
:param dkpartno: The DigiKey part number.
:type dkpartno: str
:param ident: ``(Optional)`` Allows the caller to include the
canonical representation within Tendril for the
part represented by the object. This is required
for use with the Tendril database.
:type ident: str
:param vendor: ``(Optional)`` Allows the caller to 'bind' the part
to the pre-existing vendor objects in Tendril's
sourcing infrastructure. This is required
for use with the Tendril database.
:type vendor: :class:`VendorDigiKey`
"""
if vendor is None:
vendor = dvobj
if not dkpartno:
logger.error("Not enough information to create a Digikey Part")
super(DigiKeyElnPart, self).__init__(dkpartno, ident, vendor, max_age)
@property
def vparturl(self):
if self._vparturl:
return self._vparturl
else:
return 'http://search.digikey.com/scripts/DkSearch/dksus.dll?Detail?name=' \
+ urllib.quote_plus(self.vpno)
[docs] def _get_data(self):
"""
This function downloads the part page from Digi-Key, scrapes the page,
and populates the object with all the necessary information.
"""
try:
soup = self._vendor.get_soup(self.vparturl)
except HTTPError as e:
if e.code == 404:
logger.error("Got 404 opening DigiKey product page : " + self.vpno)
raise VendorPartRetrievalError
else:
raise
if soup is None:
logger.error("Unable to open DigiKey product page : " + self.vpno)
raise VendorPartRetrievalError
for price in self._get_prices(soup):
self.add_price(price)
try:
self.overview_table = self._get_overview_table(soup)
except AttributeError:
logger.error("Error acquiring DigiKey information for {0}"
"".format(self.vpno))
return
self.attributes_table = self._get_attributes_table(soup)
self.manufacturer = self._get_manufacturer()
self.mpartno = self._get_mpartno()
self.datasheet = self._get_datasheet()
self.package = self._get_package()
self.vqtyavail = self._get_vqtyavail()
self.vpartdesc = self._get_vpartdesc()
[docs] def _get_prices(self, soup):
"""
Given the BS4 parsed soup of the Digi-Key product page, this function
extracts the prices and breaks and returns them as a list of
:class:`.vendors.VendorPrice` instances.
Price listings containing only 'Call' are ignored. Any other
non-numeric content for the price or break quantity listing results
in an error message from the logger and the corresponding price /
break quantity is returned as 0.
"""
pricingtable = soup.find('table', id='product-dollars')
prices = []
try:
for row in pricingtable.findAll('tr'):
cells = row.findAll('td')
if len(cells) == 3:
cells = [cell.text.strip() for cell in cells]
if cells[0] == 'Call' and cells[1] == 'Call':
continue
try:
moq = int(cells[0].replace(',', ''))
except ValueError:
moq = 0
logger.error(
cells[0] + " found while acquiring moq for " +
self.vpno
)
try:
price = locale.atof(cells[1])
except ValueError:
price = 0
logger.error(
cells[1] + " found while acquiring price for " +
self.vpno
)
prices.append(
VendorPrice(moq, price, self._vendor.currency)
)
except AttributeError:
logger.error(
"Pricing table not found or other error for " + self.vpno
)
return prices
@staticmethod
[docs] def _get_table(table):
"""
Parses tabular data in the format of Digi-Key's part detail tables,
obtained from a BS4 parsed tree.
The tables parsed are of the format :
.. code-block:: html
<table>
...
<tr>
<th> [HEAD] </th>
<td> [DATA] </th>
</tr>
...
</table>
The parsed table is returned as a dictionary. For each row in the
table, the extracted ``[HEAD]`` is stripped, encoded to ascii and
used as a key, and value is the corresponding ``[DATA]`` in it's
BS4 parsed form.
"""
rows = table.findAll('tr')
parsed_rows = {}
for row in rows:
try:
head = row.find('th').text.strip().encode('ascii', 'replace')
data = row.find('td')
parsed_rows[head] = data
except AttributeError:
continue
return parsed_rows
[docs] def _get_overview_table(self, soup):
"""
Given the BS4 parsed soup of the Digi-Key product page, this function
extracts and returns the overview / product-detail table using
:func:`_get_table`.
"""
table = soup.find('table', {'id': 'product-details'})
return self._get_table(table)
[docs] def _get_attributes_table(self, soup):
"""
Given the BS4 parsed soup of the Digi-Key product page, this function
extracts and returns the product attributes table using
:func:`_get_table`.
"""
table = soup.find('table', {'class': 'attributes-table-main'})
return self._get_table(table)
[docs] def _get_mpartno(self):
"""
Extracts the Manufacturer Part Number from the overview table.
"""
cell = self.overview_table['Manufacturer Part Number']
return cell.text.strip().encode('ascii', 'replace')
[docs] def _get_manufacturer(self):
"""
Extracts the Manufacturer from the overview table.
"""
cell = self.overview_table['Manufacturer']
return cell.text.strip().encode('ascii', 'replace')
[docs] def _get_package(self):
"""
Extracts the Package from the attributes table. If ``Package / Case``
is not found in the table, returns None.
"""
try:
cell = self.attributes_table['Package / Case']
except KeyError:
return None
return cell.text.strip().encode('ascii', 'replace')
[docs] def _get_datasheet(self):
"""
Extracts the first datasheet link from the attributes table.
If ``Datasheets`` is not found in the table, returns None.
"""
try:
cell = self.attributes_table['Datasheets']
except KeyError:
return None
datasheet_link = cell.findAll('a')[0].attrs['href']
return datasheet_link.strip()
_regex_vqtyavail = re.compile(r'^(?P<qty>\d+(,*\d+)*)\s*Can ship immediately')
_regex_vqty_vai = re.compile(r'^Value Added Item')
[docs] def _get_vqtyavail(self):
"""
Extracts the Quantity Available from the overview table. If
``Value Added Item`` is found in the available quantity cell,
returns -2. If any other non integral value is found in the
cell, returns -1.
"""
cell = self.overview_table['Quantity Available']
text = cell.text.strip().encode('ascii', 'replace')
try:
m = self._regex_vqtyavail.search(text)
qtytext = m.groupdict()['qty'].replace(',', '')
return int(qtytext)
except:
if self._regex_vqty_vai.match(text):
return -2
return -1
[docs] def _get_vpartdesc(self):
"""
Extracts the Description from the overview table.
"""
cell = self.overview_table['Description']
return cell.text.strip().encode('ascii', 'replace')
[docs]class DigiKeyInvoice(customs.CustomsInvoice):
def __init__(self, vendor=None, inv_yaml=None, working_folder=None):
if vendor is None:
vendor = VendorDigiKey('digikey', 'Digi-Key Corporation',
'electronics',
currency_code='USD', currency_symbol='US$')
if inv_yaml is None:
inv_yaml = os.path.join(INSTANCE_ROOT, 'scratch', 'customs',
'inv_data.yaml')
super(DigiKeyInvoice, self).__init__(vendor, inv_yaml, working_folder)
[docs] def _acquire_lines(self):
logger.info("Acquiring Lines")
invoice_file = os.path.join(self._source_folder,
self._data['invoice_file'])
with open(invoice_file) as f:
reader = csv.reader(f)
header = None
for line in reader:
if line[0].startswith(codecs.BOM_UTF8):
line[0] = line[0][3:]
if line[0] == 'Index':
header = line
break
if header is None:
raise ValueError
for line in reader:
if line[0] != '':
idx = line[header.index('Index')].strip()
qty = int(line[header.index('Quantity')].strip())
vpno = line[header.index('Part Number')].strip()
desc = line[header.index('Description')].strip()
ident = line[header.index('Customer Reference')].strip()
boqty = line[header.index('Backorder')].strip()
try:
if int(boqty) > 0:
logger.warning(
"Apparant backorder. Crosscheck customs "
"treatment for: {0} {1}".format(idx, ident)
)
except ValueError:
print(line)
unitp_str = line[header.index('Unit Price')].strip()
unitp = currency.CurrencyValue(
float(unitp_str), self._vendor.currency
)
lineobj = customs.CustomsInvoiceLine(
self, ident, vpno, unitp, qty, idx=idx, desc=desc
)
self._lines.append(lineobj)
[docs]class VendorDigiKey(VendorBase):
_partclass = DigiKeyElnPart
_invoiceclass = DigiKeyInvoice
#: Supported Device Classes
#:
#: .. hint::
#: This handles instance-specific tweaks, and should be
#: modified to match your instance's nomenclature guidelines.
#:
_devices = [
'IC SMD', 'IC THRU', 'IC PLCC',
'FERRITE BEAD SMD', 'TRANSISTOR THRU', 'TRANSISTOR SMD', 'MOSFET SMD',
'CONN DF13', 'CONN DF13 HOUS', 'CONN DF13 WIRE', 'CONN DF13 CRIMP',
'DIODE SMD', 'DIODE THRU', 'LED SMD', 'BRIDGE RECTIFIER', 'ZENER SMD',
'RES SMD', 'RES THRU', 'RES ARRAY SMD', 'VARISTOR',
'CAP CER SMD', 'CAP TANT SMD', 'CAP AL SMD', 'CAP MICA SMD',
'CAP ELEC THRU', 'TRANSFORMER SMD', 'INDUCTOR SMD', 'RELAY',
'CRYSTAL AT', 'CRYSTAL OSC', 'CRYSTAL VCXO',
'CONN MODULAR', 'CONN SECII', 'CONN TERMINAL DMC',
'LIGHT PIPE', 'MODULE SMPS', 'SWITCH TACT'
]
_url_base = 'http://www.digikey.com'
_vendorlogo = '/static/images/vendor-logo-digikey.png'
_search_url_base = urlparse.urljoin(_url_base, '/product-search/en/')
_default_urlparams = [
('stock', '0'), ('mnonly', '0'), ('newproducts', '0'),
('ColumnSort', '0'), ('page', '1'), ('quantity', '0'),
('ptm', '0'), ('fid', '0'), ('pagesize', '500'),
('pbfree', '0'), ('rohs', '0')
]
_type = 'DIGIKEY'
def __init__(self, name, dname, pclass, mappath=None,
currency_code='USD', currency_symbol='US$', **kwargs):
self._searchpages_filters = {}
self._session = www.get_session()
super(VendorDigiKey, self).__init__(
name, dname, pclass, mappath,
currency_code, currency_symbol, **kwargs
)
self.add_order_baseprice_component("Shipping Cost", 40)
self.add_order_additional_cost_component("Customs", 12.85)
@property
def session(self):
return self._session
[docs] def get_soup(self, url):
"""
Retrieve the soup for a particular url. This function is factored out
to allow switching between the :mod:`urllib2` and :mod:`requests`
based implementations in :mod:`tendril.utils.www`.
Currently, the requests based implementation is about two times slower
for vendor map generation. The cache provided by :mod:`cachecontrol`
in that implementation is not very well suited to this application.
Additional changes to the underlying implementation or configuration
thereof (perhaps in the heuristic) may change this in the future.
"""
# return www.get_soup_requests(url, session=self._session)
return www.get_soup(url)
[docs] def search_vpnos(self, ident):
device, value, footprint = parse_ident(ident)
if device not in self._devices:
return None, 'NODEVICE'
try:
if device.startswith('RES') or \
device.startswith('POT') or \
device.startswith('CAP') or \
device.startswith('CRYSTAL') or \
device.startswith('LED'):
if check_for_std_val(ident) is False:
return self._get_search_vpnos(device, value, footprint)
try:
return self._get_pas_vpnos(device, value, footprint)
except NotImplementedError:
logger.warning(ident + ' :: DK Search for ' + device +
' Not Implemented')
return None, 'NOT_IMPL'
if device in self._devices:
return self._get_search_vpnos(device, value, footprint)
else:
return None, 'FILTER_NODEVICE'
except Exception:
logger.error(traceback.format_exc())
logger.error('Fatal Error searching for : ' + ident)
return None, None
rex_to_package = re.compile(ur'^TO(?P<code>[\d]+[A-Za-z]*)$')
[docs] def _search_preprocess(self, device, value, footprint):
"""
Pre-processes and returns the ``(device, value, footprint)`` tuple
parsed from the part ident, making tweaks necessary to translate
the names into those more likely to be found on Digi-Key.
.. hint::
This function handles instance-specific tweaks, and should be
modified to match your instance's nomenclature guidelines.
.. todo::
The content of this function should be moved out of the core and
into the instance folder.
"""
if footprint is not None:
m = self.rex_to_package.match(footprint)
if m:
footprint = 'TO-' + m.group('code')
return device, value, footprint
@staticmethod
[docs] def _process_product_page(soup):
"""
Given the BS4 parsed soup of a Digi-Key product page, returns a
:class:`.vendors.SearchResult` instance, whose ``parts`` variable
includes a list of exactly one :class:`.vendors.SearchPart`
instance, with relevant data parsed from the page contained
within it. The ``strategy`` for the returned result is set to
``EXACT_MATCH``.
In case any of the following exclusion criteria are met, the
part is not returned within the :class:`.vendors.SearchResult`, its
``success`` parameter is set to ``False``, and its ``strategy``
parameter is set as listed below :
+----------------------------------------+-----------------------+
| Listed as Obsolete on the Product Page | ``OBSOLETE_NOTAVAIL`` |
+----------------------------------------+-----------------------+
| No Part Number Found | ``NO_PNO_CELL`` |
+----------------------------------------+-----------------------+
| Product Details Table Not Found | ``NO_PDTABLE`` |
+----------------------------------------+-----------------------+
.. todo::
The current approach of handling catergories and subcategories
can result in these seemingly 'exact match' results to be put
into a comparison. If that were to happen, the Package/Case,
Unit Price, and MOQ are all necessary pieces of information to
determine inclusion. These are not currently parsed from the
page soup and should be handled here, preferably without
instantiating a full digikey part instance.
"""
beablock = soup.find('div', {'class': 'product-details-feedback'})
if beablock is not None:
if beablock.text == u'\nObsolete item; call Digi-Key for more information.\n': # noqa
return SearchResult(False, None, 'OBSOLETE_NOTAVAIL')
pdtable = soup.find('table', {'id': 'product-details'})
if pdtable is not None:
pno_meta = pdtable.find('meta', {'itemprop': 'productID'})
if pno_meta is None:
return SearchResult(False, None, 'NO_PNO_CELL')
pno_text = pno_meta.attrs['content']
pno = pno_text.split(':')[1].encode('ascii', 'replace')
mpno_meta = pdtable.find('meta', {'itemprop': 'name'})
mpno_text = mpno_meta.attrs['content']
mpno = mpno_text.encode('ascii', 'replace')
part = {'Digi-Key Part Number': pno,
'Manufacturer Part Number': mpno,
'Package / Case': 'Dummy',
'Unit Price (USD)': '100',
'Minimum Quantity': '1'}
return SearchResult(True, [part], 'EXACT_MATCH')
else:
return SearchResult(False, None, 'NO_PDTABLE')
@staticmethod
[docs] def _get_device_catstrings(device):
"""
Given a certain ``device`` string, returns the known Digi-Key
categories which should be searched in case the first search attempt
returns an Index Page.
Returns a ``tuple``, the first element being ``success``
(``True`` or ``False``) and the second being the list of recognized
category strings for the given device string.
.. hint::
This function handles instance-specific tweaks, and should be
modified to match your instance's engineering and nomenclature
guidelines.
.. todo::
The content of this function should be moved out of the core and
into the instance folder.
"""
if device.startswith('IC'):
catstrings = ['Integrated Circuits (ICs)',
'Isolators',
'Sensors, Transducers']
elif device.startswith('DIODE'):
catstrings = ['Discrete Semiconductor Products']
elif device.startswith('TRANSISTOR'):
catstrings = ['Discrete Semiconductor Products']
elif device.startswith('BRIDGE RECTIFIER'):
catstrings = ['Discrete Semiconductor Products']
else:
return False, None
return True, catstrings
@staticmethod
[docs] def _get_device_subcatstrings(device):
"""
Given a certain ``device`` string, returns the known Digi-Key
sub-categories which should be searched in case the a search attempt
returns a Secondary Index Page.
Returns a ``tuple``, the first element being ``success``
(``True`` or ``False``) and the second being the list of recognized
sub-category strings for the given device string.
.. hint::
This function handles instance-specific tweaks, and should be
modified to match your instance's engineering and nomenclature
guidelines.
.. todo::
The content of this function should be moved out of the core and
into the instance folder.
"""
if device.startswith('IC'):
subcatstrings = ['Interface - Controllers']
elif device.startswith('DIODE'):
subcatstrings = ['Diodes - Rectifiers - Single',
'Diodes - Rectifiers - Arrays']
elif device.startswith('TRANSISTOR'):
subcatstrings = ['Transistors - FETs, MOSFETs - Single',
'Transistors - FETs, MOSFETs - Arrays']
elif device.startswith('BRIDGE RECTIFIER'):
subcatstrings = ['Bridge Rectifiers',
'Diodes - Bridge Rectifiers']
else:
return False, None
return True, subcatstrings
[docs] def _process_secondary_index_page(self, soup, device):
result, subcatstrings = self._get_device_subcatstrings(device)
if subcatstrings is None or len(subcatstrings) == 0:
return SearchResult(False, None, 'SUBCATEGORY_UNKNOWN')
cathead = soup.find('h1', {'class': 'catfiltertopitem'})
if cathead is None:
return SearchResult(False, None, 'SECONDARY_INDEX_NOT_FOUND')
subcatcontainer = soup.find('ul', {'id': 'catfiltersubid'})
subcat_elems = subcatcontainer.findAll('a')
subcats = {}
for elem in subcat_elems:
subcat_name = elem.text.strip().encode('ascii', 'replace')
subcat_url_part = elem.attrs['href'].strip()
subcats[subcat_name] = subcat_url_part
new_url_parts = []
for subcat_name in subcats.keys():
if subcat_name in subcatstrings:
new_url_parts.append(subcats[subcat_name])
if len(new_url_parts) == 0:
return SearchResult(False, None, 'SUBCATEGORY_NOT_FOUND')
results = []
for url_part in new_url_parts:
new_url = urlparse.urljoin(self._url_base, url_part)
soup = self.get_soup(new_url)
if soup is not None:
try:
results.extend(self._get_resultpage_table(soup))
except ValueError:
# Must be a product page
sr = self._process_product_page(soup)
if sr.success is True:
results.extend(sr.parts)
if len(results):
return SearchResult(True, results, '')
else:
return SearchResult(False, [], 'SECONDARY_INDEX_TRAVERSAL')
[docs] def _process_index_page(self, soup, device):
idxlist = soup.find('div', id='productIndexList')
if idxlist is None:
return SearchResult(False, None, 'INDEX_NOT_FOUND')
# Handling categories needs to be much better
catheads = idxlist.findAll('h2', attrs={'class': 'catfiltertopitem'})
cats = {}
for cat in catheads:
cat_elem = cat.find('a')
cat_name = cat_elem.text.strip().encode('ascii', 'replace')
cat_url_part = cat_elem.attrs['href'].strip()
cats[cat_name] = cat_url_part
result, catstrings = self._get_device_catstrings(device)
if result is False:
return SearchResult(False, None, 'CATEGORY_UNKNOWN')
new_url_parts = []
for cat_name in cats.keys():
if cat_name in catstrings:
new_url_parts.append(cats[cat_name])
if len(new_url_parts) == 0:
return SearchResult(False, None, 'CATEGORY_NOT_FOUND')
results = []
for url_part in new_url_parts:
new_url = urlparse.urljoin(self._url_base, url_part)
soup = self.get_soup(new_url)
if soup is not None:
try:
results.extend(self._get_resultpage_table(soup))
except ValueError:
# Either an exact match or a secondary index
sr = self._process_product_page(soup)
if sr.success is True:
results.extend(sr.parts)
continue
if sr.strategy == 'NO_PDTABLE':
sr = self._process_secondary_index_page(soup, device)
if sr.success is True:
results.extend(sr.parts)
if len(results):
return SearchResult(True, results, '')
else:
return SearchResult(False, [], 'INDEX_TRAVERSAL')
@staticmethod
[docs] def _process_resultpage_row(row):
"""
Given a row from a CSV file obtained from Digi-Key's search
interface and read in using :class:`csv.DictReader`, convert it into
a :class:`.vendors.SearchPart` instance.
If the unit price for the row is not readily parseable into a
float, or if the minqty field includes a Non-Stock note, the
returned SearchPart has it's ``ns`` attribute set to True.
:type row: dict
:rtype: :class:`.vendors.SearchPart`
"""
pno = row.get('Digi-Key Part Number', '').strip()
package = row.get('Package / Case', '').strip()
minqty = row.get('Minimum Quantity', '').strip()
mfgpno = row.get('Manufacturer Part Number', '').strip()
unitp = row.get('Unit Price (USD)', '').strip()
qtyavail = row.get('Quantity Available', '0').strip()
qtyavail = int(qtyavail)
try:
unitp = locale.atof(unitp)
except ValueError:
unitp = None
if pno is not None:
if qtyavail == 0 or unitp is None:
ns = True
else:
ns = False
else:
ns = True
part = SearchPart(pno=pno, mfgpno=mfgpno, package=package,
ns=ns, unitp=unitp, minqty=minqty, raw=row)
return part
[docs] def _get_resultpage_parts(self, rows):
"""
Given a list of table rows obtained using a
:func:`_get_resultpage_table`, or a combined list from multiple
calls thereof with multiple result pages, returns a
:class:`.vendors.SearchResult` instance whose ``parts`` variable is a
list of all Digi-Key parts found in those result pages, each
represented by a :class:`.vendors.SearchPart` instance and generated
using :func:`_process_resultpage_row`.
"""
parts = []
for row in rows:
part = self._process_resultpage_row(row)
parts.append(part)
return SearchResult(True, parts, '')
[docs] def _get_resultpage_table(self, soup):
"""
Given the BS4 parsed soup of a Digi-Key search page, returns a
list of rows in the table.
This function uses the 'Download table as CSV' feature of Digi-Keys
website to obtain the table as CSV, and then uses
:class:`csv.DictReader` to parse the CSV file and returns a list
of dictionaries - one for each row.
Each row is intended for later processing into a
:class:`.vendors.SearchPart` instance using
:func:`_process_resultpage_row`.
"""
form = soup.find('form', {'name': 'downloadform'})
if form is None:
raise ValueError
params = []
parts = form.findAll('input')
for part in parts:
params.append((part.attrs['name'], part.attrs['value']))
target = urlparse.urljoin(self._url_base,
form.attrs['action'])
url_dl = target + '?' + urllib.urlencode(params)
table_csv_path = www.cached_fetcher.fetch(url_dl, getcpath=True)
with codecs.open(table_csv_path, 'r', encoding='utf-8') as f:
table_csv = f.read()
table_csv = table_csv.encode('utf-8')
if table_csv.startswith(codecs.BOM_UTF8):
table_csv = table_csv[len(codecs.BOM_UTF8):]
lines = csv.DictReader(table_csv.splitlines())
return lines
[docs] def _get_search_vpnos(self, device, value, footprint):
if value.strip() == '':
return None, 'NOVALUE'
device, value, footprint = self._search_preprocess(
device, value, footprint
)
params = copy(self._default_urlparams)
searchurl = self._search_url_base + '?k=' + urllib.quote_plus(value) \
+ '&' + urllib.urlencode(params)
soup = self.get_soup(searchurl)
if soup is None:
return None, 'URL_FAIL'
ptable = soup.find('table', id='productTable')
index_results = None
if ptable is None:
# check for single product page
sr = self._process_product_page(soup)
if sr.success is True:
# Sent directly to an exact match. Just send it back.
return [sr.parts[0]['Digi-Key Part Number']], sr.strategy
# check for secondary index page and get full parts listing
# thereof
sr = self._process_secondary_index_page(soup, device)
if sr.success is True:
index_results = sr.parts
elif sr.strategy == 'SECONDARY_INDEX_NOT_FOUND':
# No secondary index there
# check for index page and get full parts listing thereof
sr = self._process_index_page(soup, device)
if sr.success is False:
# No luck with the index
return None, sr.strategy
index_results = sr.parts
else:
return None, sr.strategy
if index_results is None:
table = self._get_resultpage_table(soup)
else:
table = index_results
sr = self._get_resultpage_parts(table)
if sr.success is False:
return SearchResult(False, None, sr.strategy)
if sr.parts is None:
raise Exception
if len(sr.parts) == 0:
return SearchResult(False, None, 'NO_RESULTS:2')
results = sr.parts
sr = self._process_results(results, value, footprint)
if sr.success is False:
return None, sr.strategy
return sr.parts, sr.strategy
_regex_fo_res = re.compile(r'^(?P<num>\d+(.\d+)*)(?P<order>[kKMGT])*$')
[docs] def _tf_resistance_to_canonical(self, rstr):
# TODO Replace with tendril.utils.type version
rstr = rstr.encode('ascii', 'replace').strip()
if rstr == '-':
return
if rstr == '*':
return
if ',' in rstr:
return
try:
rparts = self._regex_fo_res.search(rstr).groupdict()
except AttributeError:
raise
rval = Decimal(rparts['num'])
if rparts['order'] is None:
do = 0
if rval > 1000:
raise ValueError
while 1 > rval > 0:
rval *= 1000
do += 1
if do == 0:
ostr = 'E'
elif do == 1:
ostr = 'm'
elif do == 2:
ostr = 'u'
else:
raise ValueError
else:
if rparts['order'] in ['K', 'k']:
ostr = 'K'
elif rparts['order'] == 'M':
ostr = 'M'
elif rparts['order'] == 'G':
ostr = 'G'
elif rparts['order'] == 'T':
ostr = 'T'
else:
raise ValueError(str(rparts))
vfmt = lambda d: str(d.quantize(Decimal(1))
if d == d.to_integral() else d.normalize())
return vfmt(rval) + ostr
@staticmethod
[docs] def _tf_tolerance_to_canonical(tstr):
tstr = www.strencode(tstr).strip()
return tstr
_regex_fo_cap = re.compile(r'^(?P<num>\d+(.\d+)*)(?P<order>[pum]F)$')
[docs] def _tf_capacitance_to_canonical(self, cstr):
# TODO Replace with tendril.utils.type version
cstr = www.strencode(cstr).strip()
if cstr == '-':
return
if cstr == '*':
return
try:
cparts = self._regex_fo_cap.search(cstr).groupdict()
except AttributeError:
raise AttributeError(cstr)
cval = Decimal(cparts['num'])
ostr = cparts['order']
ostrs = ['fF', 'pF', 'nF', 'uF', 'mF']
oindex = ostrs.index(ostr)
if cparts['order'] is None:
raise ValueError
if cval >= 1000:
po = 0
while cval >= 1000:
cval /= 1000
po += 1
if po > 0:
for i in range(po):
oindex += 1
else:
do = 0
while 0 < cval < 1:
cval *= 1000
do += 1
if do > 0:
for i in range(do):
oindex -= 1
vfmt = lambda d: str(d.quantize(Decimal(1))
if d == d.to_integral() else d.normalize())
return vfmt(cval) + ostrs[oindex]
@staticmethod
[docs] def _tf_option_base(ostr):
ostr = www.strencode(ostr).strip()
if ostr == '-':
return
if ostr == '*':
return
return ostr
@property
def _filter_otf(self):
return {
'Resistance (Ohms)': self._tf_resistance_to_canonical,
'Capacitance': self._tf_capacitance_to_canonical,
'Tolerance': self._tf_tolerance_to_canonical,
'base': self._tf_option_base,
}
[docs] def _get_searchpage_filters(self, soup):
filters = {}
idx = 0
try:
filtertable = soup.find('table', attrs={'class': 'filters-group'})
headers = filtertable.find('tr')
headers = [header.text.encode('ascii', 'replace')
for header in headers.findAll('th')]
fvaluerow = filtertable.findAll('tr')[1]
fvaluecells = fvaluerow.findAll('td')
fvalueselects = [cell.find('select') for cell in fvaluecells]
for idx, header in enumerate(headers):
optionsoup = fvalueselects[idx].findAll('option')
options = [(www.strencode(o.text).rstrip('\n').strip(),
o.attrs['value']) for o in optionsoup]
fname = fvalueselects[idx].attrs['name']
fname = fname.encode('ascii', 'replace').strip()
if header in self._filter_otf.keys():
options = [
(self._filter_otf[header](option[0]), option[1])
for option in options
]
else:
options = [
(self._filter_otf['base'](option[0]), option[1])
for option in options
]
filters[header] = (fname, options)
except Exception:
logger.error(traceback.format_exc())
logger.error('idx :' + str(idx))
return False, None
return True, filters
[docs] def _get_searchurl_filters(self, searchurl):
if searchurl in self._searchpages_filters.keys():
return self._searchpages_filters[searchurl]
else:
searchsoup = self.get_soup(searchurl)
result, filters = self._get_searchpage_filters(searchsoup)
if result is True:
self._searchpages_filters[searchurl] = filters
else:
raise AttributeError
return filters
@staticmethod
[docs] def _tf_package_tant_smd(footprint):
if footprint == 'TANT-B':
footprint = 'B'
if footprint == 'TANT-C':
footprint = 'C'
if footprint == 'TANT-D':
footprint = 'D'
return footprint, None
@staticmethod
[docs] def _tf_package_crystal_at(footprint):
extraparams_fp = None
if footprint == 'HC49':
footprint = ['HC-49S', 'HC49/U', 'HC49/US']
extraparams_fp = [('Mounting Type', 'Through Hole'),]
return footprint, extraparams_fp
@staticmethod
[docs] def _tf_package_res_array_smd(footprint):
extraparams_fp = None
if footprint == '1206-4':
footprint = '1206'
extraparams_fp = [('Number of Resistors', '4')]
return footprint, extraparams_fp
@property
def _searchurl_res_smd(self):
return urlparse.urljoin(self._search_url_base,
'resistors/chip-resistor-surface-mount/')
@property
def _searchurl_res_thru(self):
return urlparse.urljoin(self._search_url_base,
'resistors/through-hole-resistors/')
@property
def _searchurl_res_array_smd(self):
return urlparse.urljoin(self._search_url_base,
'resistors/resistor-networks-arrays/')
@property
def _searchurl_cap_cer_smd(self):
return urlparse.urljoin(self._search_url_base,
'capacitors/ceramic-capacitors/')
@property
def _searchurl_cap_tant_smd(self):
return urlparse.urljoin(self._search_url_base,
'capacitors/tantalum-capacitors/')
@property
def _searchurl_crystal(self):
return urlparse.urljoin(self._search_url_base,
'crystals-oscillators-resonators/crystals/')
[docs] def _get_device_searchparams(self, device):
packagetf = None
package_header = 'Package / Case'
package_filter_type = 'contains'
extraparams = None
if device == "RES SMD":
searchurlbase = self._searchurl_res_smd
devtype = 'resistor'
elif device == "RES THRU":
searchurlbase = self._searchurl_res_thru
extraparams = (('Tolerance', '+/-1%'),)
package_filter_type = None
devtype = 'resistor'
elif device == "RES POWER":
devtype = 'resistor'
raise NotImplementedError
elif device == "RES ARRAY THRU":
devtype = 'resistor'
raise NotImplementedError
elif device == "RES ARRAY SMD":
searchurlbase = self._searchurl_res_array_smd
packagetf = self._tf_package_res_array_smd
devtype = 'resistor_array'
extraparams = (('Circuit Type', 'Isolated'), )
elif device == "POT TRIM":
devtype = 'resistor'
raise NotImplementedError
elif device == "CAP CER SMD":
searchurlbase = self._searchurl_cap_cer_smd
devtype = 'capacitor'
elif device == "CAP TANT SMD":
searchurlbase = self._searchurl_cap_tant_smd
packagetf = self._tf_package_tant_smd
package_header = 'Manufacturer Size Code'
package_filter_type = 'equals'
devtype = 'capacitor'
elif device == "CAP CER THRU":
devtype = 'capacitor'
raise NotImplementedError
elif device == "CAP ELEC THRU":
devtype = 'capacitor'
raise NotImplementedError
elif device == "CAP POLY THRU":
devtype = 'capacitor'
raise NotImplementedError
elif device == "CAP PAPER THRU":
devtype = 'capacitor'
raise NotImplementedError
elif device == "CRYSTAL AT":
searchurlbase = self._searchurl_crystal
packagetf = self._tf_package_crystal_at
package_filter_type = 'inlist'
extraparams = (('Operating Mode', 'Fundamental'),)
devtype = 'crystal'
elif device == "LED SMD":
raise NotImplementedError
else:
raise NotImplementedError
return (devtype, searchurlbase, packagetf, package_header,
package_filter_type, extraparams)
@staticmethod
[docs] def _get_value_options(devtype, value):
if devtype == 'resistor':
loptions = (
('resistance', "Resistance (Ohms)", 'equals', Resistance),
('wattage', "Power (Watts)", 'contains', ThermalDissipation)
)
lvalues = parse_resistor(value)
elif devtype == 'resistor_array':
loptions = (
('resistance', "Resistance (Ohms)", 'equals', Resistance),
('wattage', "Power Per Element", 'tequals', ThermalDissipation)
)
lvalues = parse_resistor(value)
elif devtype == 'capacitor':
loptions = (
('capacitance', "Capacitance", 'equals', Capacitance),
('voltage', "Voltage - Rated", 'equals', Voltage)
)
lvalues = parse_capacitor(value)
elif devtype == 'crystal':
loptions = (
('frequency', "Frequency", 'equals', Frequency),
)
lvalues = [parse_crystal(value)]
else:
raise ValueError
return loptions, lvalues
@staticmethod
[docs] def _process_value_options(filters, loptions, lvalues):
lparams = []
for lidx, loption in enumerate(loptions):
if lvalues[lidx] is None:
continue
dkopcode = None
for option in filters[loption[1]][1]:
if loption[2] == 'equals':
if option[0] == lvalues[lidx]:
dkopcode = option[1]
elif loption[2] == 'contains':
if option[0] and lvalues[lidx] in option[0]:
dkopcode = option[1]
elif loption[2] == 'tequals':
t = loption[3]
if not option[0]:
continue
if ',' in option[0]:
val = option[0].split(',')[0]
else:
val = option[0]
if t(val) == t(lvalues[lidx]):
dkopcode = option[1]
dkfiltcode = filters[loption[1]][0]
if dkopcode is None:
raise ValueError(loption[0] + ' ' + lvalues[lidx])
lparams.append((dkfiltcode, dkopcode))
return lparams
@staticmethod
[docs] def _process_package_options(filters, package_header,
package_filter_type, footprint):
if package_filter_type is not None:
lparams = []
pkgopcode = None
for option in filters[package_header][1]:
if not option[0]:
continue
if package_filter_type == 'contains':
if footprint in option[0]:
try:
pkgopcode.append(option[1])
except AttributeError:
pkgopcode = [option[1]]
elif package_filter_type == 'equals':
if footprint == option[0]:
pkgopcode = option[1]
elif package_filter_type == 'inlist':
if option[0] in footprint:
try:
pkgopcode.append(option[1])
except AttributeError:
pkgopcode = [option[1]]
if pkgopcode is None:
raise ValueError(footprint)
if isinstance(pkgopcode, list):
for opt in pkgopcode:
lparams.append((filters[package_header][0], opt))
else:
lparams.append((filters[package_header][0], pkgopcode))
return lparams
else:
return []
@staticmethod
[docs] def _get_pas_vpnos(self, device, value, footprint):
devtype, searchurlbase, packagetf, package_header, \
package_filter_type, extraparams = \
self._get_device_searchparams(device)
filters = self._get_searchurl_filters(searchurlbase)
params = copy(self._default_urlparams)
loptions, lvalues = self._get_value_options(devtype, value)
params.extend(
self._process_value_options(filters, loptions, lvalues)
)
extraparams_fp = None
if packagetf is not None:
footprint, extraparams_fp = packagetf(footprint)
packages_handled = False
package_params = self._process_package_options(
filters, package_header, package_filter_type, footprint
)
if len(package_params):
packages_handled = True
params.extend(package_params)
if extraparams_fp is not None:
if extraparams is not None:
extraparams = list(extraparams) + extraparams_fp
else:
extraparams = extraparams_fp
params.extend(self._process_extraparams(filters, extraparams))
searchurl = searchurlbase + '?' + urllib.urlencode(params)
soup = self.get_soup(searchurl)
if soup is None:
return None, 'URL_FAIL'
ptable = soup.find('table', id='productTable')
if ptable is None:
# check for single product page
sr = self._process_product_page(soup)
if sr.success is True:
# Sent directly to an exact match. Just send it back.
return [sr.parts[0]['Digi-Key Part Number']], sr.strategy
try:
table = self._get_resultpage_table(soup)
except ValueError:
return None, 'NO_RESULTS:PAS1'
sr = self._get_resultpage_parts(table)
if sr.success is False:
return SearchResult(False, None, sr.strategy)
if sr.parts is None:
raise Exception
if len(sr.parts) == 0:
return None, 'NO_RESULTS:PAS2'
results = sr.parts
if not packages_handled:
sr = self._process_results(results, value, footprint)
else:
sr = self._filter_results_unfiltered(results)
if sr.success is False:
return None, sr.strategy
return sr.parts, sr.strategy
dvobj = VendorDigiKey('digikey', 'Digi-Key Corporation',
'electronics', mappath=None,
currency_code='USD', currency_symbol='US$')