Skip to content

Commit

Permalink
Report errors on background tasks (#538)
Browse files Browse the repository at this point in the history
  • Loading branch information
turt2live authored Jan 9, 2024
1 parent f601e6f commit ac7d366
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

* Dendrite homeservers can now have their media imported safely, and `adminApiKind` may be set to `dendrite`.
* Exporting MMR's data to Synapse is now possible with `import_to_synapse`. To use it, first run `gdpr_export` or similar.
* Errors encountered during a background task, such as an API-induced export, are exposed as `error_message` in the admin API.

### Changed

Expand Down
4 changes: 4 additions & 0 deletions api/custom/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type TaskStatus struct {
StartTs int64 `json:"start_ts"`
EndTs int64 `json:"end_ts"`
IsFinished bool `json:"is_finished"`
Error string `json:"error_message"`
}

func GetTask(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserInfo) interface{} {
Expand Down Expand Up @@ -54,6 +55,7 @@ func GetTask(r *http.Request, rctx rcontext.RequestContext, user _apimeta.UserIn
StartTs: task.StartTs,
EndTs: task.EndTs,
IsFinished: task.EndTs > 0,
Error: task.Error,
}}
}

Expand All @@ -76,6 +78,7 @@ func ListAllTasks(r *http.Request, rctx rcontext.RequestContext, user _apimeta.U
StartTs: task.StartTs,
EndTs: task.EndTs,
IsFinished: task.EndTs > 0,
Error: task.Error,
})
}

Expand All @@ -101,6 +104,7 @@ func ListUnfinishedTasks(r *http.Request, rctx rcontext.RequestContext, user _ap
StartTs: task.StartTs,
EndTs: task.EndTs,
IsFinished: task.EndTs > 0,
Error: task.Error,
})
}

Expand Down
25 changes: 18 additions & 7 deletions database/table_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,23 @@ type DbTask struct {
Params *AnonymousJson
StartTs int64
EndTs int64
Error string
}

const selectTask = "SELECT id, task, params, start_ts, end_ts FROM background_tasks WHERE id = $1;"
const insertTask = "INSERT INTO background_tasks (task, params, start_ts, end_ts) VALUES ($1, $2, $3, 0) RETURNING id, task, params, start_ts, end_ts;"
const selectAllTasks = "SELECT id, task, params, start_ts, end_ts FROM background_tasks;"
const selectIncompleteTasks = "SELECT id, task, params, start_ts, end_ts FROM background_tasks WHERE end_ts <= 0;"
const selectTask = "SELECT id, task, params, start_ts, end_ts, error FROM background_tasks WHERE id = $1;"
const insertTask = "INSERT INTO background_tasks (task, params, start_ts, end_ts) VALUES ($1, $2, $3, 0) RETURNING id, task, params, start_ts, end_ts, error;"
const selectAllTasks = "SELECT id, task, params, start_ts, end_ts, error FROM background_tasks;"
const selectIncompleteTasks = "SELECT id, task, params, start_ts, end_ts, error FROM background_tasks WHERE end_ts <= 0;"
const updateTaskEndTime = "UPDATE background_tasks SET end_ts = $2 WHERE id = $1;"
const updateTaskError = "UPDATE background_tasks SET error = $2 WHERE id = $1;"

type tasksTableStatements struct {
selectTask *sql.Stmt
insertTask *sql.Stmt
selectAllTasks *sql.Stmt
selectIncompleteTasks *sql.Stmt
updateTaskEndTime *sql.Stmt
updateTaskError *sql.Stmt
}

