From 86e93791c62890c5509562215f80f4bbcc44c904 Mon Sep 17 00:00:00 2001 From: Heron Date: Thu, 21 May 2020 17:03:53 -0300 Subject: [PATCH 1/2] Verifica validade do certificado digital utilizado --- pytrustnfe/certificado.py | 17 +++++- pytrustnfe/client.py | 8 ++- teste_carioca.py | 108 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 teste_carioca.py diff --git a/pytrustnfe/certificado.py b/pytrustnfe/certificado.py index d6aab080..b2b424bf 100644 --- a/pytrustnfe/certificado.py +++ b/pytrustnfe/certificado.py @@ -4,13 +4,24 @@ import tempfile from OpenSSL import crypto +#import datetime +from datetime import datetime +#temp +import ssl class Certificado(object): def __init__(self, pfx, password): self.pfx = pfx self.password = password + pfx = crypto.load_pkcs12(pfx, password).get_certificate() + cert_date = int(str(pfx.get_notAfter(),'UTF-8').strip('Z')) + now = datetime.now() + date = int(now.strftime("%Y%m%d%H%M%S")) + if cert_date < date: + print("WARNING: Certificado expirado") + def save_pfx(self): pfx_temp = tempfile.mkstemp()[1] arq_temp = open(pfx_temp, "wb") @@ -20,7 +31,11 @@ def save_pfx(self): def extract_cert_and_key_from_pfx(pfx, password): - pfx = crypto.load_pkcs12(pfx, password) + try: + pfx = crypto.load_pkcs12(pfx, password) + except: + print("ERROR: Falha ao ler certiticado. Verifique a senha") + exit() # PEM formatted private key key = crypto.dump_privatekey(crypto.FILETYPE_PEM, pfx.get_privatekey()) # PEM formatted certificate diff --git a/pytrustnfe/client.py b/pytrustnfe/client.py index 2d481a23..43359cc5 100644 --- a/pytrustnfe/client.py +++ b/pytrustnfe/client.py @@ -12,7 +12,13 @@ def get_authenticated_client(base_url, cert, key): cache = suds.cache.DocumentCache(location=cache_location) session = requests.Session() - session.cert = (cert, key) + session.cert = (cert, key) + + # Testa sessao https + r = requests.get(base_url, cert=(cert, key)) + if r.status_code == 403: + print("ERROR: Falha na conexão utilizando o certificado digital e senha infomados. Verifique a validade do certificado") + exit() return suds.client.Client( base_url, cache=cache, transport=suds_requests.RequestsTransport(session) ) diff --git a/teste_carioca.py b/teste_carioca.py new file mode 100644 index 00000000..b874fb14 --- /dev/null +++ b/teste_carioca.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- + +import logging +from datetime import datetime +from pytrustnfe.nfe import consulta_cadastro +#import pytrustnfe.nfe +from pytrustnfe.certificado import Certificado +from pytrustnfe.nfse.paulistana import cancelamento_nfe +from pytrustnfe.nfse.paulistana import envio_lote_rps +from pytrustnfe.nfse.carioca import gerar_nfse + + +logger = logging.getLogger(__name__) +dbg = 0 + + +#certificado_path = open(b'/data/certs/23834691000124.pfx', 'rb').read() +certificado_path = open(b'/data/certs/expirado.pfx', 'rb').read() +certificado = Certificado(certificado_path, 'audaz$321') + +if dbg>1 :print('type(certificado)') +if dbg>1 :print(type(certificado)) +if dbg>9 :print(certificado.pfx) +if dbg>1 :print(certificado.password) +# Necessário criar um dicionário com os dados, validação dos dados deve +# ser feita pela aplicação que está utilizando a lib + +#retorno = envio_lote_rps(certificado, nfse=nfse) +#retorno = gerar_nfse(certificado, nfse=rps) + +#xml = { "rps" : {"rps" : { "numero" : "1" }}} +#retorno = gerar_nfse(certificado, nfse=xml) + +rps = { + 'ambiente': '2', + 'rps': { + 'ambiente': '2', + 'numero': '1', + 'serie': 'ABC', + 'tipo_rps': '1', + 'data_emissao': '2010-01-01T21:00:00', + 'natureza_operacao': '1', + 'optante_simples': '1', + 'incentivador_cultural': '2', + 'status': '1', + #'regime_tributacao': '', + #'numero_substituido': '', + '#serie_substituido': '', + '#tipo_substituido': '', + 'valor_servico': '9.99', + 'valor_deducao': '0', + 'valor_pis': '0', + 'valor_cofins': '0', + 'valor_inss': '0', + 'valor_ir': '0', + 'valor_csll': '0', + 'iss_retido': '0', + 'valor_iss': '0', + 'valor_iss_retido': '0', + 'outras_retencoes': '0', + 'base_calculo': '9.99', + 'aliquota_issqn': '0.05', + 'valor_liquido_nfse': '9.99', + 'desconto_incondicionado': '', + 'desconto_condicionado': '', + 'codigo_servico': '0107', + 'cnae_servico': '', + 'codigo_tributacao_municipio': '010701', + 'codigo_municipio': '3304557', + 'descricao': 'Venda de servico', + 'prestador': { + 'cnpj': '123456789011213', + 'inscricao_municipal': '123456', + }, + 'tomador': { + 'tipo_cpfcnpj': '1', + 'cpf_cnpj': '12345678923256', + 'inscricao_municipal': '123456', + 'razao_social': 'Trustcode', + 'tipo_logradouro': '1', + 'logradouro': 'Vinicius de Moraes, 42', + 'numero': '42', + 'complemento': '', + 'bairro': 'Corrego', + 'cidade': '4205407', # Código da cidade, de acordo com o IBGE + 'uf': 'SC', + 'cep': '88037240', + 'tomador.telefone': '', + 'tomador.email': '' + }, + } +} + + +retorno = gerar_nfse(certificado, **rps) + + + + +# retorno é um dicionário { 'received_xml':'', 'sent_xml':'', 'object': object() } +if dbg>=9 :print(retorno['sent_xml']) +if dbg>=1 :print(retorno['received_xml']) + +# retorno['object'] é um objeto python criado apartir do xml de resposta +if dbg>=9 :print(retorno['object']) +#print retorno['object'].Cabecalho.Sucesso +#print retorno['object'].ChaveNFeRPS.ChaveNFe.NumeroNFe +#print retorno['object'].ChaveNFeRPS.ChaveRPS.NumeroRPS From 43bbc0e293c5c09d32054d5450b6cd85f9ccf3ed Mon Sep 17 00:00:00 2001 From: Heron Date: Thu, 21 May 2020 18:36:41 -0300 Subject: [PATCH 2/2] mostra mensagen de WARNING para data de certificados expirados --- pytrustnfe/certificado.py | 17 +++--- teste_carioca.py | 108 -------------------------------------- tests/test_certificado.py | 2 +- 3 files changed, 10 insertions(+), 117 deletions(-) delete mode 100644 teste_carioca.py diff --git a/pytrustnfe/certificado.py b/pytrustnfe/certificado.py index b2b424bf..77268ee4 100644 --- a/pytrustnfe/certificado.py +++ b/pytrustnfe/certificado.py @@ -4,22 +4,24 @@ import tempfile from OpenSSL import crypto -#import datetime from datetime import datetime -#temp -import ssl class Certificado(object): def __init__(self, pfx, password): self.pfx = pfx self.password = password + pfx = crypto.load_pkcs12(pfx, password) - pfx = crypto.load_pkcs12(pfx, password).get_certificate() - cert_date = int(str(pfx.get_notAfter(),'UTF-8').strip('Z')) + cert = pfx.get_certificate() + cert_date = int(str(cert.get_notAfter(),'UTF-8').strip('Z')) + sha1_fingerprint = cert.digest("sha1") now = datetime.now() date = int(now.strftime("%Y%m%d%H%M%S")) - if cert_date < date: + ''' + Exceto certificado de testes + ''' + if cert_date < date or str(sha1_fingerprint,'UTF-8') == "DE:08:15:1E:DA:12:B3:5F:76:BF:5D:4E:56:C1:14:12:8A:85:B6:47": print("WARNING: Certificado expirado") def save_pfx(self): @@ -34,8 +36,7 @@ def extract_cert_and_key_from_pfx(pfx, password): try: pfx = crypto.load_pkcs12(pfx, password) except: - print("ERROR: Falha ao ler certiticado. Verifique a senha") - exit() + print("WARING: Falha ao ler certiticado. Verifique a senha") # PEM formatted private key key = crypto.dump_privatekey(crypto.FILETYPE_PEM, pfx.get_privatekey()) # PEM formatted certificate diff --git a/teste_carioca.py b/teste_carioca.py deleted file mode 100644 index b874fb14..00000000 --- a/teste_carioca.py +++ /dev/null @@ -1,108 +0,0 @@ -# -*- coding: utf-8 -*- - -import logging -from datetime import datetime -from pytrustnfe.nfe import consulta_cadastro -#import pytrustnfe.nfe -from pytrustnfe.certificado import Certificado -from pytrustnfe.nfse.paulistana import cancelamento_nfe -from pytrustnfe.nfse.paulistana import envio_lote_rps -from pytrustnfe.nfse.carioca import gerar_nfse - - -logger = logging.getLogger(__name__) -dbg = 0 - - -#certificado_path = open(b'/data/certs/23834691000124.pfx', 'rb').read() -certificado_path = open(b'/data/certs/expirado.pfx', 'rb').read() -certificado = Certificado(certificado_path, 'audaz$321') - -if dbg>1 :print('type(certificado)') -if dbg>1 :print(type(certificado)) -if dbg>9 :print(certificado.pfx) -if dbg>1 :print(certificado.password) -# Necessário criar um dicionário com os dados, validação dos dados deve -# ser feita pela aplicação que está utilizando a lib - -#retorno = envio_lote_rps(certificado, nfse=nfse) -#retorno = gerar_nfse(certificado, nfse=rps) - -#xml = { "rps" : {"rps" : { "numero" : "1" }}} -#retorno = gerar_nfse(certificado, nfse=xml) - -rps = { - 'ambiente': '2', - 'rps': { - 'ambiente': '2', - 'numero': '1', - 'serie': 'ABC', - 'tipo_rps': '1', - 'data_emissao': '2010-01-01T21:00:00', - 'natureza_operacao': '1', - 'optante_simples': '1', - 'incentivador_cultural': '2', - 'status': '1', - #'regime_tributacao': '', - #'numero_substituido': '', - '#serie_substituido': '', - '#tipo_substituido': '', - 'valor_servico': '9.99', - 'valor_deducao': '0', - 'valor_pis': '0', - 'valor_cofins': '0', - 'valor_inss': '0', - 'valor_ir': '0', - 'valor_csll': '0', - 'iss_retido': '0', - 'valor_iss': '0', - 'valor_iss_retido': '0', - 'outras_retencoes': '0', - 'base_calculo': '9.99', - 'aliquota_issqn': '0.05', - 'valor_liquido_nfse': '9.99', - 'desconto_incondicionado': '', - 'desconto_condicionado': '', - 'codigo_servico': '0107', - 'cnae_servico': '', - 'codigo_tributacao_municipio': '010701', - 'codigo_municipio': '3304557', - 'descricao': 'Venda de servico', - 'prestador': { - 'cnpj': '123456789011213', - 'inscricao_municipal': '123456', - }, - 'tomador': { - 'tipo_cpfcnpj': '1', - 'cpf_cnpj': '12345678923256', - 'inscricao_municipal': '123456', - 'razao_social': 'Trustcode', - 'tipo_logradouro': '1', - 'logradouro': 'Vinicius de Moraes, 42', - 'numero': '42', - 'complemento': '', - 'bairro': 'Corrego', - 'cidade': '4205407', # Código da cidade, de acordo com o IBGE - 'uf': 'SC', - 'cep': '88037240', - 'tomador.telefone': '', - 'tomador.email': '' - }, - } -} - - -retorno = gerar_nfse(certificado, **rps) - - - - -# retorno é um dicionário { 'received_xml':'', 'sent_xml':'', 'object': object() } -if dbg>=9 :print(retorno['sent_xml']) -if dbg>=1 :print(retorno['received_xml']) - -# retorno['object'] é um objeto python criado apartir do xml de resposta -if dbg>=9 :print(retorno['object']) -#print retorno['object'].Cabecalho.Sucesso -#print retorno['object'].ChaveNFeRPS.ChaveNFe.NumeroNFe -#print retorno['object'].ChaveNFeRPS.ChaveRPS.NumeroRPS diff --git a/tests/test_certificado.py b/tests/test_certificado.py index 6c90ca2d..a88e3a49 100644 --- a/tests/test_certificado.py +++ b/tests/test_certificado.py @@ -60,7 +60,7 @@ def test_preparar_pfx(self): def test_save_pfx(self): pfx_source = open(os.path.join(self.caminho, "teste.pfx"), "rb").read() - pfx = Certificado(pfx_source, "123") + pfx = Certificado(pfx_source, "123456") path = pfx.save_pfx() saved = open(path, "rb").read() self.assertEqual(