diff --git a/go.mod b/go.mod index 6483190..3b79aea 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,10 @@ module armazenador -go 1.17 +go 1.18 require ( github.com/dadosjusbr/proto v0.0.0-20221212025627-91c60aa3cd12 - github.com/dadosjusbr/storage v0.0.0-20231026210858-4a308036eb85 + github.com/dadosjusbr/storage v0.0.0-20231202000028-bee3d3aef9a9 github.com/kelseyhightower/envconfig v1.4.0 google.golang.org/protobuf v1.28.1 ) diff --git a/go.sum b/go.sum index b622cbd..adee21c 100644 --- a/go.sum +++ b/go.sum @@ -29,8 +29,8 @@ github.com/dadosjusbr/proto v0.0.0-20221212025627-91c60aa3cd12 h1:ufl8nbCEo6g2VH github.com/dadosjusbr/proto v0.0.0-20221212025627-91c60aa3cd12/go.mod h1:gPA7VxjEmyez/xtln4qBj+tM1NO0/zcw3ryjxTRNSco= github.com/dadosjusbr/status v0.0.0-20230428151814-b605fe0e598f h1:MHtPcEeoJ4CCOvFDzvYXU2AndoqiIvZdrOjWACaImLA= github.com/dadosjusbr/status v0.0.0-20230428151814-b605fe0e598f/go.mod h1:7M8ds2L3u+rHuiP+IImNTx7bSlIUz6hXzW0ZTBOeHzw= -github.com/dadosjusbr/storage v0.0.0-20231026210858-4a308036eb85 h1:WL3E1SK52OwLoq4pEZwjV4C8P7HyfmwmYumu9KGA/L4= -github.com/dadosjusbr/storage v0.0.0-20231026210858-4a308036eb85/go.mod h1:JTg/Wu2+wn9SV1J19dDeEronive9J6QelMA7K6vLO8I= +github.com/dadosjusbr/storage v0.0.0-20231202000028-bee3d3aef9a9 h1:nn0jgylIokUL72rv3raZA+Ih5fuSyCO8zg0nw72PPXk= +github.com/dadosjusbr/storage v0.0.0-20231202000028-bee3d3aef9a9/go.mod h1:PszGy6CDoG3kNLjIsCmwD3MAWED7xL7U/OWj7ajsiHc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -89,7 +89,6 @@ github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= -github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= @@ -111,7 +110,6 @@ github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5W github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= @@ -154,7 +152,6 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg= github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= -github.com/joho/sqltocsv v0.0.0-20210428211105-a6d6801d59df/go.mod h1:mAVCUAYtW9NG31eB30umMSLKcDt6mCUWSjoSn5qBh0k= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= diff --git a/main.go b/main.go index d89d520..a9d124b 100644 --- a/main.go +++ b/main.go @@ -1,9 +1,11 @@ package main import ( + "encoding/json" "fmt" "io/ioutil" "math" + "net/http" "os" "regexp" "strings" @@ -45,34 +47,34 @@ type config struct { func main() { var c config if err := envconfig.Process("", &c); err != nil { - status.ExitFromError(status.NewError(4, fmt.Errorf("error loading config values from .env: %v", err.Error()))) + status.ExitFromError(status.NewError(status.DataUnavailable, fmt.Errorf("error loading config values from .env: %v", err.Error()))) } // Criando o client do Postgres postgresDB, err := database.NewPostgresDB(c.PostgresUser, c.PostgresPassword, c.PostgresDBName, c.PostgresHost, c.PostgresPort) if err != nil { - status.ExitFromError(status.NewError(4, fmt.Errorf("error creating PostgresDB client: %v", err.Error()))) + status.ExitFromError(status.NewError(status.DataUnavailable, fmt.Errorf("error creating PostgresDB client: %v", err.Error()))) } // Criando o client do S3 s3Client, err := file_storage.NewS3Client(c.AWSRegion, c.S3Bucket) if err != nil { - status.ExitFromError(status.NewError(4, fmt.Errorf("error creating S3 client: %v", err.Error()))) + status.ExitFromError(status.NewError(status.DataUnavailable, fmt.Errorf("error creating S3 client: %v", err.Error()))) } // Criando client do storage a partir do banco postgres e do client do s3 pgS3Client, err := storage.NewClient(postgresDB, s3Client) if err != nil { - status.ExitFromError(status.NewError(3, fmt.Errorf("error setting up postgres storage client: %s", err))) + status.ExitFromError(status.NewError(status.ConnectionError, fmt.Errorf("error setting up postgres storage client: %s", err))) } defer pgS3Client.Db.Disconnect() var er pipeline.ResultadoExecucao erIN, err := ioutil.ReadAll(os.Stdin) if err != nil { - status.ExitFromError(status.NewError(2, fmt.Errorf("error reading execution result: %v", err))) + status.ExitFromError(status.NewError(status.SystemError, fmt.Errorf("error reading execution result: %v", err))) } if err = prototext.Unmarshal(erIN, &er); err != nil { - status.ExitFromError(status.NewError(2, fmt.Errorf("error reading execution result: %v", err))) + status.ExitFromError(status.NewError(status.SystemError, fmt.Errorf("error reading execution result: %v", err))) } // Package. @@ -80,7 +82,7 @@ func main() { status.ExitFromError(status.NewError(status.InvalidInput, fmt.Errorf("there is no package to store. PackageResult:%+v", er.Pr))) } - //Armazenando os datapackages no S3 + // Armazenando os datapackages no S3 dstKey := fmt.Sprintf("%s/datapackage/%s-%d-%d.zip", er.Rc.Coleta.Orgao, er.Rc.Coleta.Orgao, er.Rc.Coleta.Ano, er.Rc.Coleta.Mes) s3Backup, err := pgS3Client.Cloud.UploadFile(er.Pr.Pacote, dstKey) if err != nil { @@ -92,18 +94,18 @@ func main() { status.ExitFromError(status.NewError(2, fmt.Errorf("no backup files found: CrawlingResult:%+v", er.Cr))) } */ - //Armazenando os backups no S3 + // Armazenando os backups no S3 dstKey = fmt.Sprintf("%s/backups/%s-%d-%d.zip", er.Rc.Coleta.Orgao, er.Rc.Coleta.Orgao, er.Rc.Coleta.Ano, er.Rc.Coleta.Mes) s3Backups, err := pgS3Client.Cloud.UploadFile(er.Rc.Coleta.Arquivos[0], dstKey) if err != nil { - status.ExitFromError(status.NewError(2, fmt.Errorf("error trying to get Backup files from S3: %v, error: %v", er.Rc.Coleta.Arquivos, err))) + status.ExitFromError(status.NewError(status.SystemError, fmt.Errorf("error trying to get Backup files from S3: %v, error: %v", er.Rc.Coleta.Arquivos, err))) } //Armazenando as remuneracoes no S3 e no postgres dstKey = fmt.Sprintf("%s/remuneracoes/%s-%d-%d.zip", er.Rc.Coleta.Orgao, er.Rc.Coleta.Orgao, er.Rc.Coleta.Ano, er.Rc.Coleta.Mes) _, err = pgS3Client.Cloud.UploadFile(er.Pr.Remuneracoes.ZipUrl, dstKey) if err != nil { - status.ExitFromError(status.NewError(2, fmt.Errorf("error trying to upload Remunerations zip in S3: %v, error: %v", er.Pr.Remuneracoes, err))) + status.ExitFromError(status.NewError(status.SystemError, fmt.Errorf("error trying to upload Remunerations zip in S3: %v, error: %v", er.Pr.Remuneracoes, err))) } err = pgS3Client.StoreRemunerations(models.Remunerations{ AgencyID: er.Rc.Coleta.Orgao, @@ -115,63 +117,16 @@ func main() { ZipUrl: fmt.Sprintf("https://%s.s3.amazonaws.com/%s", c.S3Bucket, dstKey), }) if err != nil { - status.ExitFromError(status.NewError(2, fmt.Errorf("error trying to store Remunerations zip in Postgres: %v, error: %v", er.Pr.Remuneracoes, err))) - } - - agmi := models.AgencyMonthlyInfo{ - AgencyID: er.Rc.Coleta.Orgao, - Month: int(er.Rc.Coleta.Mes), - Year: int(er.Rc.Coleta.Ano), - CrawlerRepo: er.Rc.Coleta.RepositorioColetor, - CrawlerVersion: er.Rc.Coleta.VersaoColetor, - ParserRepo: er.Rc.Coleta.RepositorioParser, - ParserVersion: er.Rc.Coleta.VersaoParser, - CrawlingTimestamp: er.Rc.Coleta.TimestampColeta, - Summary: summary(er.Rc.Folha.ContraCheque), - Backups: []models.Backup{*s3Backups}, - Meta: &models.Meta{ - OpenFormat: er.Rc.Metadados.FormatoAberto, - Access: er.Rc.Metadados.Acesso.String(), - Extension: er.Rc.Metadados.Extensao.String(), - StrictlyTabular: er.Rc.Metadados.EstritamenteTabular, - ConsistentFormat: er.Rc.Metadados.FormatoConsistente, - HaveEnrollment: er.Rc.Metadados.TemMatricula, - ThereIsACapacity: er.Rc.Metadados.TemLotacao, - HasPosition: er.Rc.Metadados.TemCargo, - BaseRevenue: er.Rc.Metadados.ReceitaBase.String(), - OtherRecipes: er.Rc.Metadados.OutrasReceitas.String(), - Expenditure: er.Rc.Metadados.Despesas.String(), - }, - Score: &models.Score{ - Score: float64(er.Rc.Metadados.IndiceTransparencia), - EasinessScore: float64(er.Rc.Metadados.IndiceFacilidade), - CompletenessScore: float64(er.Rc.Metadados.IndiceCompletude), - }, - ProcInfo: er.Rc.Procinfo, - Package: s3Backup, - } - // Calculando o tempo de execução da coleta - if c.StartTime != "" { - layout := "2006-01-02 15:04:05.000000" // formato data-hora - t, err := time.Parse(layout, c.StartTime) // transformando a hora (string) para o tipo time.Time - if err != nil { - status.ExitFromError(status.NewError(2, fmt.Errorf("error calculating collection time: %v", err))) - } - Duration := time.Since(t) // Calcula a diferença da hora dada com a hora atual (UTC+0) - agmi.Duration = Duration.Seconds() - } - if er.Rc.Procinfo != nil && er.Rc.Procinfo.Status != 0 { - agmi.ProcInfo = er.Rc.Procinfo - } - - if err = pgS3Client.Store(agmi); err != nil { - status.ExitFromError(status.NewError(2, fmt.Errorf("error trying to store 'coleta': %v", err))) + status.ExitFromError(status.NewError(status.SystemError, fmt.Errorf("error trying to store Remunerations zip in Postgres: %v, error: %v", er.Pr.Remuneracoes, err))) } var paychecks []models.Paycheck var remunerations []models.PaycheckItem m, _ := regexp.Compile("[A-Za-z]") + // Mapeando as rubricas distintas da folha de contracheque + itemValues := make(map[string]float64) + // Contracheques for id, p := range er.Rc.Folha.ContraCheque { salary, benefits, discounts, remuneration := calcBaseSalary(*p) @@ -216,23 +171,80 @@ func main() { } else { // Se a rubrica não for inconsistente, faremos uma cópia sanitizada na coluna item_sanitizado. itemSanitizado := sanitizarItem(r.Item) + // agregamos o valor por rubrica (não considerando descontos) + if r.Natureza != coleta.Remuneracao_D { + itemValues[itemSanitizado] += math.Abs(r.Valor) + } remunerations[len(remunerations)-1].SanitizedItem = &itemSanitizado } i++ } } } + + agmi := models.AgencyMonthlyInfo{ + AgencyID: er.Rc.Coleta.Orgao, + Month: int(er.Rc.Coleta.Mes), + Year: int(er.Rc.Coleta.Ano), + CrawlerRepo: er.Rc.Coleta.RepositorioColetor, + CrawlerVersion: er.Rc.Coleta.VersaoColetor, + ParserRepo: er.Rc.Coleta.RepositorioParser, + ParserVersion: er.Rc.Coleta.VersaoParser, + CrawlingTimestamp: er.Rc.Coleta.TimestampColeta, + Summary: summary(er.Rc.Folha.ContraCheque, itemValues), + Backups: []models.Backup{*s3Backups}, + Meta: &models.Meta{ + OpenFormat: er.Rc.Metadados.FormatoAberto, + Access: er.Rc.Metadados.Acesso.String(), + Extension: er.Rc.Metadados.Extensao.String(), + StrictlyTabular: er.Rc.Metadados.EstritamenteTabular, + ConsistentFormat: er.Rc.Metadados.FormatoConsistente, + HaveEnrollment: er.Rc.Metadados.TemMatricula, + ThereIsACapacity: er.Rc.Metadados.TemLotacao, + HasPosition: er.Rc.Metadados.TemCargo, + BaseRevenue: er.Rc.Metadados.ReceitaBase.String(), + OtherRecipes: er.Rc.Metadados.OutrasReceitas.String(), + Expenditure: er.Rc.Metadados.Despesas.String(), + }, + Score: &models.Score{ + Score: float64(er.Rc.Metadados.IndiceTransparencia), + EasinessScore: float64(er.Rc.Metadados.IndiceFacilidade), + CompletenessScore: float64(er.Rc.Metadados.IndiceCompletude), + }, + ProcInfo: er.Rc.Procinfo, + Package: s3Backup, + } + // Calculando o tempo de execução da coleta + if c.StartTime != "" { + const layout = "2006-01-02 15:04:05.000000" // formato data-hora + t, err := time.Parse(layout, c.StartTime) // transformando a hora (string) para o tipo time.Time + if err != nil { + status.ExitFromError(status.NewError(status.SystemError, fmt.Errorf("error calculating collection time: %v", err))) + } + Duration := time.Since(t) // Calcula a diferença da hora dada com a hora atual (UTC+0) + agmi.Duration = Duration.Seconds() + } + if er.Rc.Procinfo != nil && er.Rc.Procinfo.Status != 0 { + agmi.ProcInfo = er.Rc.Procinfo + } + + if err = pgS3Client.Store(agmi); err != nil { + status.ExitFromError(status.NewError(status.SystemError, fmt.Errorf("error trying to store 'coleta': %v", err))) + } + if err := pgS3Client.StorePaychecks(paychecks, remunerations); err != nil { - status.ExitFromError(status.NewError(2, fmt.Errorf("error trying to store 'contracheques' and 'remuneracoes': %v", err))) + status.ExitFromError(status.NewError(status.SystemError, fmt.Errorf("error trying to store 'contracheques' and 'remuneracoes': %v", err))) } fmt.Println("Store Executed...") } // summary aux func to make all necessary calculations to DataSummary Struct -func summary(employees []*coleta.ContraCheque) *models.Summary { +func summary(employees []*coleta.ContraCheque, itemValues map[string]float64) *models.Summary { + itemSummary := aggregatingItems(itemValues) memberActive := models.Summary{ IncomeHistogram: map[int]int{10000: 0, 20000: 0, 30000: 0, 40000: 0, 50000: 0, -1: 0}, + ItemSummary: itemSummary, } for _, emp := range employees { // checking if the employee instance has the required data to build the summary @@ -341,3 +353,62 @@ func sanitizarItem(item string) string { return item } + +// Realiza o download do json com as rubricas desambiguadas +func getItems() map[string][]string { + // json com rubricas desambiguadas + const url = "https://raw.githubusercontent.com/dadosjusbr/desambiguador/main/rubricas.json" + + res, err := http.Get(url) + if err != nil { + status.ExitFromError(status.NewError(status.ConnectionError, err)) + } + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + status.ExitFromError(status.NewError(status.SystemError, err)) + } + + var itemJson map[string][]string + + // unmarshall + if err := json.Unmarshal(body, &itemJson); err != nil { + status.ExitFromError(status.NewError(status.SystemError, fmt.Errorf("error unmarshalling 'rubricas.json': %w", err))) + } + + return itemJson +} + +// Com a lista de rubricas distintas da folha de contracheque (e seu somatório), +// comparamos com a lista de rubricas desambiguadas e criamos o json da coluna 'resumo' +// alocando o valor de cada rubrica a seu respectivo grupo. +func aggregatingItems(itemValues map[string]float64) models.ItemSummary { + items := getItems() + var itemSummary models.ItemSummary + var others float64 + + // Esse processo visa facilitar a iteração mútua de rubricas do contracheque <> rubricas desambiguadas + itemStruct := make(map[string]map[string]struct{}, len(items)) + for key, values := range items { + itemStruct[key] = make(map[string]struct{}, len(values)) + for _, value := range values { + itemStruct[key][value] = struct{}{} + } + } + + for item, value := range itemValues { + for key, listItems := range itemStruct { + others = value + if _, ok := listItems[item]; ok { + switch key { + case "auxilio-alimentacao": + itemSummary.FoodAllowance += value + others = 0 + } + break + } + } + itemSummary.Others += others + } + return itemSummary +}