type tasksTableWithContext struct {
Expand All @@ -53,6 +56,9 @@ func prepareTasksTables(db *sql.DB) (*tasksTableStatements, error) {
if stmts.updateTaskEndTime, err = db.Prepare(updateTaskEndTime); err != nil {
return nil, errors.New("error preparing updateTaskEndTime: " + err.Error())
}
if stmts.updateTaskError, err = db.Prepare(updateTaskError); err != nil {
return nil, errors.New("error preparing updateTaskError: " + err.Error())
}

return stmts, nil
}
Expand All @@ -67,7 +73,7 @@ func (s *tasksTableStatements) Prepare(ctx rcontext.RequestContext) *tasksTableW
func (s *tasksTableWithContext) Insert(name string, params *AnonymousJson, startTs int64) (*DbTask, error) {
row := s.statements.insertTask.QueryRowContext(s.ctx, name, params, startTs)
val := &DbTask{}
err := row.Scan(&val.TaskId, &val.Name, &val.Params, &val.StartTs, &val.EndTs)
err := row.Scan(&val.TaskId, &val.Name, &val.Params, &val.StartTs, &val.EndTs, &val.Error)
if err != nil {
return nil, err
}
Expand All @@ -79,10 +85,15 @@ func (s *tasksTableWithContext) SetEndTime(taskId int, endTs int64) error {
return err
}

func (s *tasksTableWithContext) SetError(taskId int, errVal string) error {
_, err := s.statements.updateTaskError.ExecContext(s.ctx, taskId, errVal)
return err
}

func (s *tasksTableWithContext) Get(id int) (*DbTask, error) {
row := s.statements.selectTask.QueryRowContext(s.ctx, id)
val := &DbTask{}
err := row.Scan(&val.TaskId, &val.Name, &val.Params, &val.StartTs, &val.EndTs)
err := row.Scan(&val.TaskId, &val.Name, &val.Params, &val.StartTs, &val.EndTs, &val.Error)
if errors.Is(err, sql.ErrNoRows) {
err = nil
val = nil
Expand All @@ -105,7 +116,7 @@ func (s *tasksTableWithContext) GetAll(includingFinished bool) ([]*DbTask, error
}
for rows.Next() {
val := &DbTask{}
if err = rows.Scan(&val.TaskId, &val.Name, &val.Params, &val.StartTs, &val.EndTs); err != nil {
if err = rows.Scan(&val.TaskId, &val.Name, &val.Params, &val.StartTs, &val.EndTs, &val.Error); err != nil {
return nil, err
}
results = append(results, val)
Expand Down
18 changes: 14 additions & 4 deletions docs/admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,8 @@ The response is a list of all known tasks:
},
"start_ts": 1567460189913,
"end_ts": 1567460190502,
"is_finished": true
"is_finished": true,
"error_message": ""
},
{
"task_id": 2,
Expand All @@ -334,13 +335,16 @@ The response is a list of all known tasks:
},
"start_ts": 1567460189913,
"end_ts": 0,
"is_finished": false
"is_finished": false,
"error_message": ""
}
]
```

**Note**: The `params` vary depending on the task.

If `error_message` is present and not an empty string on the returned task, the task failed part way through.

#### Listing unfinished tasks

URL: `GET /_matrix/media/unstable/admin/tasks/unfinished`
Expand All @@ -358,13 +362,16 @@ The response is a list of all unfinished tasks:
},
"start_ts": 1567460189913,
"end_ts": 0,
"is_finished": false
"is_finished": false,
"error_message": ""
}
]
```

**Note**: The `params` vary depending on the task.

If `error_message` is present and not an empty string on the returned task, the task failed part way through.

#### Getting information on a specific task

URL: `GET /_matrix/media/unstable/admin/tasks/<task ID>`
Expand All @@ -383,12 +390,15 @@ The response is the status of the task:
},
"start_ts": 1567460189913,
"end_ts": 1567460190502,
"is_finished": true
"is_finished": true,
"error_message": ""
}
```

**Note**: The `params` vary depending on the task.

If `error_message` is present and not an empty string on the returned task, the task failed part way through.

## Exporting/Importing data

Exports (and therefore imports) are currently done on a per-user basis. This is primarily useful when moving users to new hosts or doing GDPR exports of user data.
Expand Down
1 change: 1 addition & 0 deletions migrations/28_add_task_error_column_down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE background_tasks DROP COLUMN error;
1 change: 1 addition & 0 deletions migrations/28_add_task_error_column_up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE background_tasks ADD COLUMN error TEXT NOT NULL DEFAULT '';
9 changes: 9 additions & 0 deletions tasks/task_runner/00-internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,12 @@ func markDone(ctx rcontext.RequestContext, task *database.DbTask) {
}
ctx.Log.Infof("Task '%s' completed", task.Name)
}

func markError(ctx rcontext.RequestContext, task *database.DbTask, errVal error) {
taskDb := database.GetInstance().Tasks.Prepare(ctx)
if err := taskDb.SetError(task.TaskId, errVal.Error()); err != nil {
ctx.Log.Warn("Error updating task with error message: ", err)
sentry.CaptureException(err)
}
ctx.Log.Debugf("Task '%s' flagged with error", task.Name)
}
7 changes: 7 additions & 0 deletions tasks/task_runner/datastore_migrate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package task_runner

import (
"errors"
"fmt"

"github.com/getsentry/sentry-go"
Expand All @@ -22,31 +23,36 @@ func DatastoreMigrate(ctx rcontext.RequestContext, task *database.DbTask) {

params := DatastoreMigrateParams{}
if err := task.Params.ApplyTo(&params); err != nil {
markError(ctx, task, errors.Join(errors.New("error in decode"), err))
ctx.Log.Error("Error decoding params: ", err)
sentry.CaptureException(err)
return
}

if params.SourceDsId == params.TargetDsId {
markError(ctx, task, errors.New("source and target are the same"))
ctx.Log.Error("Source and target datastore are the same")
return
}

sourceDs, ok := datastores.Get(ctx, params.SourceDsId)
if !ok {
markError(ctx, task, errors.New("missing source"))
ctx.Log.Error("Unable to locate source datastore ID")
return
}

targetDs, ok := datastores.Get(ctx, params.TargetDsId)
if !ok {
markError(ctx, task, errors.New("missing target"))
ctx.Log.Error("Unable to locate target datastore ID")
return
}

db := database.GetInstance().MetadataView.Prepare(ctx)

if records, err := db.GetMediaForDatastoreByLastAccess(params.SourceDsId, params.BeforeTs); err != nil {
markError(ctx, task, errors.Join(errors.New("error in locate"), err))
ctx.Log.Error("Error getting movable media: ", err)
sentry.CaptureException(err)
return
Expand All @@ -55,6 +61,7 @@ func DatastoreMigrate(ctx rcontext.RequestContext, task *database.DbTask) {
}

if records, err := db.GetThumbnailsForDatastoreByLastAccess(params.SourceDsId, params.BeforeTs); err != nil {
markError(ctx, task, errors.Join(errors.New("error in thumbnails"), err))
ctx.Log.Error("Error getting movable thumbnails: ", err)
sentry.CaptureException(err)
return
Expand Down
9 changes: 9 additions & 0 deletions tasks/task_runner/export_data.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package task_runner

import (
"errors"
"io"

"github.com/getsentry/sentry-go"
Expand All @@ -22,43 +23,50 @@ func ExportData(ctx rcontext.RequestContext, task *database.DbTask) {

params := ExportDataParams{}
if err := task.Params.ApplyTo(&params); err != nil {
markError(ctx, task, errors.Join(errors.New("error in decode"), err))
ctx.Log.Error("Error decoding params: ", err)
sentry.CaptureException(err)
return
}

if params.ExportId == "" {
markError(ctx, task, errors.New("missing export id"))
ctx.Log.Error("No export ID provided")
sentry.CaptureMessage("No export ID provided")
return
}

exportDb := database.GetInstance().Exports.Prepare(ctx)
if existingEntity, err := exportDb.GetEntity(params.ExportId); err != nil {
markError(ctx, task, errors.Join(errors.New("error in validate"), err))
ctx.Log.Error("Error checking export ID: ", err)
sentry.CaptureException(err)
return
} else if existingEntity != "" {
markError(ctx, task, errors.New("export id already in use"))
ctx.Log.Error("Export ID already in use")
sentry.CaptureMessage("Export ID already in use")
return
}

entityId := params.UserId
if entityId != "" && entityId[0] != '@' {
markError(ctx, task, errors.New("invalid user id"))
ctx.Log.Error("Invalid user ID")
sentry.CaptureMessage("Invalid user ID")
return
} else if entityId == "" {
entityId = params.ServerName
}
if entityId == "" {
markError(ctx, task, errors.New("no entity provided"))
ctx.Log.Error("No entity provided")
sentry.CaptureMessage("No entity provided")
return
}

if err := exportDb.Insert(params.ExportId, entityId); err != nil {
markError(ctx, task, errors.Join(errors.New("error in persist"), err))
ctx.Log.Error("Error persisting export ID: ", err)
sentry.CaptureException(err)
return
Expand Down Expand Up @@ -92,6 +100,7 @@ func ExportData(ctx rcontext.RequestContext, task *database.DbTask) {
}

if err := archival.ExportEntityData(ctx, params.ExportId, entityId, params.IncludeS3Urls, persistPart); err != nil {
markError(ctx, task, errors.Join(errors.New("error in archival"), err))
ctx.Log.Error("Error during export: ", err)
sentry.CaptureException(err)
return
Expand Down
15 changes: 9 additions & 6 deletions tasks/task_runner/import_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (e *importEngine) appendFile(data io.ReadCloser) error {
return nil
}

func (e *importEngine) finish(chClose bool) error {
func (e *importEngine) finish(chClose bool, err error) error {
if err != nil {
markError(e.ctx, e.task, err)
}
markDone(e.ctx, e.task)
importEngines.Delete(e.importId)
if chClose {
Expand Down Expand Up @@ -118,7 +121,7 @@ func (e *importEngine) workFn(ch chan *os.File) {
if err != nil {
e.ctx.Log.Error("Error during manifest search: ", err)
sentry.CaptureException(err)
_ = e.finish(false)
_ = e.finish(false, errors.Join(errors.New("error in search"), err))
return
}
if ok {
Expand All @@ -127,7 +130,7 @@ func (e *importEngine) workFn(ch chan *os.File) {
if err = e.archiver.ProcessFile(f2, v2archive.ProcessOpts{}); err != nil {
e.ctx.Log.Error("Error during file processing (branch-1): ", err)
sentry.CaptureException(err)
_ = e.finish(false)
_ = e.finish(false, errors.Join(errors.New("error in processing"), err))
return
}
}
Expand All @@ -137,14 +140,14 @@ func (e *importEngine) workFn(ch chan *os.File) {
if err := e.archiver.ProcessFile(f, v2archive.ProcessOpts{}); err != nil {
e.ctx.Log.Error("Error during file processing (branch-2): ", err)
sentry.CaptureException(err)
_ = e.finish(false)
_ = e.finish(false, errors.Join(errors.New("error in processing2"), err))
return
}
}

if e.archiver.HasManifest() && len(e.archiver.GetNotUploadedMxcUris()) == 0 {
e.ctx.Log.Debug("No more files waiting for import - closing engine")
_ = e.finish(false)
_ = e.finish(false, nil)
return
}
}
Expand Down Expand Up @@ -188,7 +191,7 @@ func FinishImport(ctx rcontext.RequestContext, importId string) error {
} else if engine, ok := val.(*importEngine); !ok {
return errors.New("logic error: non-engine stored")
} else if engine != nil {
return engine.finish(true)
return engine.finish(true, nil)
}
return errors.New("logic error: missed engine lookup")
}

0 comments on commit ac7d366

Please sign in to comment.