From 06dec3e5e41e0ab212301d32e4561b5d5f9b87e9 Mon Sep 17 00:00:00 2001 From: mudler Date: Wed, 13 Nov 2024 15:10:16 +0100 Subject: [PATCH] feat(jobserver): allow to specify global configuration via env file As an example, we provide the webscraper with a WEBSCRAPER_BLACKLIST environment variable, which contains a comma-separated list of URLs to blacklist during scraping. The JobConfiguration is a generic map[string]interface{} that can be populated at the top level. It gets unmarshalled as JSON by the jobs to map the relevant fields in the configuration. Signed-off-by: mudler --- api/types/job.go | 19 ++++++++++++++++++- cmd/tee-worker/main.go | 9 +++------ go.mod | 1 + go.sum | 2 ++ internal/api/api_test.go | 2 +- internal/api/start.go | 4 ++-- internal/jobs/webscraper.go | 22 ++++++++++++++++++++-- internal/jobs/webscraper_test.go | 17 ++++++++++++++++- internal/jobserver/jobserver.go | 12 +++++++----- internal/jobserver/jobserver_test.go | 2 +- internal/jobserver/worker.go | 2 +- 11 files changed, 72 insertions(+), 20 deletions(-) diff --git a/api/types/job.go b/api/types/job.go index 463e053..5715dfc 100644 --- a/api/types/job.go +++ b/api/types/job.go @@ -1,6 +1,9 @@ package types -import "encoding/json" +import ( + "encoding/json" + "fmt" +) type JobResponse struct { UID string `json:"uid"` @@ -30,3 +33,17 @@ func (ja JobArguments) Unmarshal(i interface{}) error { } return json.Unmarshal(dat, i) } + +type JobConfiguration map[string]interface{} + +func (jc JobConfiguration) Unmarshal(v interface{}) error { + data, err := json.Marshal(jc) + if err != nil { + return fmt.Errorf("error marshalling job configuration: %w", err) + } + if err := json.Unmarshal(data, v); err != nil { + return fmt.Errorf("error unmarshalling job configuration: %w", err) + } + + return nil +} diff --git a/cmd/tee-worker/main.go b/cmd/tee-worker/main.go index 836f5c0..2cb3541 100644 --- a/cmd/tee-worker/main.go +++ b/cmd/tee-worker/main.go @@ -2,16 +2,13 @@ package main import ( "context" - "os" 
"github.com/masa-finance/tee-worker/internal/api" ) func main() { - listenAddress := os.Getenv("LISTEN_ADDRESS") - if listenAddress == "" { - listenAddress = ":8080" - } + listenAddress := listenAddress() + jc := readConfig() - api.Start(context.Background(), listenAddress) + api.Start(context.Background(), listenAddress, jc) } diff --git a/go.mod b/go.mod index 4e7c7bb..0ff52f8 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect + github.com/joho/godotenv v1.5.1 // indirect github.com/labstack/gommon v0.4.2 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/go.sum b/go.sum index 7af2d09..1a19871 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,8 @@ github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 h1:5iH8iuqE5apketRbSF github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= +github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/labstack/echo/v4 v4.12.0 h1:IKpw49IMryVB2p1a4dzwlhP1O2Tf2E0Ir/450lH+kI0= github.com/labstack/echo/v4 v4.12.0/go.mod h1:UP9Cr2DJXbOK3Kr9ONYzNowSh7HP0aG0ShAyycHSJvM= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 660b066..ec36b16 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -24,7 +24,7 @@ var _ = Describe("API", func() { // Start the server ctx, cancel = context.WithCancel(context.Background()) - go Start(ctx, 
"127.0.0.1:40912") + go Start(ctx, "127.0.0.1:40912", types.JobConfiguration{}) // Wait for the server to start time.Sleep(2 * time.Second) diff --git a/internal/api/start.go b/internal/api/start.go index 23b4e44..7423598 100644 --- a/internal/api/start.go +++ b/internal/api/start.go @@ -13,12 +13,12 @@ import ( "github.com/masa-finance/tee-worker/pkg/tee" ) -func Start(ctx context.Context, listenAddress string) { +func Start(ctx context.Context, listenAddress string, config types.JobConfiguration) { // Echo instance e := echo.New() // Jobserver instance - jobServer := jobserver.NewJobServer(2) + jobServer := jobserver.NewJobServer(2, config) go jobServer.Run(ctx) diff --git a/internal/jobs/webscraper.go b/internal/jobs/webscraper.go index b9aee91..ddf39f9 100644 --- a/internal/jobs/webscraper.go +++ b/internal/jobs/webscraper.go @@ -1,26 +1,44 @@ package jobs import ( + "fmt" + "strings" + "github.com/masa-finance/tee-worker/api/types" ) const WebScraperType = "web-scraper" type WebScraper struct { + configuration WebScraperConfiguration +} + +type WebScraperConfiguration struct { + Blacklist []string `json:"webscraper_blacklist"` } type WebScraperArgs struct { URL string `json:"url"` } -func NewWebScraper() *WebScraper { - return &WebScraper{} +func NewWebScraper(jc types.JobConfiguration) *WebScraper { + config := WebScraperConfiguration{} + jc.Unmarshal(&config) + return &WebScraper{configuration: config} } func (ws *WebScraper) ExecuteJob(j types.Job) (types.JobResult, error) { args := &WebScraperArgs{} j.Arguments.Unmarshal(args) + for _, u := range ws.configuration.Blacklist { + if strings.Contains(args.URL, u) { + return types.JobResult{ + Error: fmt.Sprintf("URL blacklisted: %s", args.URL), + }, nil + } + } + // Do the web scraping here // For now, just return the URL return types.JobResult{ diff --git a/internal/jobs/webscraper_test.go b/internal/jobs/webscraper_test.go index 70926d0..f5f33cb 100644 --- a/internal/jobs/webscraper_test.go +++ 
b/internal/jobs/webscraper_test.go @@ -10,7 +10,7 @@ import ( var _ = Describe("Webscraper", func() { It("should fake scraping for now", func() { - webScraper := NewWebScraper() + webScraper := NewWebScraper(types.JobConfiguration{}) res, err := webScraper.ExecuteJob(types.Job{ Type: "web-scraper", @@ -22,4 +22,19 @@ var _ = Describe("Webscraper", func() { Expect(res.Error).To(BeEmpty()) Expect(res.Data.(string)).To(Equal("google")) }) + + It("should allow to blacklist urls", func() { + webScraper := NewWebScraper(types.JobConfiguration{ + "webscraper_blacklist": []string{"google"}, + }) + + res, err := webScraper.ExecuteJob(types.Job{ + Type: "web-scraper", + Arguments: map[string]interface{}{ + "url": "google", + }, + }) + Expect(err).ToNot(HaveOccurred()) + Expect(res.Error).To(Equal("URL blacklisted: google")) + }) }) diff --git a/internal/jobserver/jobserver.go b/internal/jobserver/jobserver.go index afd00a1..e8bd5c0 100644 --- a/internal/jobserver/jobserver.go +++ b/internal/jobserver/jobserver.go @@ -14,18 +14,20 @@ type JobServer struct { jobChan chan types.Job workers int - results map[string]types.JobResult + results map[string]types.JobResult + jobConfiguration types.JobConfiguration } -func NewJobServer(workers int) *JobServer { +func NewJobServer(workers int, jc types.JobConfiguration) *JobServer { if workers == 0 { workers++ } return &JobServer{ - jobChan: make(chan types.Job), - results: make(map[string]types.JobResult), - workers: workers, + jobChan: make(chan types.Job), + results: make(map[string]types.JobResult), + workers: workers, + jobConfiguration: jc, } } diff --git a/internal/jobserver/jobserver_test.go b/internal/jobserver/jobserver_test.go index 99e4403..c4627ec 100644 --- a/internal/jobserver/jobserver_test.go +++ b/internal/jobserver/jobserver_test.go @@ -12,7 +12,7 @@ import ( var _ = Describe("Jobserver", func() { It("runs jobs", func() { - jobserver := NewJobServer(2) + jobserver := NewJobServer(2, types.JobConfiguration{}) uuid := 
jobserver.AddJob(types.Job{ Type: "web-scraper", diff --git a/internal/jobserver/worker.go b/internal/jobserver/worker.go index 6f29ab4..47926d5 100644 --- a/internal/jobserver/worker.go +++ b/internal/jobserver/worker.go @@ -30,7 +30,7 @@ func (js *JobServer) doWork(j types.Job) error { var w worker switch j.Type { case jobs.WebScraperType: - w = jobs.NewWebScraper() + w = jobs.NewWebScraper(js.jobConfiguration) default: js.Lock() js.results[j.UUID] = types.JobResult{