Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#45 Multithreaded Processing #53

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pkg/adapters/geo/maxmind_geo_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package geo
import (
"log"
"net"
"sync"
"tango/pkg/services/report"

"github.com/oschwald/geoip2-golang"
Expand All @@ -11,6 +12,7 @@ import (
// MaxMindGeoProvider
type MaxMindGeoProvider struct {
maxmindCityDatabase *geoip2.Reader
mu sync.Mutex
}

// NewMaxMindGeoProvider creates a new instance of MaxMindGeoProvider
Expand All @@ -23,12 +25,16 @@ func NewMaxMindGeoProvider(maxmindGeoLibPath string) *MaxMindGeoProvider {

return &MaxMindGeoProvider{
maxmindCityDatabase: maxmindCityDatabase,
mu: sync.Mutex{},
}
}

// GetGeoDataByIP provides geo location data by IP
func (p *MaxMindGeoProvider) GetGeoDataByIP(ip string) *report.GeoData {
parsedIP := net.ParseIP(ip)
func (p *MaxMindGeoProvider) GetGeoDataByIP(IP string) *report.GeoData {
p.mu.Lock()
defer p.mu.Unlock()

parsedIP := net.ParseIP(IP)
geoLocation, err := p.maxmindCityDatabase.City(parsedIP)

if err != nil {
Expand Down
45 changes: 38 additions & 7 deletions pkg/adapters/reader/access_log_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"log"
"os"
"tango/pkg/services"
)

// AccessLogReader
Expand All @@ -17,22 +16,54 @@ func NewAccessLogReader() *AccessLogReader {
}

// Read given access log file
func (r *AccessLogReader) Read(filePath string, readAccessLogFunc services.ReadAccessLogFunc) {
func (r *AccessLogReader) Read(filePath string) (<-chan string, <-chan int) {
file, err := os.Open(filePath)

if err != nil {
log.Fatal(err)
}

defer file.Close()
logChan := make(chan string)
bytesReadChan := make(chan int)

scanner := bufio.NewScanner(file)
go func() {
defer func() {
err := file.Close()
if err != nil {
log.Fatal(err)
}
}()

for scanner.Scan() {
readAccessLogFunc(scanner.Text(), len(scanner.Bytes()))
scanner := bufio.NewScanner(file)

for scanner.Scan() {
logChan <- scanner.Text()
bytesReadChan <- len(scanner.Bytes())
}

close(logChan)
close(bytesReadChan)

if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}()

return logChan, bytesReadChan
}

func (r *AccessLogReader) FileSize(filePath string) int64 {
file, err := os.Open(filePath)

if err != nil {
log.Fatal(err)
}

if err := scanner.Err(); err != nil {
fileInfo, err := file.Stat()

if err != nil {
log.Fatal(err)
}

return fileInfo.Size()
}
33 changes: 33 additions & 0 deletions pkg/adapters/reader/reader_progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package reader

import (
"github.com/cheggaaa/pb"
)

type ReadProgress struct {
progressBar *pb.ProgressBar
}

//
func NewReadProgress() *ReadProgress {
return &ReadProgress{
progressBar: nil,
}
}

func (r *ReadProgress) Track(bytesReadChan <-chan int, fileSize int64) {
r.progressBar = pb.New64(fileSize)

r.progressBar.Format("[=>_]")
r.progressBar.SetUnits(pb.U_BYTES)
r.progressBar.SetMaxWidth(100)

go func() {
r.progressBar.Start()
defer r.progressBar.Finish()

for bytesRead := range bytesReadChan {
r.progressBar.Add(bytesRead)
}
}()
}
14 changes: 10 additions & 4 deletions pkg/adapters/writer/custom_report_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"tango/pkg/entity"
"tango/pkg/services/mapper"
)

var timeFormat = "2006-01-02 15:04:05 -0700" // todo: add localization for US/EU formats
Expand All @@ -22,15 +23,18 @@ var CustomReportHeader = []string{

//
type CustomReportCsvWriter struct {
logMapper *mapper.AccessLogMapper
}

//
func NewCustomReportCsvWriter() *CustomReportCsvWriter {
return &CustomReportCsvWriter{}
func NewCustomReportCsvWriter(logMapper *mapper.AccessLogMapper) *CustomReportCsvWriter {
return &CustomReportCsvWriter{
logMapper: logMapper,
}
}

// Save GeoLocation Report to CSV file
func (w *CustomReportCsvWriter) Save(filePath string, accessLogs []entity.AccessLogRecord) {
func (w *CustomReportCsvWriter) Save(filePath string, logChan <-chan entity.AccessLogRecord) {
file, err := os.Create(filePath)

if err != nil {
Expand All @@ -46,7 +50,7 @@ func (w *CustomReportCsvWriter) Save(filePath string, accessLogs []entity.Access
}

// Body
for _, accessLog := range accessLogs {
for accessLog := range logChan {
err := writer.Write([]string{
accessLog.Time.Format(timeFormat),
strings.Join(accessLog.IP, ", "),
Expand All @@ -59,5 +63,7 @@ func (w *CustomReportCsvWriter) Save(filePath string, accessLogs []entity.Access
if err != nil {
log.Fatal("Error on writing custom report: ", err)
}

w.logMapper.Recycle(accessLog)
}
}
16 changes: 8 additions & 8 deletions pkg/adapters/writer/geo_report_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func NewGeoReportCsvWriter() *GeoReportCsvWriter {
}

// Save GeoLocation Report to CSV file
func (w *GeoReportCsvWriter) Save(filePath string, geolocationReport map[string]*report.Geolocation) {
func (w *GeoReportCsvWriter) Save(filePath string, geoReport *report.GeoReport) {
file, err := os.Create(filePath)

if err != nil {
Expand All @@ -43,15 +43,15 @@ func (w *GeoReportCsvWriter) Save(filePath string, geolocationReport map[string]
}

// Body
for ip, geoLocation := range geolocationReport {
for ip, geoRecord := range geoReport.Report() {
err := writer.Write([]string{
ip,
geoLocation.GeoData.Country,
geoLocation.GeoData.City,
geoLocation.GeoData.Continent,
geoLocation.SampleRequest,
geoLocation.BrowserAgent,
strconv.FormatUint(geoLocation.Requests, 10),
geoRecord.GeoData.Country,
geoRecord.GeoData.City,
geoRecord.GeoData.Continent,
geoRecord.SampleRequest,
geoRecord.BrowserAgent,
strconv.FormatUint(geoRecord.Requests, 10),
})

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/adapters/writer/request_report_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewRequestReportCsvWriter() *RequestReportCsvWriter {
}

// Save request report to CSV file
func (w *RequestReportCsvWriter) Save(filePath string, requestReport map[string]*report.RequestReportItem) {
func (w *RequestReportCsvWriter) Save(filePath string, requestReport *report.RequestReport) {
file, err := os.Create(filePath)

if err != nil {
Expand All @@ -40,7 +40,7 @@ func (w *RequestReportCsvWriter) Save(filePath string, requestReport map[string]
}

// Body
for _, requestReportItem := range requestReport {
for _, requestReportItem := range requestReport.Report() {
err := writer.Write([]string{
requestReportItem.Path,
strconv.FormatUint(requestReportItem.Requests, 10),
Expand Down
8 changes: 8 additions & 0 deletions pkg/adapters/writer/writer-packr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions pkg/cli/command/browser_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package command
import (
"fmt"
"tango/pkg/di"
"tango/pkg/services/mapper"

"github.com/urfave/cli"
)
Expand All @@ -12,18 +13,20 @@ func BrowserReportCommand(cliContext *cli.Context) error {
reportConfig := di.InitReportConfig(cliContext)
filterConfig := di.InitFilterConfig(cliContext)
processorConfig := di.InitProcessorConfig(cliContext)
readAccessLogService := di.InitReadAccessLogService(processorConfig, filterConfig)
browserReportService := di.InitBrowserReportService()

logMapper := mapper.NewAccessLogMapper()
readAccessLogService := di.InitReadAccessLogService(logMapper, processorConfig, filterConfig)
browserReportService := di.InitBrowserReportService(logMapper)

fmt.Println("💃 Tango is on the scene!")
fmt.Println("💃 started to generate a browser report...")
fmt.Println("💃 reading access logs...")

accessLogRecords := readAccessLogService.Read(reportConfig.LogFile)
logChan := readAccessLogService.Read(reportConfig.LogFile)

fmt.Println("💃 saving the browser report...")

browserReportService.GenerateReport(reportConfig.ReportFile, accessLogRecords)
browserReportService.GenerateReport(reportConfig.ReportFile, logChan)

fmt.Println("🎉 browser report has been generated")

Expand Down
11 changes: 7 additions & 4 deletions pkg/cli/command/custom_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package command
import (
"fmt"
"tango/pkg/di"
"tango/pkg/services/mapper"

"github.com/urfave/cli"
)
Expand All @@ -12,18 +13,20 @@ func CustomReportCommand(cliContext *cli.Context) error {
reportConfig := di.InitReportConfig(cliContext)
filterConfig := di.InitFilterConfig(cliContext)
processorConfig := di.InitProcessorConfig(cliContext)
readAccessLogService := di.InitReadAccessLogService(processorConfig, filterConfig)
customReportService := di.InitCustomReportService()

logMapper := mapper.NewAccessLogMapper()
readAccessLogService := di.InitReadAccessLogService(logMapper, processorConfig, filterConfig)
customReportService := di.InitCustomReportService(logMapper)

fmt.Println("💃 Tango is on the scene!")
fmt.Println("💃 started to generate a custom report...")
fmt.Println("💃 reading access logs...")

accessLogRecords := readAccessLogService.Read(reportConfig.LogFile)
logChan := readAccessLogService.Read(reportConfig.LogFile)

fmt.Println("💃 saving the custom report...")

customReportService.GenerateReport(reportConfig.ReportFile, accessLogRecords)
customReportService.GenerateReport(reportConfig.ReportFile, logChan)

fmt.Println("🎉 custom report has been generated")

Expand Down
11 changes: 7 additions & 4 deletions pkg/cli/command/geo_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"tango/pkg/di"
"tango/pkg/services/mapper"

"github.com/urfave/cli"
)
Expand All @@ -13,7 +14,9 @@ func GeoReportCommand(cliContext *cli.Context) error {
reportConfig := di.InitReportConfig(cliContext)
filterConfig := di.InitFilterConfig(cliContext)
processorConfig := di.InitProcessorConfig(cliContext)
readAccessLogService := di.InitReadAccessLogService(processorConfig, filterConfig)

logMapper := mapper.NewAccessLogMapper()
readAccessLogService := di.InitReadAccessLogService(logMapper, processorConfig, filterConfig)
geoLibResolver := di.InitMaxmindGeoLibResolver()

fmt.Println("💃 Tango is on the scene!")
Expand All @@ -28,16 +31,16 @@ func GeoReportCommand(cliContext *cli.Context) error {
return nil
}

geoReportService := di.InitGeoReportService(geoLibPath)
geoReportService := di.InitGeoReportService(logMapper, geoLibPath)

fmt.Println("💃 started to generate a geo report...")
fmt.Println("💃 reading access logs...")

accessLogRecords := readAccessLogService.Read(reportConfig.LogFile)
logChan := readAccessLogService.Read(reportConfig.LogFile)

fmt.Println("💃 saving the geo report...")

geoReportService.GenerateReport(reportConfig.ReportFile, accessLogRecords)
geoReportService.GenerateReport(reportConfig.ReportFile, logChan)

fmt.Println("🎉 geo report has been generated")

Expand Down
10 changes: 6 additions & 4 deletions pkg/cli/command/journey_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package command
import (
"fmt"
"tango/pkg/di"
"tango/pkg/services/mapper"

"github.com/urfave/cli"
)
Expand All @@ -13,19 +14,20 @@ func JourneyReportCommand(cliContext *cli.Context) error {
generalConfig := di.InitGeneralConfig(cliContext)
filterConfig := di.InitFilterConfig(cliContext)
processorConfig := di.InitProcessorConfig(cliContext)
readAccessLogService := di.InitReadAccessLogService(processorConfig, filterConfig)

journeyReportService := di.InitJourneyReportService(generalConfig)
logMapper := mapper.NewAccessLogMapper()
readAccessLogService := di.InitReadAccessLogService(logMapper, processorConfig, filterConfig)
journeyReportService := di.InitJourneyReportService(logMapper, generalConfig)

fmt.Println("💃 Tango is on the scene!")
fmt.Println("💃 started to generate a visitor's journey report...")
fmt.Println("💃 reading access logs...")

accessLogRecords := readAccessLogService.Read(reportConfig.LogFile)
logChan := readAccessLogService.Read(reportConfig.LogFile)

fmt.Println("💃 saving visitor's journey report...")

journeyReportService.GenerateReport(reportConfig.ReportFile, accessLogRecords)
journeyReportService.GenerateReport(reportConfig.ReportFile, logChan)

fmt.Println("🎉 visitor's journey report has been generated")

Expand Down
Loading