Source code for tendril.gedaif.bomparser

# Copyright (C) 2015 Chintalagiri Shashank
#
# This file is part of Tendril.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
gEDA BOM Parser Module (:mod:`tendril.gedaif.bomparser`)
========================================================
"""

import subprocess
import csv
import os
import shutil

from tendril.conventions.motifs import create_motif_object
from tendril.conventions.electronics import ident_transform
from tendril.boms.validate import ValidationContext
from tendril.boms.validate import ErrorCollector
from tendril.boms.validate import BomMotifPolicy
from tendril.boms.validate import BomMotifUnrecognizedError

import projfile

from tendril.utils.config import INSTANCE_CACHE
from tendril.utils.config import PROJECTS_ROOT
from tendril.utils import fsutils
from tendril.utils import log
logger = log.get_logger(__name__, level=log.WARNING)


FNULL = open(os.devnull, 'w')


[docs]class BomLine(object): def __init__(self, line, columns): self.data = {} line = line.rstrip('\r\n') elems = line.split('\t') for i in range(len(columns)): self.data[columns[i]] = elems[i] def __repr__(self): return self.data.__repr__() def __getattr__(self, item): if item in self.data.keys(): return self.data[item] elif item == 'ident': return ident_transform( self.data['device'], self.data['value'], self.data['footprint'] ) else: raise AttributeError
[docs]class CachedBomParser(object): _basefolder = None def __init__(self, projectfolder, use_cached=True, **kwargs): self.projectfolder = os.path.normpath(projectfolder) self._namebase = os.path.relpath(self.projectfolder, PROJECTS_ROOT) self._validation_context = ValidationContext( self.projectfolder, 'BOMParser' ) self._validation_errors = ErrorCollector() self.line_gen = None self._use_cached = use_cached self._generator_args = kwargs @property def _temp_folder(self): return os.path.join(fsutils.TEMPDIR, self._namebase, self._basefolder) @property def _temp_bom_path(self): return os.path.join(self._temp_folder, "tempbom.net") @property def _cache_folder(self): return os.path.join(INSTANCE_CACHE, 'gedaproject', self._namebase) @property def _cached_bom_path(self): return os.path.join(self._cache_folder, 'bom.net')
[docs] def generate_bom_file(self, outfile, **kwargs): raise NotImplementedError
[docs] def get_bom_file(self): if not os.path.exists(self._cache_folder): os.makedirs(self._cache_folder) bom_mtime = fsutils.get_file_mtime(self._cached_bom_path) source_folder = os.path.join(self.projectfolder, self._basefolder) source_mtime = fsutils.get_folder_mtime(source_folder) if self._use_cached is True and bom_mtime is not None \ and source_mtime < bom_mtime: return open(self._cached_bom_path, 'r') else: self.generate_bom_file(self._temp_bom_path, **self._generator_args) shutil.copy(self._temp_bom_path, self._cached_bom_path) return open(self._cached_bom_path, 'r')
[docs] def get_lines(self): raise NotImplementedError
@property def validation_errors(self): return self._validation_errors
[docs]class GedaBomParser(CachedBomParser): _basefolder = 'schematic' def __init__(self, projectfolder, use_cached=True, backend=None): super(GedaBomParser, self).__init__(projectfolder, use_cached=use_cached, backend=backend) self._gpf = projfile.GedaProjectFile(self.projectfolder) self.bom_reader = None self.columns = [] self.schpaths = [] self.prep_bom()
[docs] def _get_temp_schematic(self): self.schpaths = [] if not os.path.exists(self._temp_folder): os.makedirs(self._temp_folder) for schpath in self._gpf.schpaths: tschpath = os.path.join(self._temp_folder, os.path.split(schpath)[1]) shutil.copy(schpath, tschpath) self.schpaths.append(tschpath)
[docs] def generate_bom_file(self, outpath, backend=None): self._get_temp_schematic() cmd = ["gnetlist", '-g', backend, '-Oattrib_file=' + os.path.join(self.projectfolder, self._basefolder, 'attribs') ] outdir, outfile = os.path.split(outpath) idx_refdes = None idx_schfile = None intermediate_outpath = os.path.join(outdir, 'int.' + outfile) with open(intermediate_outpath, 'wb') as outf: outw = csv.writer(outf, delimiter='\t') header_written = False found_refdes = set() additional_schfiles = {} for schpath in self.schpaths: schfile = os.path.split(schpath)[1] soutpath = os.path.join(outdir, '.'.join([schfile, outfile])) scmd = cmd + ['-o', soutpath, schpath] subprocess.call(scmd, stdout=FNULL, stderr=subprocess.STDOUT,) with open(soutpath, 'rb') as sf: sr = csv.reader(sf, delimiter='\t') if not header_written: header = next(sr) + ['schfile'] outw.writerow(header) idx_refdes = header.index('refdes') idx_schfile = header.index('schfile') header_written = True else: next(sr) for row in sr: refdes = row[idx_refdes] if refdes not in found_refdes: found_refdes.add(refdes) outw.writerow(row + [schfile]) else: if refdes in additional_schfiles.keys(): additional_schfiles[refdes].append(schfile) else: additional_schfiles[refdes] = [schfile] if len(additional_schfiles.keys()): with open(intermediate_outpath, 'rb') as inf: r = csv.reader(inf, delimiter='\t') with open(outpath, 'wb') as outf: w = csv.writer(outf, delimiter='\t') for row in r: if row[idx_refdes] in additional_schfiles.keys(): files = [row[idx_schfile]] files.extend(additional_schfiles[row[idx_refdes]]) row[idx_schfile] = ';'.join(files) w.writerow(row) else: shutil.copy(intermediate_outpath, outpath) return outpath
[docs] def prep_bom(self): self.bom_reader = self.get_bom_file() self.columns = self.bom_reader.readline().rstrip().split('\t') self.line_gen = self.get_lines()
[docs] def get_lines(self): for line in self.bom_reader: yield BomLine(line, self.columns) self.cleanup()
[docs] def cleanup(self): self.bom_reader.close() if os.path.exists(self._temp_bom_path): os.remove(self._temp_bom_path) for tschpath in self.schpaths: os.remove(tschpath)
[docs]class MotifAwareBomParser(GedaBomParser): def __init__(self, projectfolder, **kwargs): super(MotifAwareBomParser, self).__init__(projectfolder, **kwargs) self._motifs = [] # self._motifconfigs = self._gpf.configsfile.configdata['motiflist'] self.motif_gen = None self._motif_policy = BomMotifPolicy(self._validation_context)
[docs] def get_motif(self, motifst): motifst = motifst.split(':')[0] for motif in self._motifs: if motif.refdes == motifst: return motif logger.info("Creating new motif : " + motifst) motif = create_motif_object(motifst) self._motifs.append(motif) return motif
[docs] def get_lines(self): for line in self.bom_reader: bomline = BomLine(line, self.columns) if bomline.data['motif'] != 'unknown': try: motif = self.get_motif(bomline.data['motif']) motif.add_element(bomline) except ValueError: e = BomMotifUnrecognizedError(self._motif_policy, bomline.data['motif'], bomline.data['refdes']) self._validation_errors.add(e) else: yield bomline self.motif_gen = self.get_motifs() self.cleanup()
[docs] def get_motifs(self): for motif in self._motifs: yield motif