From 264f31d924c39aca6a903f2392283ecacd8ce5ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Valyi?= Date: Sun, 26 May 2024 19:58:30 -0300 Subject: [PATCH] [REF] l10n_br_sped_base: same logic, better code --- l10n_br_sped_base/models/sped_declaration.py | 50 ++-- l10n_br_sped_base/models/sped_mixin.py | 226 ++++++++++++------- 2 files changed, 162 insertions(+), 114 deletions(-) diff --git a/l10n_br_sped_base/models/sped_declaration.py b/l10n_br_sped_base/models/sped_declaration.py index 39f8685a23dd..f91da5afc3dc 100644 --- a/l10n_br_sped_base/models/sped_declaration.py +++ b/l10n_br_sped_base/models/sped_declaration.py @@ -27,19 +27,6 @@ def _get_default_dt_fin(self): dt = fields.Date.context_today(self) return dt.replace(year=dt.year + 1) - def name_get(self): - res = [] - for declaration in self: - name = "%s-%s-%s" % ( - declaration.DT_FIN.month - if declaration.DT_FIN.month > 9 - else "0" + str(declaration.DT_FIN.month), - declaration.DT_FIN.year, - declaration.company_id.name.replace(" ", "_"), - ) - res.append((declaration.id, name)) - return res - company_id = fields.Many2one( comodel_name="res.company", string="Company", @@ -71,10 +58,20 @@ def name_get(self): sped_attachment_id = fields.Many2one("ir.attachment", string="Sped Attachment") @api.model - def _get_kind(self): + def _get_kind(self) -> str: return self._name.replace(".0000", "").split(".")[-1] + def name_get(self): + return [ + ( + declaration.id, + f"{declaration.DT_FIN:%m-%Y}-{declaration.company_id.name.replace(' ', '_')}", + ) + for declaration in self + ] + def button_populate_sped_from_odoo(self): + """Populate SPED registers from Odoo.""" # TODO add cron pulling from Odoo for open declarations log_msg = StringIO() log_msg.write("

%s

" % (_("Pulled from Odoo:"),)) @@ -104,10 +101,11 @@ def button_draft(self): self.state = "draft" def button_create_sped_file(self): + """Generate and attach the SPED file.""" self.ensure_one() sped_txt = self._generate_sped_text() kind = self._get_kind() - file_name = kind + "-" + self.name_get()[0][1] + ".txt" + file_name = f"{kind}-{self.name_get()[0][1]}.txt" self.sped_attachment_id = self.env["ir.attachment"].create( { "name": file_name, @@ -133,6 +131,7 @@ def onchange_company_id(self): @api.model def _append_view_header(self, form): + """Append custom buttons to the form view header.""" header = E.header() header.append( E.button( @@ -190,6 +189,7 @@ def _append_view_header(self, form): @api.model def _append_view_footer(self, form): + """Append the chatter elements to the form view footer.""" div = E.div( name="message_follower_ids", ) @@ -200,18 +200,12 @@ def _append_view_footer(self, form): @api.model def _append_top_view_elements(self, group, inline=False): + """Append top-level elements to the form view.""" group.append(E.field(name="company_id")) group.append(E.separator(colspan="4")) def _generate_sped_text(self, version=None): - """ - Generate SPED text from Odoo declaration records. - - :param declaration: Odoo declaration record. - :param version: SPED layout version (optional). - :return: SPED text as a string. - """ - + """Generate SPED text from Odoo declaration records.""" self.ensure_one() kind = self._get_kind() if version is None: @@ -219,7 +213,6 @@ def _generate_sped_text(self, version=None): top_register_classes = self._get_top_registers(kind) sped = StringIO() last_bloco = None - bloco = None line_total = 0 # mutable register line_count https://stackoverflow.com/a/15148557 line_count = [0] @@ -260,12 +253,9 @@ def _generate_sped_text(self, version=None): last_bloco = bloco # close the last register: - if ( - kind == "ecf" - ): # WTF why is it different for ecf?? You kidding me? or is it an error? - sped.write("\n|" + bloco + "099|%s|" % (line_count[0] + 1,)) - else: - sped.write("\n|" + bloco + "990|%s|" % (line_count[0] + 1,)) + # closing_register = "099" if kind == "ecf" else "990" + # sped.write(f"\n|{bloco}{closing_register}|{line_count[0] + 1}|") + sped.write(f"\n|{bloco}990|{line_count[0] + 1}|") # totals: sped.write("\n|9001|0|") diff --git a/l10n_br_sped_base/models/sped_mixin.py b/l10n_br_sped_base/models/sped_mixin.py index a1fe02e8dd63..bae11c81d073 100644 --- a/l10n_br_sped_base/models/sped_mixin.py +++ b/l10n_br_sped_base/models/sped_mixin.py @@ -98,8 +98,8 @@ def _odoo_domain(self): @api.model def _get_alphanum_sequence(self, model_name): """ - Used to order the SPED register in the same order - as in the SPED layout (the register name alone won't cut it) + Helper method to get alphanumeric sequence for sorting registers. + (the register name alone won't cut it) """ register_code = model_name[-4:] bloco_key = register_code[0] @@ -115,8 +115,12 @@ def _get_alphanum_sequence(self, model_name): @api.model def _get_top_registers(self, kind): """ - Get the "blocos" registers + Get the top register classes, the "blocos", for a specific kind. + + :param kind: Type of SPED register + :return: List of top register classes """ + register_model_names = list( filter(lambda x: "l10n_br_sped.%s" % (kind,) in x, self.env.keys()) ) @@ -305,7 +309,11 @@ def _append_top_view_elements(self, group, inline=False): group.append( E.field( name="declaration_id", - attrs="{'readonly': [('state', 'not in', ['draft']), ('declaration_id', '!=', False)]}", + attrs=( + "{'readonly': " + "[('state', 'not in', ['draft']), " + "('declaration_id', '!=', False)]}" + ), invisible="1" if inline else "0", ) ) @@ -314,6 +322,12 @@ def _append_top_view_elements(self, group, inline=False): @api.model def _flush_registers(self, kind, declaration_id=None): + """ + Flush the SPED registers for a specific kind and declaration. + + :param kind: Type of SPED register + :param declaration_id: Declaration ID to flush + """ if declaration_id: domain = [("declaration_id", "=", declaration_id)] else: @@ -335,9 +349,16 @@ def _import_file(self, filename, kind, version=None, declaration=None): :return: Declaration record created in Odoo. examples: - env["l10n_br_sped.mixin"]._import_file("/odoo/links/l10n_br_sped/demo/demo_ecd.txt", "ecd") - env["l10n_br_sped.mixin"]._import_file("/odoo/links/l10n_br_sped/demo/demo_ecd.txt", "ecd") - env["l10n_br_sped.mixin"]._import_file("/odoo/links/l10n_br_sped/demo/demo_efd_pis_cofins_multi.txt", "efd_pis_cofins") + env["l10n_br_sped.mixin"]._import_file( + "/odoo/links/l10n_br_sped/demo/demo_ecd.txt", "ecd" + ) + env["l10n_br_sped.mixin"]._import_file( + "/odoo/links/l10n_br_sped/demo/demo_ecd.txt", "ecd" + ) + env["l10n_br_sped.mixin"]._import_file( + "/odoo/links/l10n_br_sped/demo/demo_efd_pis_cofins_multi.txt", + "efd_pis_cofins", + ) """ if version is None: version = LAYOUT_VERSIONS[kind] @@ -349,6 +370,7 @@ def _import_file(self, filename, kind, version=None, declaration=None): level_2_registers = defaultdict(list) for line in [line.rstrip("\r\n") for line in spedfile]: reg_code = line.split("|")[1] + if declaration is not None and reg_code == "0000": continue register_class = self.env.get( @@ -359,6 +381,7 @@ def _import_file(self, filename, kind, version=None, declaration=None): ), None, ) + if register_class is None: if "001" in reg_code or "990" in reg_code or reg_code == "9999": continue @@ -402,7 +425,7 @@ def _import_file(self, filename, kind, version=None, declaration=None): log_msg = StringIO() log_msg.write("

%s

" % (_("Imported from file:"),)) - for code, registers in level_2_registers.items(): + for _code, registers in level_2_registers.items(): registers[0]._log_chatter_sped_item(log_msg, 2, registers) declaration.message_post(body=log_msg.getvalue()) @@ -449,89 +472,125 @@ def _read_register_line(self, line, version): register_vals[fname] = val return register_vals - # flake8: noqa: C901 def _generate_register_text(self, sped, version, line_count, count_by_register): """ - Recursively generate the SPED text of the registers. + Recursively generate the SPED text for a register. + + :param sped: StringIO object to write SPED text + :param version: Layout version + :param line_count: Line count list + :param count_by_register: Dictionary to count registers """ - code = self._name[-4:] + code = self._name[-4:].upper() register_spec_model = self._name.replace( - ".%s" % (code), ".%s.%s" % (version, code) + f".{code.lower()}", f".{version}.{code.lower()}" ) - code = code.upper() register_spec = self.env[register_spec_model] - keys = [i[0] for i in register_spec._fields.items()] - if ( - not keys - ): # happens with ECD I550, I555 and I555 with "LEIAUTE PARAMETRIZÁVEL" - keys = ["id"] # BUT should not happen! + keys = [k for k, v in register_spec._fields.items()] or ["id"] + if len(self): count_by_register[code] += len(self) - if code == "0000": - line_start = "" - else: - line_start = "\n" for rec in self: - sped.write("%s|%s|" % (line_start, code)) - line_count[0] += 1 - children_groups = [] - should_break_next = False - vals = {k: getattr(rec, k) for k in keys} - for fname, value in vals.items(): - if register_spec._fields[fname].type == "one2many" and fname.startswith( - "reg_" - ): - children_groups.append(value) - should_break_next = True - continue # we assume it's the last register specific field - elif should_break_next: # if the register has a parent but no children - break - elif not fname.isupper(): # not a SPED field - continue + self._write_register_line(sped, code, rec, keys, line_count, register_spec) + self._process_children( + sped, version, line_count, count_by_register, rec, keys, register_spec + ) + return sped - # Handle different field types - if self._fields[fname].type == "date": - val = value.strftime("%d%m%Y") if value else "" - elif self._fields[fname].type == "char": - val = str(value) if value else "" - elif self._fields[fname].type == "selection": - val = value if value else "" - elif self._fields[fname].type == "integer": - if value == 0: - val = "" - else: - val = str(value) - elif self._fields[fname].type == "float": - if float_is_zero(value % 1, 6): # ex: aliquota ICMS - val = str(int(value)) - else: - val = str(value).replace(".", ",") - elif self._fields[fname].type == "monetary": - if float_is_zero(value, precision_digits=8): - val = "" - elif float_is_zero(value % 1, precision_digits=8): - val = str(int(value)) - else: - val = str(value) + def _write_register_line(self, sped, code, rec, keys, line_count, register_spec): + """ + Write a line for the register. + + :param sped: StringIO object to write SPED text + :param code: Register code + :param rec: Record + :param keys: List of field keys + :param line_count: Line count list + :param register_spec: Register specification model + """ + line_start = "" if code == "0000" else "\n" + sped.write(f"{line_start}|{code}|") + line_count[0] += 1 - else: - val = str(value) - sped.write(val + "|") + for fname, value in {k: getattr(rec, k) for k in keys}.items(): + if not fname.isupper(): + continue + + val = self._format_field_value(register_spec, fname, value) + sped.write(f"{val}|") - children_groups = sorted( - children_groups, key=lambda children: children._name + def _format_field_value(self, register_spec, fname, value): + """ + Format the field value based on its type. + + :param register_spec: Register specification model + :param fname: Field name + :param value: Field value + :return: Formatted field value as string + """ + field = register_spec._fields[fname] + if field.type == "date": + return value.strftime("%d%m%Y") if value else "" + elif field.type == "char" or field.type == "selection": + return str(value) if value else "" + elif field.type == "integer": + return "" if value == 0 else str(value) + elif field.type == "float": + return ( + str(int(value)) + if float_is_zero(value % 1, 6) + else str(round(value, 6)).replace(".", ",") + ) + elif field.type == "monetary": + return ( + "" + if float_is_zero(value, precision_digits=8) + else str(int(value)) + if float_is_zero(value % 1, precision_digits=8) + else str(value) + ) + else: + return str(value) + + def _process_children( + self, sped, version, line_count, count_by_register, rec, keys, register_spec + ): + """ + Process child registers recursively. + + :param sped: StringIO object to write SPED text + :param version: Layout version + :param line_count: Line count list + :param count_by_register: Dictionary to count registers + :param rec: Record + :param keys: List of field keys + :param register_spec: Register specification model + """ + children_groups = [ + getattr(rec, fname) + for fname in keys + if register_spec._fields[fname].type == "one2many" + and fname.startswith("reg_") + ] + for children in sorted(children_groups, key=lambda c: c._name): + children._generate_register_text( + sped, version, line_count, count_by_register ) - for children in children_groups: - children._generate_register_text( - sped, version, line_count, count_by_register - ) - return sped @api.model def _pull_records_from_odoo( self, kind, level, parent_register=None, parent_record=None, log_msg=None ): + """ + Pull records from Odoo and populate the SPED registers. + + :param kind: Type of SPED register + :param level: Depth level for pulling records + :param parent_register: Parent register if any + :param parent_record: Parent record if any + :param log_msg: StringIO object for logging + """ declaration = self._context["declaration"] children = [ @@ -553,10 +612,11 @@ def _pull_records_from_odoo( records = self.env[self._odoo_model].search( self._odoo_domain(parent_record, declaration) ) + elif hasattr(self, "_odoo_query"): self._cr.execute(*self._odoo_query(parent_record, declaration)) - records = self._cr.dictfetchall() + elif hasattr(self, "_map_from_odoo"): # in this case we will generate a register without any # specific Odoo record. Example: ECD I010 @@ -574,6 +634,7 @@ def _pull_records_from_odoo( ) self._log_chatter_sped_item(log_msg, level, [register]) return + else: self._log_chatter_sped_item(log_msg, level) return @@ -605,17 +666,14 @@ def _pull_records_from_odoo( @api.model def _log_chatter_sped_item(self, log_msg, level, records=None): - actions = self.env["ir.actions.act_window"].search( - [("res_model", "=", self._name)] + action = self.env["ir.actions.act_window"].search( + [("res_model", "=", self._name)], limit=1 ) - if actions: - body = """ -
%s%s%s
- """ % ( - " " * level * 4, - actions[0].id, - self._name[-4:].upper(), - records and " (%s records)" % (str(len(records)),) or "", + if action: + record_count = f" ({len(records)} records)" if records else "" + body = ( + f"
{' ' * level * 4}" + f'' + f"{self._name[-4:].upper()}{record_count}
" ) log_msg.write(body)