diff --git a/pkg/adapters/geo/maxmind_geo_provider.go b/pkg/adapters/geo/maxmind_geo_provider.go index 27bfc30..b428f55 100644 --- a/pkg/adapters/geo/maxmind_geo_provider.go +++ b/pkg/adapters/geo/maxmind_geo_provider.go @@ -3,6 +3,7 @@ package geo import ( "log" "net" + "sync" "tango/pkg/services/report" "github.com/oschwald/geoip2-golang" @@ -11,6 +12,7 @@ import ( // MaxMindGeoProvider type MaxMindGeoProvider struct { maxmindCityDatabase *geoip2.Reader + mu sync.Mutex } // NewMaxMindGeoProvider creates a new instance of MaxMindGeoProvider @@ -23,12 +25,16 @@ func NewMaxMindGeoProvider(maxmindGeoLibPath string) *MaxMindGeoProvider { return &MaxMindGeoProvider{ maxmindCityDatabase: maxmindCityDatabase, + mu: sync.Mutex{}, } } // GetGeoDataByIP provides geo location data by IP -func (p *MaxMindGeoProvider) GetGeoDataByIP(ip string) *report.GeoData { - parsedIP := net.ParseIP(ip) +func (p *MaxMindGeoProvider) GetGeoDataByIP(IP string) *report.GeoData { + p.mu.Lock() + defer p.mu.Unlock() + + parsedIP := net.ParseIP(IP) geoLocation, err := p.maxmindCityDatabase.City(parsedIP) if err != nil { diff --git a/pkg/adapters/reader/access_log_reader.go b/pkg/adapters/reader/access_log_reader.go index eb42bc4..7d28246 100644 --- a/pkg/adapters/reader/access_log_reader.go +++ b/pkg/adapters/reader/access_log_reader.go @@ -4,7 +4,6 @@ import ( "bufio" "log" "os" - "tango/pkg/services" ) // AccessLogReader @@ -17,22 +16,54 @@ func NewAccessLogReader() *AccessLogReader { } // Read given access log file -func (r *AccessLogReader) Read(filePath string, readAccessLogFunc services.ReadAccessLogFunc) { +func (r *AccessLogReader) Read(filePath string) (<-chan string, <-chan int) { file, err := os.Open(filePath) if err != nil { log.Fatal(err) } - defer file.Close() + logChan := make(chan string) + bytesReadChan := make(chan int) - scanner := bufio.NewScanner(file) + go func() { + defer func() { + err := file.Close() + if err != nil { + log.Fatal(err) + } + }() - for scanner.Scan() { - readAccessLogFunc(scanner.Text(), len(scanner.Bytes())) + scanner := bufio.NewScanner(file) + + for scanner.Scan() { + logChan <- scanner.Text() + bytesReadChan <- len(scanner.Bytes()) + } + + close(logChan) + close(bytesReadChan) + + if err := scanner.Err(); err != nil { + log.Fatal(err) + } + }() + + return logChan, bytesReadChan +} + +func (r *AccessLogReader) FileSize(filePath string) int64 { + file, err := os.Open(filePath) + + if err != nil { + log.Fatal(err) } - if err := scanner.Err(); err != nil { + fileInfo, err := file.Stat() + + if err != nil { log.Fatal(err) } + + return fileInfo.Size() } diff --git a/pkg/adapters/reader/reader_progress.go b/pkg/adapters/reader/reader_progress.go new file mode 100644 index 0000000..3e33a8b --- /dev/null +++ b/pkg/adapters/reader/reader_progress.go @@ -0,0 +1,33 @@ +package reader + +import ( + "github.com/cheggaaa/pb" +) + +type ReadProgress struct { + progressBar *pb.ProgressBar +} + +// +func NewReadProgress() *ReadProgress { + return &ReadProgress{ + progressBar: nil, + } +} + +func (r *ReadProgress) Track(bytesReadChan <-chan int, fileSize int64) { + r.progressBar = pb.New64(fileSize) + + r.progressBar.Format("[=>_]") + r.progressBar.SetUnits(pb.U_BYTES) + r.progressBar.SetMaxWidth(100) + + go func() { + r.progressBar.Start() + defer r.progressBar.Finish() + + for bytesRead := range bytesReadChan { + r.progressBar.Add(bytesRead) + } + }() +} diff --git a/pkg/adapters/writer/custom_report_csv.go b/pkg/adapters/writer/custom_report_csv.go index 7b9a453..f6613a3 100644 --- a/pkg/adapters/writer/custom_report_csv.go +++ b/pkg/adapters/writer/custom_report_csv.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "tango/pkg/entity" + "tango/pkg/services/mapper" ) var timeFormat = "2006-01-02 15:04:05 -0700" // todo: add localization for US/EU formats @@ -22,15 +23,18 @@ var CustomReportHeader = []string{ // type CustomReportCsvWriter struct { + logMapper *mapper.AccessLogMapper } // -func NewCustomReportCsvWriter() *CustomReportCsvWriter { - return &CustomReportCsvWriter{} +func NewCustomReportCsvWriter(logMapper *mapper.AccessLogMapper) *CustomReportCsvWriter { + return &CustomReportCsvWriter{ + logMapper: logMapper, + } } // Save GeoLocation Report to CSV file -func (w *CustomReportCsvWriter) Save(filePath string, accessLogs []entity.AccessLogRecord) { +func (w *CustomReportCsvWriter) Save(filePath string, logChan <-chan entity.AccessLogRecord) { file, err := os.Create(filePath) if err != nil { @@ -46,7 +50,7 @@ func (w *CustomReportCsvWriter) Save(filePath string, accessLogs []entity.Access } // Body - for _, accessLog := range accessLogs { + for accessLog := range logChan { err := writer.Write([]string{ accessLog.Time.Format(timeFormat), strings.Join(accessLog.IP, ", "), @@ -59,5 +63,7 @@ func (w *CustomReportCsvWriter) Save(filePath string, accessLogs []entity.Access if err != nil { log.Fatal("Error on writing custom report: ", err) } + + w.logMapper.Recycle(accessLog) } } diff --git a/pkg/adapters/writer/geo_report_csv.go b/pkg/adapters/writer/geo_report_csv.go index 2adb9a7..f026514 100644 --- a/pkg/adapters/writer/geo_report_csv.go +++ b/pkg/adapters/writer/geo_report_csv.go @@ -27,7 +27,7 @@ func NewGeoReportCsvWriter() *GeoReportCsvWriter { } // Save GeoLocation Report to CSV file -func (w *GeoReportCsvWriter) Save(filePath string, geolocationReport map[string]*report.Geolocation) { +func (w *GeoReportCsvWriter) Save(filePath string, geoReport *report.GeoReport) { file, err := os.Create(filePath) if err != nil { @@ -43,15 +43,15 @@ func (w *GeoReportCsvWriter) Save(filePath string, geolocationReport map[string] } // Body - for ip, geoLocation := range geolocationReport { + for ip, geoRecord := range geoReport.Report() { err := writer.Write([]string{ ip, - geoLocation.GeoData.Country, - geoLocation.GeoData.City, - geoLocation.GeoData.Continent, - geoLocation.SampleRequest, - geoLocation.BrowserAgent, - strconv.FormatUint(geoLocation.Requests, 10), + geoRecord.GeoData.Country, + geoRecord.GeoData.City, + geoRecord.GeoData.Continent, + geoRecord.SampleRequest, + geoRecord.BrowserAgent, + strconv.FormatUint(geoRecord.Requests, 10), }) if err != nil { diff --git a/pkg/adapters/writer/request_report_csv.go b/pkg/adapters/writer/request_report_csv.go index 49a6b6f..99a8962 100644 --- a/pkg/adapters/writer/request_report_csv.go +++ b/pkg/adapters/writer/request_report_csv.go @@ -24,7 +24,7 @@ func NewRequestReportCsvWriter() *RequestReportCsvWriter { } // Save request report to CSV file -func (w *RequestReportCsvWriter) Save(filePath string, requestReport map[string]*report.RequestReportItem) { +func (w *RequestReportCsvWriter) Save(filePath string, requestReport *report.RequestReport) { file, err := os.Create(filePath) if err != nil { @@ -40,7 +40,7 @@ func (w *RequestReportCsvWriter) Save(filePath string, requestReport map[string] } // Body - for _, requestReportItem := range requestReport { + for _, requestReportItem := range requestReport.Report() { err := writer.Write([]string{ requestReportItem.Path, strconv.FormatUint(requestReportItem.Requests, 10), diff --git a/pkg/adapters/writer/writer-packr.go b/pkg/adapters/writer/writer-packr.go new file mode 100644 index 0000000..d048f86 --- /dev/null +++ b/pkg/adapters/writer/writer-packr.go @@ -0,0 +1,8 @@ +// +build !skippackr +// Code generated by github.com/gobuffalo/packr/v2. DO NOT EDIT. + +// You can use the "packr clean" command to clean up this, +// and any other packr generated files. +package writer + +import _ "tango/packrd" diff --git a/pkg/cli/command/browser_report.go b/pkg/cli/command/browser_report.go index 780b3fc..3a920bf 100644 --- a/pkg/cli/command/browser_report.go +++ b/pkg/cli/command/browser_report.go @@ -3,6 +3,7 @@ package command import ( "fmt" "tango/pkg/di" + "tango/pkg/services/mapper" "github.com/urfave/cli" ) @@ -12,18 +13,20 @@ func BrowserReportCommand(cliContext *cli.Context) error { reportConfig := di.InitReportConfig(cliContext) filterConfig := di.InitFilterConfig(cliContext) processorConfig := di.InitProcessorConfig(cliContext) - readAccessLogService := di.InitReadAccessLogService(processorConfig, filterConfig) - browserReportService := di.InitBrowserReportService() + + logMapper := mapper.NewAccessLogMapper() + readAccessLogService := di.InitReadAccessLogService(logMapper, processorConfig, filterConfig) + browserReportService := di.InitBrowserReportService(logMapper) fmt.Println("💃 Tango is on the scene!") fmt.Println("💃 started to generate a browser report...") fmt.Println("💃 reading access logs...") - accessLogRecords := readAccessLogService.Read(reportConfig.LogFile) + logChan := readAccessLogService.Read(reportConfig.LogFile) fmt.Println("💃 saving the browser report...") - browserReportService.GenerateReport(reportConfig.ReportFile, accessLogRecords) + browserReportService.GenerateReport(reportConfig.ReportFile, logChan) fmt.Println("🎉 browser report has been generated") diff --git a/pkg/cli/command/custom_report.go b/pkg/cli/command/custom_report.go index 208ef2f..c0aa1bf 100644 --- a/pkg/cli/command/custom_report.go +++ b/pkg/cli/command/custom_report.go @@ -3,6 +3,7 @@ package command import ( "fmt" "tango/pkg/di" + "tango/pkg/services/mapper" "github.com/urfave/cli" ) @@ -12,18 +13,20 @@ func CustomReportCommand(cliContext *cli.Context) error { reportConfig := di.InitReportConfig(cliContext) filterConfig := di.InitFilterConfig(cliContext) processorConfig := di.InitProcessorConfig(cliContext) - readAccessLogService := di.InitReadAccessLogService(processorConfig, filterConfig) - customReportService := di.InitCustomReportService() + + logMapper := mapper.NewAccessLogMapper() + readAccessLogService := di.InitReadAccessLogService(logMapper, processorConfig, filterConfig) + customReportService := di.InitCustomReportService(logMapper) fmt.Println("💃 Tango is on the scene!") fmt.Println("💃 started to generate a custom report...") fmt.Println("💃 reading access logs...") - accessLogRecords := readAccessLogService.Read(reportConfig.LogFile) + logChan := readAccessLogService.Read(reportConfig.LogFile) fmt.Println("💃 saving the custom report...") - customReportService.GenerateReport(reportConfig.ReportFile, accessLogRecords) + customReportService.GenerateReport(reportConfig.ReportFile, logChan) fmt.Println("🎉 custom report has been generated") diff --git a/pkg/cli/command/geo_report.go b/pkg/cli/command/geo_report.go index 4c558ac..decc9bd 100644 --- a/pkg/cli/command/geo_report.go +++ b/pkg/cli/command/geo_report.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "tango/pkg/di" + "tango/pkg/services/mapper" "github.com/urfave/cli" ) @@ -13,7 +14,9 @@ func GeoReportCommand(cliContext *cli.Context) error { reportConfig := di.InitReportConfig(cliContext) filterConfig := di.InitFilterConfig(cliContext) processorConfig := di.InitProcessorConfig(cliContext) - readAccessLogService := di.InitReadAccessLogService(processorConfig, filterConfig) + + logMapper := mapper.NewAccessLogMapper() + readAccessLogService := di.InitReadAccessLogService(logMapper, processorConfig, filterConfig) geoLibResolver := di.InitMaxmindGeoLibResolver() fmt.Println("💃 Tango is on the scene!") @@ -28,16 +31,16 @@ func GeoReportCommand(cliContext *cli.Context) error { return nil } - geoReportService := di.InitGeoReportService(geoLibPath) + geoReportService := di.InitGeoReportService(logMapper, geoLibPath) fmt.Println("💃 started to generate a geo report...") fmt.Println("💃 reading access logs...") - accessLogRecords := readAccessLogService.Read(reportConfig.LogFile) + logChan := readAccessLogService.Read(reportConfig.LogFile) fmt.Println("💃 saving the geo report...") - geoReportService.GenerateReport(reportConfig.ReportFile, accessLogRecords) + geoReportService.GenerateReport(reportConfig.ReportFile, logChan) fmt.Println("🎉 geo report has been generated") diff --git a/pkg/cli/command/journey_report.go b/pkg/cli/command/journey_report.go index 2be8514..da1aa56 100644 --- a/pkg/cli/command/journey_report.go +++ b/pkg/cli/command/journey_report.go @@ -3,6 +3,7 @@ package command import ( "fmt" "tango/pkg/di" + "tango/pkg/services/mapper" "github.com/urfave/cli" ) @@ -13,19 +14,20 @@ func JourneyReportCommand(cliContext *cli.Context) error { generalConfig := di.InitGeneralConfig(cliContext) filterConfig := di.InitFilterConfig(cliContext) processorConfig := di.InitProcessorConfig(cliContext) - readAccessLogService := di.InitReadAccessLogService(processorConfig, filterConfig) - journeyReportService := di.InitJourneyReportService(generalConfig) + logMapper := mapper.NewAccessLogMapper() + readAccessLogService := di.InitReadAccessLogService(logMapper, processorConfig, filterConfig) + journeyReportService := di.InitJourneyReportService(logMapper, generalConfig) fmt.Println("💃 Tango is on the scene!") fmt.Println("💃 started to generate a visitor's journey report...") fmt.Println("💃 reading access logs...") - accessLogRecords := readAccessLogService.Read(reportConfig.LogFile) + logChan := readAccessLogService.Read(reportConfig.LogFile) fmt.Println("💃 saving visitor's journey report...") - journeyReportService.GenerateReport(reportConfig.ReportFile, accessLogRecords) + journeyReportService.GenerateReport(reportConfig.ReportFile, logChan) fmt.Println("🎉 visitor's journey report has been generated") diff --git a/pkg/cli/command/pace_report.go b/pkg/cli/command/pace_report.go index 0663470..4c1bacd 100644 --- a/pkg/cli/command/pace_report.go +++ b/pkg/cli/command/pace_report.go @@ -3,6 +3,7 @@ package command import ( "fmt" "tango/pkg/di" + "tango/pkg/services/mapper" "github.com/urfave/cli" ) @@ -12,19 +13,20 @@ func PaceReportCommand(cliContext *cli.Context) error { reportConfig := di.InitReportConfig(cliContext) filterConfig := di.InitFilterConfig(cliContext) processorConfig := di.InitProcessorConfig(cliContext) - readAccessLogService := di.InitReadAccessLogService(processorConfig, filterConfig) - paceReportService := di.InitPaceReportService() + logMapper := mapper.NewAccessLogMapper() + readAccessLogService := di.InitReadAccessLogService(logMapper, processorConfig, filterConfig) + paceReportService := di.InitPaceReportService(logMapper) fmt.Println("💃 Tango is on the scene!") fmt.Println("💃 started to generate a request pace report...") fmt.Println("💃 reading access logs...") - accessLogRecords := readAccessLogService.Read(reportConfig.LogFile) + logChan := readAccessLogService.Read(reportConfig.LogFile) fmt.Println("💃 saving the request pace report...") - paceReportService.GenerateReport(reportConfig.ReportFile, accessLogRecords) + paceReportService.GenerateReport(reportConfig.ReportFile, logChan) fmt.Println("🎉 request pace report has been generated") diff --git a/pkg/cli/command/request_report.go b/pkg/cli/command/request_report.go index 61a8219..53851a2 100644 --- a/pkg/cli/command/request_report.go +++ b/pkg/cli/command/request_report.go @@ -3,6 +3,7 @@ package command import ( "fmt" "tango/pkg/di" + "tango/pkg/services/mapper" "github.com/urfave/cli" ) @@ -12,18 +13,17 @@ func RequestReportCommand(cliContext *cli.Context) error { reportConfig := di.InitReportConfig(cliContext) filterConfig := di.InitFilterConfig(cliContext) processorConfig := di.InitProcessorConfig(cliContext) - readAccessLogService := di.InitReadAccessLogService(processorConfig, filterConfig) - requestReportService := di.InitRequestReportService() + + logMapper := mapper.NewAccessLogMapper() + readAccessLogService := di.InitReadAccessLogService(logMapper, processorConfig, filterConfig) + requestReportService := di.InitRequestReportService(logMapper) fmt.Println("💃 Tango is on the scene!") fmt.Println("💃 started to generate a request report...") fmt.Println("💃 reading access logs...") - accessLogRecords := readAccessLogService.Read(reportConfig.LogFile) - - fmt.Println("💃 saving the request report...") - - requestReportService.GenerateReport(reportConfig.ReportFile, accessLogRecords) + logChan := readAccessLogService.Read(reportConfig.LogFile) + requestReportService.GenerateReport(reportConfig.ReportFile, logChan) fmt.Println("🎉 request report has been generated") diff --git a/pkg/cli/component/reader_progress_decorator.go b/pkg/cli/component/reader_progress_decorator.go deleted file mode 100644 index fc9e75e..0000000 --- a/pkg/cli/component/reader_progress_decorator.go +++ /dev/null @@ -1,66 +0,0 @@ -package component - -import ( - "log" - "os" - "tango/pkg/adapters/reader" - "tango/pkg/services" - - "github.com/cheggaaa/pb" -) - -type ReaderProgressDecorator struct { - accessLogReader reader.AccessLogReader - progressBar *pb.ProgressBar -} - -// -func NewReaderProgressDecorator(accessLogReader *reader.AccessLogReader) *ReaderProgressDecorator { - return &ReaderProgressDecorator{ - accessLogReader: *accessLogReader, - progressBar: nil, - } -} - -func (r *ReaderProgressDecorator) Read(filePath string, readAccessLogFunc services.ReadAccessLogFunc) { - file, err := os.Open(filePath) - - if err != nil { - log.Fatal(err) - } - defer file.Close() - - fileInfo, err := file.Stat() - - if err != nil { - log.Fatal(err) - } - - fileSize := fileInfo.Size() - - r.progressBar = pb.New64(fileSize) - - r.progressBar.Format("[=>_]") - r.progressBar.SetUnits(pb.U_BYTES) - r.progressBar.SetMaxWidth(100) - - r.start() - - r.accessLogReader.Read(filePath, func(accessLogRecord string, byteCount int) { - readAccessLogFunc(accessLogRecord, byteCount) - - r.update(byteCount) - }) - - r.progressBar.Finish() -} - -// -func (r *ReaderProgressDecorator) start() { - r.progressBar.Start() -} - -// -func (r *ReaderProgressDecorator) update(byteCount int) { - r.progressBar.Add(byteCount) -} diff --git a/pkg/di/containers.go b/pkg/di/containers.go index 0e03f58..0034aab 100644 --- a/pkg/di/containers.go +++ b/pkg/di/containers.go @@ -5,12 +5,12 @@ import ( "tango/pkg/adapters/geo" "tango/pkg/adapters/reader" "tango/pkg/adapters/writer" - "tango/pkg/cli/component" "tango/pkg/cli/factory" "tango/pkg/services" "tango/pkg/services/config" "tango/pkg/services/filter" "tango/pkg/services/geodata" + "tango/pkg/services/mapper" "tango/pkg/services/processor" "tango/pkg/services/report" @@ -85,54 +85,61 @@ func InitGenerateMaxmindConfService() *geodata.GenerateMaxmindConfService { } // InitReadAccessLogService inits a services -func InitReadAccessLogService(processorConfig config.ProcessorConfig, filterConfig config.FilterConfig) *services.ReadAccessLogService { - accessLogReader := reader.NewAccessLogReader() - readerProgressDecorator := component.NewReaderProgressDecorator(accessLogReader) +func InitReadAccessLogService(logMapper *mapper.AccessLogMapper, processorConfig config.ProcessorConfig, filterConfig config.FilterConfig) *services.ReadAccessLogService { + logReader := reader.NewAccessLogReader() + readProgress := reader.NewReadProgress() + ipProcessor := InitIpProcessor(processorConfig) filterAccessLogService := InitFilterAccessLogService(filterConfig) - return services.NewReadAccessLogService(readerProgressDecorator, filterAccessLogService, ipProcessor) + return services.NewReadAccessLogService( + logMapper, + logReader, + readProgress, + filterAccessLogService, + ipProcessor, + ) } // InitGeoReportService inits a services -func InitGeoReportService(maxmindGeoLibPath string) *report.GeoReportService { +func InitGeoReportService(logMapper *mapper.AccessLogMapper, maxmindGeoLibPath string) *report.GeoReportService { geoProvider := geo.NewMaxMindGeoProvider(maxmindGeoLibPath) geoReportWriter := writer.NewGeoReportCsvWriter() - return report.NewGeoReportService(geoProvider, geoReportWriter) + return report.NewGeoReportService(logMapper, geoProvider, geoReportWriter) } // InitBrowserReportService inits a services -func InitBrowserReportService() *report.BrowserReportService { +func InitBrowserReportService(logMapper *mapper.AccessLogMapper) *report.BrowserReportService { browserReportWriter := writer.NewBrowserReportCsvWriter() - return report.NewBrowserReportService(browserReportWriter) + return report.NewBrowserReportService(logMapper, browserReportWriter) } // InitRequestReportService inits a services -func InitRequestReportService() *report.RequestReportService { +func InitRequestReportService(logMapper *mapper.AccessLogMapper) *report.RequestReportService { requestReportWriter := writer.NewRequestReportCsvWriter() - return report.NewRequestReportService(requestReportWriter) + return report.NewRequestReportService(logMapper, requestReportWriter) } // InitPaceReportService inits a services -func InitPaceReportService() *report.PaceReportService { +func InitPaceReportService(logMapper *mapper.AccessLogMapper) *report.PaceReportService { paceReportWriter := writer.NewPaceReportCsvWriter() - return report.NewPaceReportService(paceReportWriter) + return report.NewPaceReportService(logMapper, paceReportWriter) } // InitJourneyReportService inits a services -func InitJourneyReportService(generalConfig config.GeneralConfig) *report.JourneyReportService { +func InitJourneyReportService(logMapper *mapper.AccessLogMapper, generalConfig config.GeneralConfig) *report.JourneyReportService { journeyReportWriter := writer.NewJourneyReportHTMLWriter() - return report.NewJourneyReportService(generalConfig, journeyReportWriter) + return report.NewJourneyReportService(logMapper, generalConfig, journeyReportWriter) } // InitCustomReportService inits a services -func InitCustomReportService() *report.CustomReportService { - customReportWriter := writer.NewCustomReportCsvWriter() +func InitCustomReportService(logMapper *mapper.AccessLogMapper) *report.CustomReportService { + customReportWriter := writer.NewCustomReportCsvWriter(logMapper) return report.NewCustomReportService(customReportWriter) } diff --git a/pkg/services/mapper/access_log.go b/pkg/services/mapper/access_log.go index 47010c0..cb122fb 100644 --- a/pkg/services/mapper/access_log.go +++ b/pkg/services/mapper/access_log.go @@ -4,6 +4,7 @@ import ( "regexp" "strconv" "strings" + "sync" "tango/pkg/entity" "time" ) @@ -11,35 +12,29 @@ import ( var timeFormat = "02/Jan/2006:15:04:05 -0700" var combinedLogFormat = `^(?P<ip_list>[\S, ]+) (\-) \[(?P<time>[\w:/]+\s[+\-]\d{4})\] "(?P<request_method>\S+)\s?(?P<uri>\S+)?\s?(?P<protocol>\S+)?" (?P<response_code>\d{3}|-) (?P<response_size>\d+|-)\s?"?(?P<referer_url>[^"]*)"?\s?"?(?P<user_agent>[^"]*)?"?$` -func filter(s []string, r string) []string { - for i, v := range s { - if v == r { - return append(s[:i], s[i+1:]...) - } - } - return s +type AccessLogMapper struct { + pool sync.Pool + parser *regexp.Regexp } -func findNamedMatches(regex *regexp.Regexp, str string) map[string]string { - match := regex.FindStringSubmatch(str) - - results := map[string]string{} +func NewAccessLogMapper() *AccessLogMapper { + accessLogParser, _ := regexp.Compile(combinedLogFormat) - for i, name := range match { - results[regex.SubexpNames()[i]] = name + return &AccessLogMapper{ + pool: sync.Pool{ + New: func() interface{} { + return entity.AccessLogRecord{} + }, + }, + parser: accessLogParser, } - - return results } // Map access logs line to AccessLogRecord type -func MapAccessLogRecord(accessLogRecord string) entity.AccessLogRecord { - // todo: move compiling from the map method, we need it once and then use compiled pattern - accessLogParser, _ := regexp.Compile(combinedLogFormat) +func (m *AccessLogMapper) Map(recordLine string) entity.AccessLogRecord { + accessRecordInformation := m.parseLine(m.parser, strings.TrimSpace(recordLine)) - accessRecordInformation := findNamedMatches(accessLogParser, strings.TrimSpace(accessLogRecord)) - - ipList := filter( + ipList := m.filter( strings.Split( strings.ReplaceAll(accessRecordInformation["ip_list"], ", ", " "), " ", @@ -47,20 +42,47 @@ func MapAccessLogRecord(accessLogRecord string) entity.AccessLogRecord { "-", ) - time, _ := time.Parse(timeFormat, accessRecordInformation["time"]) + createdAt, _ := time.Parse(timeFormat, accessRecordInformation["time"]) responseCode, _ := strconv.ParseUint(accessRecordInformation["response_code"], 10, 64) responseSize, _ := strconv.ParseUint(accessRecordInformation["response_size"], 10, 64) - return entity.AccessLogRecord{ - IP: ipList, - URI: accessRecordInformation["uri"], - Time: time, - RequestMethod: accessRecordInformation["request_method"], - Protocol: accessRecordInformation["protocol"], - ResponseCode: responseCode, - ResponseSize: responseSize, - RefererURL: accessRecordInformation["referer_url"], - UserAgent: accessRecordInformation["user_agent"], + accessLogRecord := m.pool.Get().(entity.AccessLogRecord) + + accessLogRecord.IP = ipList + accessLogRecord.URI = accessRecordInformation["uri"] + accessLogRecord.Time = createdAt + accessLogRecord.RequestMethod = accessRecordInformation["request_method"] + accessLogRecord.Protocol = accessRecordInformation["protocol"] + accessLogRecord.ResponseCode = responseCode + accessLogRecord.ResponseSize = responseSize + accessLogRecord.RefererURL = accessRecordInformation["referer_url"] + accessLogRecord.UserAgent = accessRecordInformation["user_agent"] + + return accessLogRecord +} + +func (m *AccessLogMapper) parseLine(regex *regexp.Regexp, str string) map[string]string { + match := regex.FindStringSubmatch(str) + + results := map[string]string{} + + for i, name := range match { + results[regex.SubexpNames()[i]] = name } + + return results +} + +func (m *AccessLogMapper) filter(s []string, r string) []string { + for i, v := range s { + if v == r { + return append(s[:i], s[i+1:]...) + } + } + return s +} + +func (m *AccessLogMapper) Recycle(logRecord entity.AccessLogRecord) { + m.pool.Put(logRecord) } diff --git a/pkg/services/read_log_file.go b/pkg/services/read_log_file.go index 52b44fc..72e5e15 100644 --- a/pkg/services/read_log_file.go +++ b/pkg/services/read_log_file.go @@ -1,6 +1,8 @@ package services import ( + "sync" + adapters "tango/pkg/adapters/reader" "tango/pkg/entity" "tango/pkg/services/mapper" "tango/pkg/services/processor" @@ -9,51 +11,72 @@ import ( // type ReadAccessLogFunc func(accessLogRecord string, bytes int) -// -type AccessLogReader interface { - Read(filePath string, readAccessLogFunc ReadAccessLogFunc) -} - // type ReadAccessLogService struct { - accessLogReader AccessLogReader - filterAccessLogService FilterAccessLogService - ipProcessor processor.IPProcessor + logMapper *mapper.AccessLogMapper + logReader *adapters.AccessLogReader + readProgress *adapters.ReadProgress + filterLogService FilterAccessLogService + ipProcessor processor.IPProcessor } // NewReadAccessLogService Create a new ReadAccessLogService func NewReadAccessLogService( - accessLogReader AccessLogReader, - filterAccessLogService FilterAccessLogService, + logMapper *mapper.AccessLogMapper, + accessLogReader *adapters.AccessLogReader, + readProgress *adapters.ReadProgress, + filterLogService FilterAccessLogService, ipProcessor processor.IPProcessor, ) *ReadAccessLogService { return &ReadAccessLogService{ - accessLogReader: accessLogReader, - filterAccessLogService: filterAccessLogService, - ipProcessor: ipProcessor, + logMapper: logMapper, + logReader: accessLogReader, + readProgress: readProgress, + filterLogService: filterLogService, + ipProcessor: ipProcessor, } } -// Read Access Logs and convert them to array of AccessLogRecord-s -func (u *ReadAccessLogService) Read(filePath string) []entity.AccessLogRecord { - accessRecords := make([]entity.AccessLogRecord, 0) +// Read Access Logs (filter them out if needed) and add to the channel +func (s *ReadAccessLogService) Read(filePath string) <-chan entity.AccessLogRecord { + logFileSize := s.logReader.FileSize(filePath) + rawLogChan, bytesReadChan := s.logReader.Read(filePath) + + parsedLogChan := make(chan entity.AccessLogRecord) + + s.readProgress.Track(bytesReadChan, logFileSize) + + var waitGroup sync.WaitGroup - u.accessLogReader.Read(filePath, func(accessLogRecord string, bytes int) { - accessRecord := mapper.MapAccessLogRecord(accessLogRecord) + // TODO: Configure thread number + for i := 0; i < 4; i++ { + waitGroup.Add(1) - // process parsed access log record - accessRecord = u.ipProcessor.Process(accessRecord) + go func() { + defer waitGroup.Done() - // filter/skip parsed access log record if needed - if u.filterAccessLogService.Filter(accessRecord) { - return - } + for rawLog := range rawLogChan { + logRecord := s.logMapper.Map(rawLog) + + // process a parsed access log record + logRecord = s.ipProcessor.Process(logRecord) + + // filter/skip parsed access log record if needed + if s.filterLogService.Filter(logRecord) { + s.logMapper.Recycle(logRecord) + + continue + } + + parsedLogChan <- logRecord + } + }() + } - accessRecords = append( - accessRecords, - accessRecord, - ) - }) + go func() { + waitGroup.Wait() + close(parsedLogChan) + }() - return accessRecords + return parsedLogChan } diff --git a/pkg/services/report/browser_report.go b/pkg/services/report/browser_report.go index 299de1d..66bf21a 100644 --- a/pkg/services/report/browser_report.go +++ b/pkg/services/report/browser_report.go @@ -2,7 +2,9 @@ package report import ( "strings" - entity2 "tango/pkg/entity" + "sync" + entity "tango/pkg/entity" + "tango/pkg/services/mapper" ) // @@ -22,58 +24,80 @@ type BrowserReportWriter interface { // type BrowserReportService struct { + logMapper *mapper.AccessLogMapper browserReportWriter BrowserReportWriter } // -func NewBrowserReportService(browserReportWriter BrowserReportWriter) *BrowserReportService { +func NewBrowserReportService(logMapper *mapper.AccessLogMapper, browserReportWriter BrowserReportWriter) *BrowserReportService { return &BrowserReportService{ + logMapper: logMapper, browserReportWriter: browserReportWriter, } } // Process access logs and collect browser reports -func (u *BrowserReportService) GenerateReport(reportPath string, accessRecords []entity2.AccessLogRecord) { +func (s *BrowserReportService) GenerateReport(reportPath string, logChan <-chan entity.AccessLogRecord) { + var browserCategories = entity.GetBrowserClassification() + var browserReport = make(map[string]*BrowserReportItem) - var browserCategories = entity2.GetBrowserClassification() + var mutex sync.Mutex - for _, accessRecord := range accessRecords { - userAgent := accessRecord.UserAgent + var waitGroup sync.WaitGroup - browserAgent := userAgent - browserCategory := "Unknown" + for i := 0; i < 4; i++ { + waitGroup.Add(1) - // classify the current browser - for _, browserCategoryItem := range browserCategories { - if strings.Contains(userAgent, browserCategoryItem.Agent) { - browserAgent = browserCategoryItem.Agent - browserCategory = browserCategoryItem.Category + go func() { + defer waitGroup.Done() - break - } - } + for accessRecord := range logChan { + userAgent := accessRecord.UserAgent - if _, ok := browserReport[browserAgent]; ok { - browserReport[browserAgent].Requests++ - browserReport[browserAgent].Bandwidth += accessRecord.ResponseSize + browserAgent := userAgent + browserCategory := "Unknown" - // add a new unique occurance of user agent - if _, found := browserReport[browserAgent].UserAgents[userAgent]; !found { - browserReport[browserAgent].UserAgents[userAgent] = true - } + // classify the current browser + for _, browserCategoryItem := range browserCategories { + if strings.Contains(userAgent, browserCategoryItem.Agent) { + browserAgent = browserCategoryItem.Agent + browserCategory = browserCategoryItem.Category + + break + } + } + + mutex.Lock() - continue - } - - browserReport[browserAgent] = &BrowserReportItem{ - Browser: browserAgent, - Category: browserCategory, - SampleUrl: accessRecord.URI, - Requests: 1, - Bandwidth: accessRecord.ResponseSize, - UserAgents: map[string]bool{userAgent: true}, - } + if _, ok := browserReport[browserAgent]; ok { + browserReport[browserAgent].Requests++ + browserReport[browserAgent].Bandwidth += accessRecord.ResponseSize + + // add a new unique occurance of user agent + if _, found := browserReport[browserAgent].UserAgents[userAgent]; !found { + browserReport[browserAgent].UserAgents[userAgent] = true + } + + s.logMapper.Recycle(accessRecord) + mutex.Unlock() + continue + } + + browserReport[browserAgent] = &BrowserReportItem{ + Browser: browserAgent, + Category: browserCategory, + SampleUrl: accessRecord.URI, + Requests: 1, + Bandwidth: accessRecord.ResponseSize, + UserAgents: map[string]bool{userAgent: true}, + } + s.logMapper.Recycle(accessRecord) + mutex.Unlock() + } + }() } - u.browserReportWriter.Save(reportPath, browserReport) + waitGroup.Wait() + + s.browserReportWriter.Save(reportPath, browserReport) } diff --git a/pkg/services/report/custom_report.go b/pkg/services/report/custom_report.go index c82beba..187ba49 100644 --- a/pkg/services/report/custom_report.go +++ b/pkg/services/report/custom_report.go @@ -6,7 +6,7 @@ import ( // type CustomReportWriter interface { - Save(reportPath string, accessLogs []entity.AccessLogRecord) + Save(reportPath string, logChan <-chan entity.AccessLogRecord) } // @@ -22,7 +22,7 @@ func NewCustomReportService(customReportWriter CustomReportWriter) *CustomReport } // Save a custom log based on access logs -func (u *CustomReportService) GenerateReport(reportPath string, accessRecords []entity.AccessLogRecord) { +func (u *CustomReportService) GenerateReport(reportPath string, logChan <-chan entity.AccessLogRecord) { // nothing to do here yet - u.customReportWriter.Save(reportPath, accessRecords) + u.customReportWriter.Save(reportPath, logChan) } diff --git a/pkg/services/report/geo_report.go b/pkg/services/report/geo_report.go index 72ff9c2..318beae 100644 --- a/pkg/services/report/geo_report.go +++ b/pkg/services/report/geo_report.go @@ -1,24 +1,62 @@ package report import ( + "sync" "tango/pkg/entity" + "tango/pkg/services/mapper" ) -// Geo Location Data +// GeoData Location Data type GeoData struct { Country string City string Continent string } -// Geolocation report information -type Geolocation struct { +// GeoRecord report information +type GeoRecord struct { GeoData *GeoData SampleRequest string BrowserAgent string Requests uint64 } +// GeoReport +type GeoReport struct { + report map[string]*GeoRecord + mu sync.Mutex +} + +// AddRequest +func (r *GeoReport) AddRequest(logRecord entity.AccessLogRecord, IP string, geoData *GeoData) { + r.mu.Lock() + defer r.mu.Unlock() + + if geoRecord, found := r.report[IP]; found { + geoRecord.Requests += 1 + return + } + + r.report[IP] = &GeoRecord{ + GeoData: geoData, + SampleRequest: logRecord.URI, + BrowserAgent: logRecord.UserAgent, + Requests: 1, + } +} + +func (r *GeoReport) Report() map[string]*GeoRecord { + return r.report +} + +// NewGeoReport +func NewGeoReport() *GeoReport { + return &GeoReport{ + report: make(map[string]*GeoRecord), + mu: sync.Mutex{}, + } +} + // GeoLocationProvider provides ability to find geolocation data by IP type GeoLocationProvider interface { GetGeoDataByIP(ip string) *GeoData // keep geoip2 references in the services layer, don't plan to support anything else @@ -27,47 +65,51 @@ type GeoLocationProvider interface { // GeoReportWriter is an interface for saving geo location reports type GeoReportWriter interface { - Save(reportPath string, geolocationReport map[string]*Geolocation) + Save(reportPath string, geoReport *GeoReport) } // GeoReportService knows how to generate geo reports type GeoReportService struct { + logMapper *mapper.AccessLogMapper geoLocationProvider GeoLocationProvider geoReportWriter GeoReportWriter } // NewGeoReportService -func NewGeoReportService(geoLocationProvider GeoLocationProvider, geoReportWriter GeoReportWriter) *GeoReportService { +func NewGeoReportService(logMapper *mapper.AccessLogMapper, geoLocationProvider GeoLocationProvider, geoReportWriter GeoReportWriter) *GeoReportService { return &GeoReportService{ + logMapper: logMapper, geoLocationProvider: geoLocationProvider, geoReportWriter: geoReportWriter, } } // GenerateReport processes access logs and collect geo reports -func (u *GeoReportService) GenerateReport(reportPath string, accessRecords []entity.AccessLogRecord) { - var geoReport = make(map[string]*Geolocation) +func (s *GeoReportService) GenerateReport(reportPath string, logChan <-chan entity.AccessLogRecord) { + geoReport := NewGeoReport() - defer u.geoLocationProvider.Close() + defer s.geoLocationProvider.Close() - for _, accessRecord := range accessRecords { - for _, ip := range accessRecord.IP { + var waitGroup sync.WaitGroup - if _, ok := geoReport[ip]; ok { - geoReport[ip].Requests++ - continue - } + for i := 0; i < 4; i++ { + waitGroup.Add(1) - geoData := u.geoLocationProvider.GetGeoDataByIP(ip) + go func() { + defer waitGroup.Done() - geoReport[ip] = &Geolocation{ - GeoData: geoData, - SampleRequest: accessRecord.URI, - BrowserAgent: accessRecord.UserAgent, - Requests: 1, + for accessRecord := range logChan { + for _, ip := range accessRecord.IP { + geoData := s.geoLocationProvider.GetGeoDataByIP(ip) + geoReport.AddRequest(accessRecord, ip, geoData) + } + + s.logMapper.Recycle(accessRecord) } - } + }() } - u.geoReportWriter.Save(reportPath, geoReport) + waitGroup.Wait() + + s.geoReportWriter.Save(reportPath, geoReport) } diff --git a/pkg/services/report/journey_report.go b/pkg/services/report/journey_report.go index 68a1b26..aae0504 100644 --- a/pkg/services/report/journey_report.go +++ b/pkg/services/report/journey_report.go @@ -5,8 +5,10 @@ import ( "fmt" "log" "strings" + "sync" entity2 "tango/pkg/entity" "tango/pkg/services/config" + "tango/pkg/services/mapper" ) // JourneyReportWriter knows how to save journey report @@ -16,13 +18,15 @@ type JourneyReportWriter interface { // JourneyReportService knows how to prepare journey reports type JourneyReportService struct { + logMapper *mapper.AccessLogMapper baseURL string journeyReportWriter JourneyReportWriter } // NewJourneyReportService creates a new instance of the services -func NewJourneyReportService(generalConfig config.GeneralConfig, journeyReportWriter JourneyReportWriter) *JourneyReportService { +func NewJourneyReportService(logMapper *mapper.AccessLogMapper, generalConfig config.GeneralConfig, journeyReportWriter JourneyReportWriter) *JourneyReportService { return &JourneyReportService{ + logMapper: logMapper, baseURL: generalConfig.BaseURL, journeyReportWriter: journeyReportWriter, } @@ -41,25 +45,40 @@ func getUUID() string { } // GenerateReport processes access logs and determine visitor's journeys on the website -func (u *JourneyReportService) GenerateReport(reportPath string, accessRecords []entity2.AccessLogRecord) { +func (s *JourneyReportService) GenerateReport(reportPath string, logChan <-chan entity2.AccessLogRecord) { journeyReport := make(map[string]*entity2.Journey, 0) - - for _, accessRecord := range accessRecords { - ipList := accessRecord.IP - - for _, ip := range ipList { - if _, ok := journeyReport[ip]; !ok { - journeyReport[ip] = &entity2.Journey{ - ID: getUUID(), - IP: ip, + var mutex sync.Mutex // TODO: try to use sync.Map + var waitGroup sync.WaitGroup + + for i := 0; i < 4; i++ { + waitGroup.Add(1) + + go func() { + defer waitGroup.Done() + + for accessRecord := range logChan { + ipList := accessRecord.IP + + for _, ip := range ipList { + mutex.Lock() + if _, ok := journeyReport[ip]; !ok { + journeyReport[ip] = &entity2.Journey{ + ID: getUUID(), + IP: ip, + } + } + + s.addPlace(journeyReport[ip], accessRecord) + s.logMapper.Recycle(accessRecord) + mutex.Unlock() } } - - u.addPlace(journeyReport[ip], accessRecord) - } + }() } - u.journeyReportWriter.Save(reportPath, journeyReport) + waitGroup.Wait() + + s.journeyReportWriter.Save(reportPath, journeyReport) } // addPlace @@ -85,7 +104,7 @@ func (u *JourneyReportService) addPlace(journey *entity2.Journey, accessLogRecor Time: accessLogRecord.Time, UserAgent: accessLogRecord.UserAgent, Protocol: accessLogRecord.Protocol, - ResponseCode: 200, // assume that previous request was successfull + ResponseCode: 200, // assume that previous request was successful ResponseSize: 0, // hard to say about size, keep 0 bytes RequestMethod: "GET", // usually GET method is cachable, so assume it was used for this request as well RefererURL: "-", diff --git a/pkg/services/report/pace_report.go b/pkg/services/report/pace_report.go index 54578ac..7dd923f 100644 --- a/pkg/services/report/pace_report.go +++ b/pkg/services/report/pace_report.go @@ -1,7 +1,9 @@ package report import ( + "sync" "tango/pkg/entity" + "tango/pkg/services/mapper" ) var minuteTimeFormat = "2006-01-02 15:04" // minute time group template @@ -35,46 +37,64 @@ type PaceReportWriter interface { // PaceReportService type PaceReportService struct { + logMapper *mapper.AccessLogMapper paceReportWriter PaceReportWriter } // -func NewPaceReportService(paceReportWriter PaceReportWriter) *PaceReportService { +func NewPaceReportService(logMapper *mapper.AccessLogMapper, paceReportWriter PaceReportWriter) *PaceReportService { return &PaceReportService{ + logMapper: logMapper, paceReportWriter: paceReportWriter, } } // GenerateReport processes access logs and collects request pace reports -func (u *PaceReportService) GenerateReport(reportPath string, accessRecords []entity.AccessLogRecord) { +func (s *PaceReportService) GenerateReport(reportPath string, logChan <-chan entity.AccessLogRecord) { var paceReport = make([]*PaceHourReportItem, 0) - - for _, accessRecord := range accessRecords { - ipList := accessRecord.IP - browser := accessRecord.UserAgent - hourTimeGroup := accessRecord.Time.Format(hourTimeFormat) - minuteTimeGroup := accessRecord.Time.Format(minuteTimeFormat) - - lastHourReportItem := u.findPaceHourReportItem(&paceReport, hourTimeGroup) - lastMinuteReportItem := u.findPaceMinuteReportItem(&lastHourReportItem.MinutePaceItems, minuteTimeGroup) - - for _, ip := range ipList { - if _, found := lastMinuteReportItem.IpPaces[ip]; !found { - lastMinuteReportItem.IpPaces[ip] = &PaceIpReportItem{ - IP: ip, - Requests: 0, - Browser: browser, + var mutex sync.Mutex // TODO: try to use sync.Map + + var waitGroup sync.WaitGroup + + for i := 0; i < 4; i++ { + waitGroup.Add(1) + go func() { + defer waitGroup.Done() + + for accessRecord := range logChan { + ipList := accessRecord.IP + browser := accessRecord.UserAgent + hourTimeGroup := accessRecord.Time.Format(hourTimeFormat) + minuteTimeGroup := accessRecord.Time.Format(minuteTimeFormat) + + mutex.Lock() + lastHourReportItem := s.findPaceHourReportItem(&paceReport, hourTimeGroup) + lastMinuteReportItem := s.findPaceMinuteReportItem(&lastHourReportItem.MinutePaceItems, minuteTimeGroup) + + for _, ip := range ipList { + if _, found := lastMinuteReportItem.IpPaces[ip]; !found { + lastMinuteReportItem.IpPaces[ip] = &PaceIpReportItem{ + IP: ip, + Requests: 0, + Browser: browser, + } + } + + lastMinuteReportItem.IpPaces[ip].Requests++ } - } - lastMinuteReportItem.IpPaces[ip].Requests++ - } + lastMinuteReportItem.Requests++ + lastHourReportItem.Requests++ - lastMinuteReportItem.Requests++ - lastHourReportItem.Requests++ + s.logMapper.Recycle(accessRecord) + mutex.Unlock() + } + }() } - u.paceReportWriter.Save(reportPath, paceReport) + waitGroup.Wait() + + s.paceReportWriter.Save(reportPath, paceReport) } func (u *PaceReportService) findPaceHourReportItem(paceHourReport *[]*PaceHourReportItem, time string) *PaceHourReportItem { diff --git a/pkg/services/report/request_report.go b/pkg/services/report/request_report.go index 3350ab8..c5e1848 100644 --- a/pkg/services/report/request_report.go +++ b/pkg/services/report/request_report.go @@ -4,7 +4,9 @@ import ( "fmt" "net/url" "regexp" + "sync" "tango/pkg/entity" + "tango/pkg/services/mapper" ) // RequestReportItem @@ -15,26 +17,67 @@ type RequestReportItem struct { RefererURLs map[string]bool } +type RequestReport struct { + report map[string]*RequestReportItem + mu sync.Mutex +} + +func (r *RequestReport) AddRequest(path string, refererURL string, logRecord entity.AccessLogRecord) { + r.mu.Lock() + defer r.mu.Unlock() + + if requestRecord, exists := r.report[path]; exists { + requestRecord.Requests++ + + // collect referer URLs + if _, found := requestRecord.RefererURLs[refererURL]; !found { + requestRecord.RefererURLs[refererURL] = true + } + + return + } + + r.report[path] = &RequestReportItem{ + Path: path, + Requests: 1, + ResponseCode: logRecord.ResponseCode, + RefererURLs: map[string]bool{refererURL: true}, + } +} + +func (r *RequestReport) Report() map[string]*RequestReportItem { + return r.report +} + +func NewRequestReport() *RequestReport { + return &RequestReport{ + report: make(map[string]*RequestReportItem), + mu: sync.Mutex{}, + } +} + // RequestReportWriter type RequestReportWriter interface { - Save(reportPath string, browserReport map[string]*RequestReportItem) + Save(reportPath string, requestReport *RequestReport) } // RequestReportService type RequestReportService struct { + logMapper *mapper.AccessLogMapper requestReportWriter RequestReportWriter } // -func NewRequestReportService(requestReportWriter RequestReportWriter) *RequestReportService { +func NewRequestReportService(logMapper *mapper.AccessLogMapper, requestReportWriter RequestReportWriter) *RequestReportService { return &RequestReportService{ + logMapper: logMapper, requestReportWriter: requestReportWriter, } } // GenerateReport processes access logs and collect request reports -func (u *RequestReportService) GenerateReport(reportPath string, accessRecords []entity.AccessLogRecord) { - var requestReport = make(map[string]*RequestReportItem) +func (s *RequestReportService) GenerateReport(reportPath string, logChan <-chan entity.AccessLogRecord) { + requestReport := NewRequestReport() // todo: move to configs queryPatterns := []string{ @@ -55,44 +98,42 @@ func (u *RequestReportService) GenerateReport(reportPath string, accessRecords [ pathFilters = append(pathFilters, filter) } - for _, accessRecord := range accessRecords { - requestURI := accessRecord.URI - refererURL := accessRecord.RefererURL + var waitGroup sync.WaitGroup - parsedURI, err := url.Parse(requestURI) + for i := 0; i < 4; i++ { + waitGroup.Add(1) - path := "" + go func() { + defer waitGroup.Done() - if err != nil { - // during security scans it's possible to submit a request which triggers a panic in url.Parse() - // in that case, just use the original URI - path = requestURI - } else { - path = parsedURI.Path - } + for accessRecord := range logChan { + requestURI := accessRecord.URI + refererURL := accessRecord.RefererURL - for _, filter := range pathFilters { - path = filter.ReplaceAllString(path, "") - } + parsedURI, err := url.Parse(requestURI) - if _, ok := requestReport[path]; ok { - requestReport[path].Requests++ + path := "" - // collect referer URLs - if _, found := requestReport[path].RefererURLs[refererURL]; !found { - requestReport[path].RefererURLs[refererURL] = true - } + if err != nil { + // during security scans it's possible to submit a request which triggers a panic in url.Parse() + // in that case, just use the original URI + path = requestURI + } else { + path = parsedURI.Path + } - continue - } + for _, filter := range pathFilters { + path = filter.ReplaceAllString(path, "") + } - requestReport[path] = &RequestReportItem{ - Path: path, - Requests: 1, - ResponseCode: accessRecord.ResponseCode, - RefererURLs: map[string]bool{refererURL: true}, - } + requestReport.AddRequest(path, refererURL, accessRecord) + s.logMapper.Recycle(accessRecord) + } + }() } - u.requestReportWriter.Save(reportPath, requestReport) + waitGroup.Wait() + + fmt.Println("💃 saving the request report...") + s.requestReportWriter.Save(reportPath, requestReport) } diff --git a/test/create_geo_report_test.go b/test/create_geo_report_test.go index d63c475..cd525c0 100644 --- a/test/create_geo_report_test.go +++ b/test/create_geo_report_test.go @@ -11,7 +11,7 @@ import ( ) func TestInstallGeoLib(t *testing.T) { - assert := assert.New(t) + asserter := assert.New(t) tangoCli := cli.NewTangoCli("0.0.0-test", "dummycommithash") tangoCli.Run([]string{ @@ -24,14 +24,14 @@ func TestInstallGeoLib(t *testing.T) { }) _, geoConfExistErr := di.InitMaxmindConfResolver().GetPath() - assert.False(os.IsNotExist(geoConfExistErr), "MaxMind Configuration File was not created") + asserter.False(os.IsNotExist(geoConfExistErr), "MaxMind Configuration File was not created") _, geoLibExistErr := di.InitMaxmindGeoLibResolver().GetPath() - assert.False(os.IsNotExist(geoLibExistErr), "MaxMind Geo Lib was not created") + asserter.False(os.IsNotExist(geoLibExistErr), "MaxMind Geo Lib was not created") } func TestCreateGeoReportWithSystemIpProcessor(t *testing.T) { - assert := assert.New(t) + asserter := assert.New(t) tangoCli := cli.NewTangoCli("0.0.0-test", "dummycommithash") reportFilePath := "results/geo-report-with-system-ip-processor.csv" @@ -51,25 +51,25 @@ func TestCreateGeoReportWithSystemIpProcessor(t *testing.T) { reportHeader, reportBody := testReport[0], testReport[1:] - assert.Equal(reportHeader, writer.GeoReportHeader, "Geo Report Header is different") - assert.Len(reportBody, 40, "Geo Report should contain 40 record") + asserter.Equal(reportHeader, writer.GeoReportHeader, "Geo Report Header is different") + asserter.Len(reportBody, 40, "Geo Report should contain 40 record") testGeoData := map[string][]string{ - "130.93.253.236": []string{ - "130.93.253.236", // IP - "Hungary", // Country - "", // City - "/cstm/sec/load?sections=&updsecid=false&_=1562558398896", // Sample Request - "Europe", // Continent + "130.93.253.236": { + "130.93.253.236", // IP + "Hungary", // Country + "Szentlorinckata", // City + "", // Sample Request, keep it empty as it may change since it's a random sampled URL + "Europe", // Continent "Mozilla/5.0 (iPhone; CPU iPhone OS 12_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.1.1 Mobile/15E148 Safari/604.1", //Browser Agent "18", // Count of Requests }, - "121.79.80.29": []string{ - "121.79.80.29", // IP - "Australia", // Country - "", // City - "/product-tk8-smawbtk.html", // Sample Request - "Oceania", // Continent + "121.79.80.29": { + "121.79.80.29", // IP + "Australia", // Country + "", // City + "", // Sample Request, keep it empty as it may change since it's a random sampled URL + "Oceania", // Continent "Mozilla/5.0 (X11; CrOS x86_64 12105.75.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.102 Safari/537.36", //Browser Agent "4", // Count of Requests }, @@ -90,13 +90,15 @@ func TestCreateGeoReportWithSystemIpProcessor(t *testing.T) { } for _, reportItem := range reportBody { - ip := reportItem[0] + IP := reportItem[0] - if expectedItem, ok := testGeoData[ip]; ok { - assert.ElementsMatch(expectedItem, reportItem, "Sample Geo Report Item should match") + if expectedItem, found := testGeoData[IP]; found { + reportItem[4] = "" // ignore the randomly sampled URL + + asserter.ElementsMatch(expectedItem, reportItem, "Sample Geo Report Item should match") } - assert.NotContains(testSystemIpList, ip, "System IP should be filtered") + asserter.NotContains(testSystemIpList, IP, "System IP should be filtered") } }