# Copyright (C) 2015 Chintalagiri Shashank
#
# This file is part of Tendril.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
EntityHub Maps Module documentation (:mod:`entityhub.maps`)
===========================================================
"""
import os
import csv
from tendril.utils.config import VENDOR_MAP_FOLDER
from tendril.utils.fsutils import VersionedOutputFile
from tendril.utils.fsutils import get_file_mtime
from tendril.utils import log
logger = log.get_logger(__name__, log.INFO)
[docs]class MapFileBase(object):
def __init__(self, mappath, name=None):
self._mappath = mappath
if name is None:
self._name = os.path.splitext(os.path.split(mappath)[1])[0]
else:
self._name = name
[docs] def _dump_mapfile(self):
outpath = os.path.join(VENDOR_MAP_FOLDER, self._mappath)
outf = VersionedOutputFile(outpath)
outw = csv.writer(outf)
outw.writerow(('Canonical', 'Strategy', 'Lparts'))
for ident in self.get_idents():
outw.writerow(
[ident, (self.get_strategy(ident) or '')] +
self.get_upartnos(ident) +
['@AG@' + x for x in self.get_apartnos(ident)]
)
outf.close()
logger.info("Dumped {0} Vendor Map to File : {1}"
"".format(self._name, outpath))
[docs] def get_idents(self):
raise NotImplementedError
[docs] def get_map_time(self, canonical):
raise NotImplementedError
[docs] def get_upartnos(self, canonical):
raise NotImplementedError
[docs] def get_apartnos(self, canonical):
raise NotImplementedError
[docs] def get_partnos(self, canonical):
uparts = self.get_upartnos(canonical)
if len(uparts) > 0:
return uparts
aparts = self.get_apartnos(canonical)
if len(aparts) > 0:
return aparts
return []
[docs] def get_all_partnos(self, canonical):
return self.get_upartnos(canonical) + self.get_apartnos(canonical)
[docs] def get_strategy(self, canonical):
raise NotImplementedError
[docs] def get_canonical(self, partno):
raise NotImplementedError
[docs] def get_user_map(self):
raise NotImplementedError
[docs] def length(self):
raise NotImplementedError
[docs] def remove_apartno(self, partno, canonical):
raise NotImplementedError
[docs]class MapFile(MapFileBase):
# Deprecated.
def __init__(self, mappath):
super(MapFile, self).__init__(mappath)
self._map = {}
self._umap = {}
self._strategy = {}
self._len = 0
self._mappath = mappath
self._load_mapfile()
[docs] def _load_mapfile(self):
with open(self._mappath) as f:
rdr = csv.reader(f)
for row in rdr:
if row[0] == "Canonical":
continue
ident = row[0]
self._strategy[ident] = row[1]
vals = row[2:]
self._map[ident] = []
self._umap[ident] = []
for elem in vals:
assert isinstance(elem, str)
self._len += 1
if elem.startswith('@AG@'):
elem = elem[4:]
self._map[ident].append(elem)
else:
self._umap[ident].append(elem)
[docs] def get_idents(self):
for key in sorted(self._map.keys()):
if len(self._map[key]) or len(self._umap[key]):
yield key
[docs] def get_map_time(self, canonical):
return get_file_mtime(self._mappath)
[docs] def get_upartnos(self, canonical):
return self._umap[canonical]
[docs] def get_apartnos(self, canonical):
return self._map[canonical]
[docs] def get_strategy(self, canonical):
if canonical not in self.get_idents():
return 'NOTRECOG'
return self._strategy[canonical]
[docs] def get_canonical(self, partno):
if len(self._umap) > 0:
for (k, v) in self._umap.iteritems():
if partno in v:
return k
if len(self._map) > 0:
for (k, v) in self._map.iteritems():
if partno in v:
return k
return None
[docs] def get_user_map(self):
return self._umap
[docs] def length(self):
return self._len