# Copyright (C) 2015 Chintalagiri Shashank
#
# This file is part of Tendril.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This file is part of tendril
See the COPYING, README, and INSTALL files for more information
"""
import os
import vendors
from tendril.gedaif import gsymlib
from tendril.utils.types import currency
from tendril.utils.config import CUSTOMSDEFAULTS_FOLDER
from tendril.utils.files import yml as yaml
from tendril.utils import log
logger = log.get_logger(__name__, log.DEFAULT)
[docs]class CustomsSection(object):
def __init__(self, code, hsdict):
self._code = code
if 'name' in hsdict.keys():
self._name = hsdict['name']
else:
self._name = None
if 'idents' in hsdict.keys():
self._idents = hsdict['idents']
else:
self._idents = None
if 'folders' in hsdict.keys():
self._folders = hsdict['folders']
else:
self._folders = None
if 'desc' in hsdict.keys():
self._desc = hsdict['desc']
else:
self._desc = None
self.bcd = None
self.bcd_notif = None
self.cvd = None
self.cvd_notif = None
self.acvd = None
self.acvd_notif = None
self.cec = None
self.cec_notif = None
self.cshec = None
self.cshec_notif = None
self.cvdec = None
self.cvdec_notif = None
self.cvdshec = None
self.cvdshec_notif = None
if 'duties' in hsdict.keys():
self._load_duties(hsdict['duties'])
[docs] def _load_duties(self, dutiesdict):
self.bcd = dutiesdict['bcd']['rate']
self.bcd_notif = dutiesdict['bcd']['notification']
self.cvd = dutiesdict['cvd']['rate']
self.cvd_notif = dutiesdict['cvd']['notification']
self.acvd = dutiesdict['acvd']['rate']
self.acvd_notif = dutiesdict['acvd']['notification']
self.cec = dutiesdict['cec']['rate']
self.cec_notif = dutiesdict['cec']['notification']
self.cshec = dutiesdict['cshec']['rate']
self.cshec_notif = dutiesdict['cshec']['notification']
self.cvdec = dutiesdict['cvdec']['rate']
self.cvdec_notif = dutiesdict['cvdec']['notification']
self.cvdshec = dutiesdict['cvdshec']['rate']
self.cvdshec_notif = dutiesdict['cvdshec']['notification']
@property
def code(self):
return self._code
@property
def name(self):
return self._name
@property
def desc(self):
return self._desc
@property
def idents(self):
return self._idents
@property
def folders(self):
return self._folders
@property
def effective_rate(self):
u = 100
bcd = u * self.bcd / 100
cvd = (u + bcd) * self.cvd / 100
cvdec = cvd * self.cvdec / 100
cvdshec = cvd * self.cvdshec / 100
cshec = (bcd + cvd + cvdec + cvdshec) * self.cshec / 100
cec = (bcd + cvd + cvdec + cvdshec) * self.cec / 100
acvd = (u + bcd + cvd + cec + cshec + cvdec + cvdshec) * self.acvd / 100
return bcd + cvd + cvdec + cvdshec + cshec + cec + acvd
def __repr__(self):
return "{0:<15} {1}".format(self._code, self.name)
[docs]class CustomsClassifier(object):
def __init__(self):
self._sections = []
self._load_sections()
[docs] def _load_sections(self):
hs_codes_file = os.path.join(CUSTOMSDEFAULTS_FOLDER, 'hs_codes.yaml')
with open(hs_codes_file, 'r') as f:
data = yaml.load(f)
for section, sectiondict in data['sections'].iteritems():
self._sections.append(CustomsSection(section, sectiondict))
[docs] def hs_from_ident(self, ident):
for section in self._sections:
if section.idents is not None:
for sign in section.idents:
if sign in ident:
return section
rsign = ''
rsec = None
for section in self._sections:
if section.folders is not None:
for sign in section.folders:
if sign in gsymlib.get_symbol_folder(ident, True):
if rsign in sign:
rsec = section
rsign = sign
return rsec
hs_classifier = CustomsClassifier()
[docs]class CustomsInvoice(vendors.VendorInvoice):
def __init__(self, vendor, inv_yaml, working_folder=None):
vendor_defaults_file = os.path.join(CUSTOMSDEFAULTS_FOLDER,
vendor._name + '.yaml')
self._data = {}
if os.path.exists(vendor_defaults_file):
with open(vendor_defaults_file, 'r') as f:
vendor_defaults = yaml.load(f)
self._data = vendor_defaults.copy()
else:
logger.warning("Vendor Customs Defaults File Not Found : " +
vendor_defaults_file)
self._source_folder = os.path.split(inv_yaml)[0]
if working_folder is None:
self._working_folder = self._source_folder
else:
self._working_folder = working_folder
with open(inv_yaml, 'r') as f:
inv_data = yaml.load(f)
self._data.update(inv_data)
self._linetype = CustomsInvoiceLine
vendor.currency = currency.CurrencyDefinition(
vendor.currency.code, vendor.currency.symbol,
exchval=self._data['exchrate']
)
super(CustomsInvoice, self).__init__(
vendor, self._data['invoice_no'], self._data['invoice_date']
)
self.freight = 0
self.insurance_pc = 0
self._includes_freight = False
self._added_insurance = False
self._process_other_costs()
hs_codes_file = os.path.join(CUSTOMSDEFAULTS_FOLDER, 'hs_codes.yaml')
with open(hs_codes_file, 'r') as f:
self._hs_codes = yaml.load(f)
self._sections = []
@property
def given_data(self):
return self._data
[docs] def _process_other_costs(self):
for k, v in self._data['costs_not_included'].iteritems():
if v is False:
self._data['costs_not_included'][k] = 'None'
elif v == 'INCL':
self._data['costs_not_included'][k] = 'None, ' \
'included in Invoice'
elif v == 'LISTED':
self._data['costs_not_included'][k] = 'Listed in Invoice'
else:
logger.warning(
"Unrecognized Other Costs definition for " + k + " : " + v
)
if self._data['costs_not_included']['FREIGHT'] == 'None, included in Invoice': # noqa
self.freight = currency.CurrencyValue(
float(self._data['shipping_cost_incl']),
self._vendor.currency
)
self._data['costs_not_included']['FREIGHT'] += " ({0})".format(self.freight.source_string) # noqa
self._includes_freight = True
if self._data['costs_not_included']['FREIGHT'] == 'Listed in Invoice':
self.freight = currency.CurrencyValue(
float(self._data['shipping_cost_listed']),
self._vendor.currency
)
self._data['costs_not_included']['FREIGHT'] = "{0} (as listed in the invoice)".format(self.freight.source_string) # noqa
self._includes_freight = True
if 'insurance_pc' in self._data.keys():
self.insurance_pc = float(self._data['insurance_pc'])
if self.insurance_pc > 0:
self._added_insurance = True
self._data['costs_not_included']['INSURANCE'] = "{0} (@{1}% {2})".format(self.insurance.source_string, self.insurance_pc, self._data['insurance_note']) # noqa
if 'handling_pc' in self._data.keys():
self.handling_pc = float(self._data['handling_pc'])
if self.handling_pc > 0:
self._added_handling = True
self._data['costs_not_included']['LANDING'] = "{0} (@{1}% {2})".format(self.landing.source_string, self.handling_pc, self._data['handling_note']) # noqa
@property
def insurance(self):
if self.insurance_pc is not None:
return self.extendedtotal * self.insurance_pc / 100
else:
return self.extendedtotal * 0
@property
def includes_freight(self):
return self._includes_freight
@property
def landing(self):
return self.cif * self.handling_pc / 100
@property
def cif(self):
return self.extendedtotal + self.freight + self.insurance
@property
def added_insurance(self):
try:
return self._added_insurance
except AttributeError:
return False
@property
def added_handling(self):
try:
return self._added_handling
except AttributeError:
return False
@property
def source_folder(self):
return self._source_folder
@property
def working_folder(self):
return self._working_folder
@property
def source_files(self):
rval = [(os.path.join(self._source_folder, 'inv_data.yaml'), 'INVOICE-DATA-YAML'), # noqa
(os.path.join(self._source_folder, self._data['invoice_file']), 'INVOICE-FILE-CSV')] # noqa
return rval
@property
def idxs(self):
return [x.idx for x in self._lines]
[docs] def _acquire_lines(self):
raise NotImplementedError
@property
def hssections(self):
rval = []
for line in self._lines:
if line.hs_section is None:
continue
if line.hs_section not in rval:
rval.append(line.hs_section)
return sorted(rval, key=lambda x: x.code)
[docs] def getsection_lines(self, hssection):
rval = []
for line in self._lines:
if line.hs_section == hssection:
rval.append(line)
return rval
[docs] def getsection_idxs(self, hssection):
rval = []
for line in self.getsection_lines(hssection):
rval.append(line.idx)
return rval
[docs] def getsection_qty(self, hssection):
rval = 0
for line in self.getsection_lines(hssection):
rval += line.qty
return rval
[docs] def getsection_assessabletotal(self, hssection):
rval = 0
for line in self.getsection_lines(hssection):
rval += line.assessableprice
return rval
@property
def unclassified(self):
rval = []
for line in self._lines:
if line.hs_section is None:
rval.append(line)
return rval
@property
def assessabletotal(self):
return sum([x.assessableprice for x in self._lines])
@property
def bcd(self):
return sum([x.bcd.value for x in self.lines])
@property
def cvd(self):
return sum([x.cvd.value for x in self.lines])
@property
def cec(self):
return sum([x.cec.value for x in self.lines])
@property
def cshec(self):
return sum([x.cshec.value for x in self.lines])
@property
def cvdec(self):
return sum([x.cvdec.value for x in self.lines])
@property
def cvdshec(self):
return sum([x.cvdshec.value for x in self.lines])
@property
def acvd(self):
return sum([x.acvd.value for x in self.lines])
@property
def dutypayable(self):
return self.bcd + self.cvd + self.cec + self.cshec + self.cvdec + self.cvdshec + self.acvd # noqa
@property
def effectiverate_cif(self):
return (self.dutypayable / self.assessabletotal) * 100
@property
def effectiverate_fob(self):
return (self.dutypayable / self.extendedtotal) * 100
[docs]class DutyComponent(object):
def __init__(self, title, rate, notification, value):
self.title = title
self.rate = rate
self.notification = notification
self.value = value
def __repr__(self):
return "{0:>50} (@{1:>5}%) : {2:>13} Notification : {3}".format(
self.title, self.rate, self.value.native_string, self.notification
)
[docs]class CustomsInvoiceLine(vendors.VendorInvoiceLine):
def __init__(self, invoice, ident, vpno, unitp, qty, idx=None, desc=None):
if idx is None:
idx = max(invoice.idxs) + 1
super(CustomsInvoiceLine, self).__init__(
invoice, ident, vpno, unitp, qty, desc=desc
)
if idx in invoice.idxs:
raise ValueError
self._idx = idx
@property
def dutypayable(self):
return self.bcd.value + self.cvd.value + self.cec.value + \
self.cshec.value + self.cvdec.value + self.cvdshec.value + \
self.acvd.value
@property
def idx(self):
return self._idx
@property
def hs_section(self):
try:
hs_section = hs_classifier.hs_from_ident(self.ident)
except ValueError:
hs_section = None
if hs_section is None:
logger.warning("Could not classify : " + self.ident)
return hs_section
@property
def bcd(self):
return DutyComponent(
"BCD", self.hs_section.bcd, self.hs_section.bcd_notif,
self.assessableprice * self.hs_section.bcd / 100.0
)
@property
def cvd(self):
return DutyComponent(
"CVD", self.hs_section.cvd, self.hs_section.cvd_notif,
(self.assessableprice + self.bcd.value) * self.hs_section.cvd / 100.0 # noqa
)
@property
def cec(self):
return DutyComponent(
"C EC", self.hs_section.cec, self.hs_section.cec_notif,
(self.bcd.value + self.cvd.value + self.cvdec.value + self.cvdshec.value) * self.hs_section.cec / 100.0 # noqa
)
@property
def cshec(self):
return DutyComponent(
"C SHEC", self.hs_section.cshec, self.hs_section.cshec_notif,
(self.bcd.value + self.cvd.value + self.cvdec.value + self.cvdshec.value) * self.hs_section.cshec / 100.0 # noqa
)
@property
def cvdec(self):
return DutyComponent(
"CVD EC", self.hs_section.cvdec, self.hs_section.cvdec_notif,
self.cvd.value * self.hs_section.cvdec / 100.0
)
@property
def cvdshec(self):
return DutyComponent(
"CVD SHEC", self.hs_section.cvdshec,
self.hs_section.cvdshec_notif,
self.cvd.value * self.hs_section.cvdshec / 100.0
)
@property
def acvd(self):
return DutyComponent(
"SAD", self.hs_section.acvd, self.hs_section.acvd_notif,
(self.assessableprice + self.bcd.value + self.cvd.value + self.cec.value + self.cshec.value + self.cvdec.value + self.cvdshec.value) * self.hs_section.acvd / 100.0 # noqa
)
@property
def invoice_fraction(self):
return self.extendedprice / self._invoice.extendedtotal
@property
def freight(self):
if self._invoice.freight is None:
return 0
return self._invoice.freight * self.invoice_fraction
@property
def insurance(self):
if self._invoice.insurance_pc is None:
return 0
return self.extendedprice * (self._invoice.insurance_pc / 100.0)
@property
def cifprice(self):
return self.extendedprice + self.freight + self.insurance
@property
def handling(self):
if self._invoice.handling_pc is None:
return 0
return self.cifprice * (self._invoice.handling_pc / 100.0)
@property
def assessableprice(self):
rval = self.extendedprice
rval += self.freight
rval += self.insurance
rval += self.handling
return rval
[docs] def print_duties(self):
print "{0:>60} : {2:>13} {3:>13}".format("Extended Value", '', self.extendedprice.native_string, self.extendedprice.source_string) # noqa
print "{0:>60} : {2:>13}".format("Freight Value", '', self.freight, '') # noqa
print "{0:>60} : {2:>13}".format("Insurance Value", '', self.insurance, '') # noqa
print "{0:>60} : {2:>13}".format("Handling Value", '', self.handling, '') # noqa
print "{0:>60} : {2:>13}".format("Assessable Value", '', self.assessableprice, '') # noqa
print self.bcd
print self.cvd
print self.cec
print self.cshec
print self.cvdec
print self.cvdshec
print self.acvd
def __repr__(self):
if self.hs_section is not None:
return "{0:<3} {1:<40} {2:<35} {3:>10} {4:>15}".format(self.idx, self._ident, self._desc, self.hs_section.code, self.hs_section.name) # noqa
else:
return "{0:<3} {1:<40} {2:<35} {3:>10} {4:>15}".format(self.idx, self._ident, self._desc, '----------', 'Unclassified') # noqa