Remove temp file usage for logs (#607)
seanlinsley authored Oct 11, 2024
1 parent 9874b00 commit 4b82419
Showing 13 changed files with 77 additions and 225 deletions.
22 changes: 4 additions & 18 deletions input/postgres/log_pg_read_file.go
@@ -82,29 +82,22 @@ func LogPgReadFile(ctx context.Context, server *state.Server, globalCollectionOp
for _, fileName := range fileNames {
if err != nil {
err = fmt.Errorf("LogFileSql/Scan: %s", err)
goto ErrorCleanup
return server.LogPrevState, nil, nil, err
}
var logData string
var newOffset int64
prevOffset, _ := psl.ReadFileMarkers[fileName]
err = db.QueryRowContext(ctx, QueryMarkerSQL+logReadSql, fileName, prevOffset).Scan(&newOffset, &logData)
if err != nil {
err = fmt.Errorf("LogReadSql/QueryRow: %s", err)
goto ErrorCleanup
return server.LogPrevState, nil, nil, err
}

var logFile state.LogFile
logFile, err = state.NewLogFile(nil, fileName)
logFile, err = state.NewLogFile(fileName)
if err != nil {
err = fmt.Errorf("error initializing log file: %s", err)
goto ErrorCleanup
}

_, err := logFile.TmpFile.WriteString(logData)
if err != nil {
err = fmt.Errorf("Error writing to tempfile: %s", err)
logFile.Cleanup(logger)
goto ErrorCleanup
return server.LogPrevState, nil, nil, err
}

logReader := bufio.NewReader(strings.NewReader(logData))
@@ -119,11 +112,4 @@ func LogPgReadFile(ctx context.Context, server *state.Server, globalCollectionOp
psl.ReadFileMarkers = newMarkers

return psl, logFiles, samples, err

ErrorCleanup:
for _, logFile := range logFiles {
logFile.Cleanup(logger)
}

return server.LogPrevState, nil, nil, err
}
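Note on the hunk above: with no temp file left to clean up, the `goto ErrorCleanup` pattern is unnecessary and each error path can return directly, while the log data is parsed straight from the string returned by the query. A minimal, self-contained sketch of that in-memory parsing pattern (the sample log content and line handling are stand-ins, not the collector's real parser):

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	// Stand-in for the logData string returned by the pg_read_file query
	logData := "2024-10-11 00:00:01 UTC [123] LOG:  connection received\n" +
		"2024-10-11 00:00:02 UTC [123] LOG:  disconnection\n"

	// Previously this content was written to a temp file and re-read;
	// now it is parsed directly from memory.
	logReader := bufio.NewReader(strings.NewReader(logData))
	for {
		line, err := logReader.ReadString('\n')
		if line != "" {
			fmt.Printf("parsed: %q\n", strings.TrimSuffix(line, "\n"))
		}
		if err != nil {
			break
		}
	}
}
```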
138 changes: 35 additions & 103 deletions input/system/rds/logs.go
@@ -5,9 +5,6 @@ import (
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"strings"
"time"

@@ -21,6 +18,16 @@ import (
"github.com/aws/aws-sdk-go/service/rds"
)

// Analyze and submit at most the trailing 10 megabytes of the retrieved RDS log file portions
//
// This avoids an OOM in two edge cases:
// 1) When starting the collector, as we always load the last 10,000 lines (which may be very long)
// 2) When extremely large values are output in a single log event (e.g. query parameters in a DETAIL line)
//
// We intentionally throw away data here (and warn the user about it), since the alternative
// is often a collector crash (due to OOM), which would be less desirable.
const maxLogParsingSize = 10 * 1024 * 1024

// DownloadLogFiles - Gets log files for an Amazon RDS instance
func DownloadLogFiles(ctx context.Context, server *state.Server, logger *util.Logger) (state.PersistedLogState, []state.LogFile, []state.PostgresQuerySample, error) {
var err error
@@ -60,58 +67,52 @@ func DownloadLogFiles(ctx context.Context, server *state.Server, logger *util.Lo
var newMarkers = make(map[string]string)

for _, rdsLogFile := range resp.DescribeDBLogFiles {
var content []byte
var lastMarker *string
var bytesWritten int64

prevMarker, ok := psl.AwsMarkers[*rdsLogFile.LogFileName]
if ok {
lastMarker = &prevMarker
}

var tmpFile *os.File
tmpFile, err = ioutil.TempFile("", "")
if err != nil {
err = fmt.Errorf("Error allocating tempfile for logs: %s", err)
goto ErrorCleanup
}

for {
var newBytesWritten int
var newContent string
var newMarker *string
var additionalDataPending bool
newBytesWritten, newMarker, additionalDataPending, err = downloadRdsLogFilePortion(rdsSvc, tmpFile, logger, &identifier, rdsLogFile.LogFileName, lastMarker)
newContent, newMarker, additionalDataPending, err = downloadRdsLogFilePortion(rdsSvc, logger, &identifier, rdsLogFile.LogFileName, lastMarker)
if err != nil {
util.CleanUpTmpFile(tmpFile, logger)
goto ErrorCleanup
return server.LogPrevState, nil, nil, err
}
if len(newContent) > maxLogParsingSize {
content = []byte(newContent[len(newContent)-maxLogParsingSize:])
} else {
// Shift existing data left if needed
overflow := len(content) + len(newContent) - maxLogParsingSize
if overflow > 0 {
copy(content, content[overflow:])
}
pos := min(len(content), maxLogParsingSize-len(newContent))
// Resize result buffer if needed
if pos+len(newContent) > len(content) {
content = append(content, make([]byte, pos+len(newContent)-len(content))...)
}
copy(content[pos:], newContent)
}

bytesWritten += int64(newBytesWritten)
if newMarker != nil {
lastMarker = newMarker
}

if !additionalDataPending {
break
}
}

var buf []byte
buf, tmpFile, err = readLogFilePortion(tmpFile, bytesWritten, logger)
if err != nil {
util.CleanUpTmpFile(tmpFile, logger)
goto ErrorCleanup
}

fileContent := bufio.NewReader(strings.NewReader(string(buf)))
newLogLines, newSamples := logs.ParseAndAnalyzeBuffer(fileContent, linesNewerThan, server)
stream := bufio.NewReader(strings.NewReader(string(content)))
newLogLines, newSamples := logs.ParseAndAnalyzeBuffer(stream, linesNewerThan, server)

// Pass responsibility to LogFile for cleaning up the temp file
var logFile state.LogFile
logFile, err = state.NewLogFile(tmpFile, *rdsLogFile.LogFileName)
logFile, err = state.NewLogFile(*rdsLogFile.LogFileName)
if err != nil {
err = fmt.Errorf("error initializing log file: %s", err)
util.CleanUpTmpFile(tmpFile, logger)
goto ErrorCleanup
return server.LogPrevState, nil, nil, err
}
logFile.LogLines = append(logFile.LogLines, newLogLines...)
samples = append(samples, newSamples...)
@@ -125,13 +126,6 @@ func DownloadLogFiles(ctx context.Context, server *state.Server, logger *util.Lo
psl.AwsMarkers = newMarkers

return psl, logFiles, samples, err

ErrorCleanup:
for _, logFile := range logFiles {
logFile.Cleanup(logger)
}

return server.LogPrevState, nil, nil, err
}

var DescribeDBClustersErrorCache *util.TTLMap = util.NewTTLMap(10 * 60)
@@ -164,7 +158,7 @@ func getAwsDbInstanceID(config config.ServerConfig, sess *session.Session) (stri
return *instance.DBInstanceIdentifier, nil
}

func downloadRdsLogFilePortion(rdsSvc *rds.RDS, tmpFile *os.File, logger *util.Logger, identifier *string, logFileName *string, lastMarker *string) (newBytesWritten int, newMarker *string, additionalDataPending bool, err error) {
func downloadRdsLogFilePortion(rdsSvc *rds.RDS, logger *util.Logger, identifier *string, logFileName *string, lastMarker *string) (content string, newMarker *string, additionalDataPending bool, err error) {
var resp *rds.DownloadDBLogFilePortionOutput
resp, err = rdsSvc.DownloadDBLogFilePortion(&rds.DownloadDBLogFilePortionInput{
DBInstanceIdentifier: identifier,
@@ -182,71 +176,9 @@ func downloadRdsLogFilePortion(rdsSvc *rds.RDS, tmpFile *os.File, logger *util.L
return
}

if len(*resp.LogFileData) > 0 {
newBytesWritten, err = tmpFile.WriteString(*resp.LogFileData)
if err != nil {
err = fmt.Errorf("Error writing to tempfile: %s", err)
return
}
}

content = *resp.LogFileData
newMarker = resp.Marker
additionalDataPending = *resp.AdditionalDataPending

return
}

// Analyze and submit at most the trailing 10 megabytes of the retrieved RDS log file portions
//
// This avoids an OOM in two edge cases:
// 1) When starting the collector, as we always load the last 10,000 lines (which may be very long)
// 2) When extremely large values are output in a single log event (e.g. query parameters in a DETAIL line)
//
// We intentionally throw away data here (and warn the user about it), since the alternative
// is often a collector crash (due to OOM), which would be less desirable.
const maxLogParsingSize = 10 * 1024 * 1024

func readLogFilePortion(tmpFile *os.File, bytesWritten int64, logger *util.Logger) ([]byte, *os.File, error) {
var err error
var readSize int64

exceededMaxParsingSize := bytesWritten > maxLogParsingSize
if exceededMaxParsingSize {
logger.PrintWarning("RDS log file portion exceeded more than 10 MB of data in 30 second interval, collecting most recent data only (skipping %d bytes)", bytesWritten-maxLogParsingSize)
readSize = maxLogParsingSize
} else {
readSize = bytesWritten
}

// Read the data into memory for analysis
_, err = tmpFile.Seek(bytesWritten-readSize, io.SeekStart)
if err != nil {
return nil, tmpFile, fmt.Errorf("Error seeking tempfile: %s", err)
}
buf := make([]byte, readSize)
_, err = io.ReadFull(tmpFile, buf)
if err != nil {
return nil, tmpFile, fmt.Errorf("Error reading %d bytes from tempfile: %s", len(buf), err)
}

// If necessary, recreate tempfile with just the data we're analyzing
// (this supports the later read of the temp file during the log upload)
if exceededMaxParsingSize {
truncatedTmpFile, err := ioutil.TempFile("", "")
if err != nil {
return nil, tmpFile, fmt.Errorf("Error allocating tempfile for logs: %s", err)
}

_, err = truncatedTmpFile.Write(buf)
if err != nil {
util.CleanUpTmpFile(truncatedTmpFile, logger)
return nil, tmpFile, fmt.Errorf("Error writing to tempfile: %s", err)
}

// We succeeded, so remove the previous file and use the new one going forward
util.CleanUpTmpFile(tmpFile, logger)
tmpFile = truncatedTmpFile
}

return buf, tmpFile, nil
}
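The accumulation loop added to `DownloadLogFiles` above replaces the removed `readLogFilePortion` tempfile round-trip: downloaded portions are appended in memory and only the trailing `maxLogParsingSize` bytes are kept for parsing. A simplified, self-contained sketch of that trailing-window behavior (the collector's version shifts data in place to avoid reallocating, but the kept bytes are the same; `maxKeep` is a small stand-in for the real 10 MiB limit):

```go
package main

import "fmt"

const maxKeep = 16 // stand-in for maxLogParsingSize (10 * 1024 * 1024 in the collector)

// appendTrailing appends chunk to buf, then drops everything but the trailing maxKeep bytes.
func appendTrailing(buf []byte, chunk string) []byte {
	buf = append(buf, chunk...)
	if len(buf) > maxKeep {
		buf = buf[len(buf)-maxKeep:]
	}
	return buf
}

func main() {
	var content []byte
	for _, portion := range []string{"0123456789", "abcdefghij", "KLMNO"} {
		content = appendTrailing(content, portion)
	}
	// Of the 25 bytes downloaded, only the most recent 16 remain for parsing
	fmt.Println(string(content)) // 9abcdefghijKLMNO
}
```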
5 changes: 0 additions & 5 deletions logs/analyze.go
@@ -2266,11 +2266,6 @@ func AnalyzeBackendLogLines(logLines []state.LogLine) (logLinesOut []state.LogLi
logLinesOut = append(logLinesOut, logLine)
}

// Remove log line content. Note that ReplaceSecrets adds it back after secrets have been removed.
for idx := range logLinesOut {
logLinesOut[idx].Content = ""
}

return
}

6 changes: 6 additions & 0 deletions logs/analyze_test.go
@@ -4328,6 +4328,9 @@ var tests = []testpair{
func TestAnalyzeLogLines(t *testing.T) {
for _, pair := range tests {
l, s := logs.AnalyzeLogLines(pair.logLinesIn)
for idx := range l {
l[idx].Content = ""
}

cfg := pretty.CompareConfig
cfg.SkipZeroFields = true
@@ -4387,6 +4390,9 @@ func TestAnalyzeLogLinesHeroku(t *testing.T) {
t.Setenv("PORT", "dummy")
for _, pair := range testsHeroku {
l, s := logs.AnalyzeLogLines(pair.logLinesIn)
for idx := range l {
l[idx].Content = ""
}

cfg := pretty.CompareConfig
cfg.SkipZeroFields = true
12 changes: 4 additions & 8 deletions logs/debug.go
@@ -12,7 +12,7 @@ import (
"github.com/pganalyze/collector/state"
)

func PrintDebugInfo(logFileContents string, logLines []state.LogLine, samples []state.PostgresQuerySample) {
func PrintDebugInfo(logLines []state.LogLine, samples []state.PostgresQuerySample) {
fmt.Printf("log lines: %d, query samples: %d\n", len(logLines), len(samples))
groups := map[pganalyze_collector.LogLineInformation_LogClassification]int{}
unclassifiedLogLines := []state.LogLine{}
@@ -35,15 +35,13 @@ func PrintDebugInfo(logFileContents string, logLines []state.LogLine, samples []
if len(unclassifiedLogLines) > 0 {
fmt.Printf("\nUnclassified log lines:\n")
for _, logLine := range unclassifiedLogLines {
fmt.Printf("%s\n", logFileContents[logLine.ByteStart:logLine.ByteEnd])
fmt.Printf(" Level: %s\n", logLine.LogLevel)
fmt.Printf(" Content: %#v\n", logFileContents[logLine.ByteContentStart:logLine.ByteEnd])
fmt.Printf(" %s: %s", logLine.LogLevel, logLine.Content)
fmt.Printf("---\n")
}
}
}

func PrintDebugLogLines(logFileContents string, logLines []state.LogLine, classifications map[pganalyze_collector.LogLineInformation_LogClassification]bool) {
func PrintDebugLogLines(logLines []state.LogLine, classifications map[pganalyze_collector.LogLineInformation_LogClassification]bool) {
fmt.Println("\nParsed log lines:")
linesById := make(map[uuid.UUID]*state.LogLine)
for _, logLine := range logLines {
@@ -63,15 +61,13 @@ func PrintDebugLogLines(logFileContents string, logLines []state.LogLine, classi
if err != nil {
panic(err)
}
fmt.Printf("%s\n", logFileContents[logLine.ByteStart:logLine.ByteEnd])
fmt.Printf(" Level: %s\n", logLine.LogLevel)
fmt.Printf(" %s: %s", logLine.LogLevel, logLine.Content)
if logLine.ParentUUID == uuid.Nil {
fmt.Printf(" Classification: %s (%d)\n", logLine.Classification, logLine.Classification)
}
if len(logLine.Details) > 0 {
fmt.Printf(" Details: %s\n", detailsStr)
}
fmt.Printf(" Content: %#v\n", logFileContents[logLine.ByteContentStart:logLine.ByteEnd])
fmt.Printf("---\n")
}
}
4 changes: 2 additions & 2 deletions logs/replace.go
@@ -9,7 +9,7 @@ import (

const replacement = "[redacted]"

func ReplaceSecrets(input []byte, logLines []state.LogLine, filterLogSecret []state.LogSecretKind) {
func ReplaceSecrets(logLines []state.LogLine, filterLogSecret []state.LogSecretKind) {
filterUnidentified := false
for _, k := range filterLogSecret {
if k == state.UnidentifiedLogSecret {
@@ -20,10 +20,10 @@ func ReplaceSecrets(input []byte, logLines []state.LogLine, filterLogSecret []st
if filterUnidentified && logLines[idx].Classification == 0 {
logLines[idx].Content = replacement + "\n"
} else {
content := input[logLines[idx].ByteContentStart:logLines[idx].ByteEnd]
sort.Slice(logLine.SecretMarkers, func(i, j int) bool {
return logLine.SecretMarkers[i].ByteStart < logLine.SecretMarkers[j].ByteEnd
})
content := []byte(logLine.Content)
bytesChecked := 0
offset := 0
for _, m := range logLine.SecretMarkers {
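With the shared input buffer gone, `ReplaceSecrets` now redacts within each line's own `Content` using its `SecretMarkers`. A simplified, self-contained illustration of replacing marked byte ranges inside one line (the marker type and redaction loop here are stand-ins, not the collector's exact implementation):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

const replacement = "[redacted]"

// secretMarker is a stand-in for a marker describing a byte range within a line's content.
type secretMarker struct {
	ByteStart, ByteEnd int
}

// redact replaces each marked range in content with the replacement string.
func redact(content string, markers []secretMarker) string {
	sort.Slice(markers, func(i, j int) bool {
		return markers[i].ByteStart < markers[j].ByteStart
	})
	var out []byte
	last := 0
	for _, m := range markers {
		out = append(out, content[last:m.ByteStart]...)
		out = append(out, replacement...)
		last = m.ByteEnd
	}
	out = append(out, content[last:]...)
	return string(out)
}

func main() {
	line := "LOG:  execute stmt: SELECT * FROM users WHERE email = 'a@example.com'\n"
	secret := "'a@example.com'"
	start := strings.Index(line, secret)
	markers := []secretMarker{{ByteStart: start, ByteEnd: start + len(secret)}}
	fmt.Print(redact(line, markers))
	// Output: LOG:  execute stmt: SELECT * FROM users WHERE email = [redacted]
}
```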
2 changes: 1 addition & 1 deletion logs/replace_test.go
@@ -57,7 +57,7 @@ func TestReplaceSecrets(t *testing.T) {
server := state.MakeServer(config.ServerConfig{}, false)
server.LogParser = logs.NewLogParser(logs.LogPrefixAmazonRds, nil, false)
logLines, _ := logs.ParseAndAnalyzeBuffer(reader, time.Time{}, server)
logs.ReplaceSecrets([]byte(pair.input), logLines, state.ParseFilterLogSecret(pair.filterLogSecret))
logs.ReplaceSecrets(logLines, state.ParseFilterLogSecret(pair.filterLogSecret))

cfg := pretty.CompareConfig
cfg.SkipZeroFields = true
12 changes: 3 additions & 9 deletions logs/stream/stream.go
@@ -159,20 +159,14 @@ func isAdditionalLineLevel(str pganalyze_collector.LogLineInformation_LogLevel)
return false
}

// writeTmpLogFile - Setup temporary file that will be used for encryption
func writeTmpLogFile(readyLogLines []state.LogLine, logger *util.Logger) (state.LogFile, error) {
logFile, err := state.NewLogFile(nil, "")
func createLogFile(readyLogLines []state.LogLine, logger *util.Logger) (state.LogFile, error) {
logFile, err := state.NewLogFile("")
if err != nil {
return state.LogFile{}, fmt.Errorf("could not initialize log file: %s", err)
}

currentByteStart := int64(0)
for idx, logLine := range readyLogLines {
_, err = logFile.TmpFile.WriteString(logLine.Content)
if err != nil {
logFile.Cleanup(logger)
return logFile, err
}
logLine.ByteStart = currentByteStart
logLine.ByteContentStart = currentByteStart
logLine.ByteEnd = currentByteStart + int64(len(logLine.Content))
@@ -307,7 +301,7 @@ func AnalyzeStreamInGroups(logLines []state.LogLine, now time.Time, server *stat
}
}

logFile, err := writeTmpLogFile(analyzableLogLines, logger)
logFile, err := createLogFile(analyzableLogLines, logger)
if err != nil {
return state.TransientLogState{}, state.LogFile{}, logLines, err
}
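`createLogFile` above still records per-line byte offsets even though nothing is written to a temp file anymore; the offsets now describe positions within the concatenated in-memory content. A small self-contained sketch of that bookkeeping (the line struct is a stand-in for `state.LogLine`, reduced to the offset fields):

```go
package main

import "fmt"

// logLine is a stand-in for state.LogLine, keeping only the fields used here.
type logLine struct {
	Content            string
	ByteStart, ByteEnd int64
}

func main() {
	lines := []logLine{
		{Content: "LOG:  checkpoint starting: time\n"},
		{Content: "LOG:  checkpoint complete\n"},
	}
	// Assign offsets relative to the concatenated content, as createLogFile does
	currentByteStart := int64(0)
	for idx := range lines {
		lines[idx].ByteStart = currentByteStart
		lines[idx].ByteEnd = currentByteStart + int64(len(lines[idx].Content))
		currentByteStart = lines[idx].ByteEnd
	}
	for _, l := range lines {
		fmt.Printf("bytes %d-%d: %q\n", l.ByteStart, l.ByteEnd, l.Content)
	}
}
```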
