Skip to content

Commit

Permalink
feat(jobserver): allow to specify global configuration via env file
Browse files Browse the repository at this point in the history
As an example we provide to the webscraper a WEBSCRAPER_BLACKLIST
environment variable which contains a comma separated list of url to
blacklist during scraping.

The JobConfiguration is a generic map[string]interface{} that can be
populated top-level. It gets unmarshalled as JSON by the jobs to map the
relevant fields in the configuration.

Signed-off-by: mudler <[email protected]>
  • Loading branch information
mudler committed Nov 13, 2024
1 parent 9caa4ce commit 06dec3e
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 20 deletions.
19 changes: 18 additions & 1 deletion api/types/job.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package types

import "encoding/json"
import (
"encoding/json"
"fmt"
)

type JobResponse struct {
UID string `json:"uid"`
Expand Down Expand Up @@ -30,3 +33,17 @@ func (ja JobArguments) Unmarshal(i interface{}) error {
}
return json.Unmarshal(dat, i)
}

type JobConfiguration map[string]interface{}

func (jc JobConfiguration) Unmarshal(v interface{}) error {
data, err := json.Marshal(jc)
if err != nil {
return fmt.Errorf("error marshalling job configuration: %w", err)
}
if err := json.Unmarshal(data, v); err != nil {
return fmt.Errorf("error unmarshalling job configuration: %w", err)
}

return nil
}
9 changes: 3 additions & 6 deletions cmd/tee-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package main

import (
"context"
"os"

"github.com/masa-finance/tee-worker/internal/api"
)

func main() {
listenAddress := os.Getenv("LISTEN_ADDRESS")
if listenAddress == "" {
listenAddress = ":8080"
}
listenAddress := listenAddress()
jc := readConfig()

api.Start(context.Background(), listenAddress)
api.Start(context.Background(), listenAddress, jc)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 h1:5iH8iuqE5apketRbSF
github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/labstack/echo/v4 v4.12.0 h1:IKpw49IMryVB2p1a4dzwlhP1O2Tf2E0Ir/450lH+kI0=
github.com/labstack/echo/v4 v4.12.0/go.mod h1:UP9Cr2DJXbOK3Kr9ONYzNowSh7HP0aG0ShAyycHSJvM=
github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0=
Expand Down
2 changes: 1 addition & 1 deletion internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var _ = Describe("API", func() {
// Start the server
ctx, cancel = context.WithCancel(context.Background())

go Start(ctx, "127.0.0.1:40912")
go Start(ctx, "127.0.0.1:40912", types.JobConfiguration{})

// Wait for the server to start
time.Sleep(2 * time.Second)
Expand Down
4 changes: 2 additions & 2 deletions internal/api/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
"github.com/masa-finance/tee-worker/pkg/tee"
)

func Start(ctx context.Context, listenAddress string) {
func Start(ctx context.Context, listenAddress string, config types.JobConfiguration) {
// Echo instance
e := echo.New()

// Jobserver instance
jobServer := jobserver.NewJobServer(2)
jobServer := jobserver.NewJobServer(2, config)

go jobServer.Run(ctx)

Expand Down
22 changes: 20 additions & 2 deletions internal/jobs/webscraper.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,44 @@
package jobs

import (
"fmt"
"strings"

"github.com/masa-finance/tee-worker/api/types"
)

const WebScraperType = "web-scraper"

type WebScraper struct {
configuration WebScraperConfiguration
}

type WebScraperConfiguration struct {
Blacklist []string `json:"webscraper_blacklist"`
}

type WebScraperArgs struct {
URL string `json:"url"`
}

func NewWebScraper() *WebScraper {
return &WebScraper{}
func NewWebScraper(jc types.JobConfiguration) *WebScraper {
config := WebScraperConfiguration{}
jc.Unmarshal(&config)
return &WebScraper{configuration: config}
}

func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) {
args := &WebScraperArgs{}
j.Arguments.Unmarshal(args)

for _, u := range ws.configuration.Blacklist {
if strings.Contains(args.URL, u) {
return types.JobResult{
Error: fmt.Sprintf("URL blacklisted: %s", args.URL),
}, nil
}
}

// Do the web scraping here
// For now, just return the URL
return types.JobResult{
Expand Down
17 changes: 16 additions & 1 deletion internal/jobs/webscraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

var _ = Describe("Webscraper", func() {
It("should fake scraping for now", func() {
webScraper := NewWebScraper()
webScraper := NewWebScraper(types.JobConfiguration{})

res, err := webScraper.ExecuteJob(types.Job{
Type: "web-scraper",
Expand All @@ -22,4 +22,19 @@ var _ = Describe("Webscraper", func() {
Expect(res.Error).To(BeEmpty())
Expect(res.Data.(string)).To(Equal("google"))
})

It("should allow to blacklist urls", func() {
webScraper := NewWebScraper(types.JobConfiguration{
"webscraper_blacklist": []string{"google"},
})

res, err := webScraper.ExecuteJob(types.Job{
Type: "web-scraper",
Arguments: map[string]interface{}{
"url": "google",
},
})
Expect(err).ToNot(HaveOccurred())
Expect(res.Error).To(Equal("URL blacklisted: google"))
})
})
12 changes: 7 additions & 5 deletions internal/jobserver/jobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@ type JobServer struct {
jobChan chan types.Job
workers int

results map[string]types.JobResult
results map[string]types.JobResult
jobConfiguration types.JobConfiguration
}

func NewJobServer(workers int) *JobServer {
func NewJobServer(workers int, jc types.JobConfiguration) *JobServer {
if workers == 0 {
workers++
}

return &JobServer{
jobChan: make(chan types.Job),
results: make(map[string]types.JobResult),
workers: workers,
jobChan: make(chan types.Job),
results: make(map[string]types.JobResult),
workers: workers,
jobConfiguration: jc,
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/jobserver/jobserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

var _ = Describe("Jobserver", func() {
It("runs jobs", func() {
jobserver := NewJobServer(2)
jobserver := NewJobServer(2, types.JobConfiguration{})

uuid := jobserver.AddJob(types.Job{
Type: "web-scraper",
Expand Down
2 changes: 1 addition & 1 deletion internal/jobserver/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (js *JobServer) doWork(j types.Job) error {
var w worker
switch j.Type {
case jobs.WebScraperType:
w = jobs.NewWebScraper()
w = jobs.NewWebScraper(js.jobConfiguration)
default:
js.Lock()
js.results[j.UUID] = types.JobResult{
Expand Down

0 comments on commit 06dec3e

Please sign in to comment.