Skip to content

Commit

Permalink
Merge pull request #10 from bricks-cloud/v0.0.9
Browse files Browse the repository at this point in the history
[v0.0.9] Start providing metrics regarding number of api calls and associated costs
  • Loading branch information
spikelu2016 authored Nov 3, 2023
2 parents d40604a + 2ba5507 commit 9776e30
Show file tree
Hide file tree
Showing 12 changed files with 508 additions and 78 deletions.
9 changes: 7 additions & 2 deletions cmd/bricksllm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func main() {
log.Sugar().Fatalf("error creating keys table: %v", err)
}

err = store.CreateEventsTable()
if err != nil {
log.Sugar().Fatalf("error creating events table: %v", err)
}

memStore, err := memdb.NewMemDb(store, log, cfg.InMemoryDbUpdateInterval)
if err != nil {
log.Sugar().Fatalf("cannot initialize memdb: %v", err)
Expand Down Expand Up @@ -102,7 +107,7 @@ func main() {

e := encrypter.NewEncrypter()
m := manager.NewManager(store, e)
krm := manager.NewReportingManager(costStorage, store)
krm := manager.NewReportingManager(costStorage, store, store)
as, err := web.NewAdminServer(log, *modePtr, m, krm)
if err != nil {
log.Sugar().Fatalf("error creating admin http server: %v", err)
Expand All @@ -117,7 +122,7 @@ func main() {

ce := openai.NewCostEstimator(openai.OpenAiPerThousandTokenCost, tc)
v := validator.NewValidator(costLimitCache, rateLimitCache, costStorage)
rec := recorder.NewRecorder(costStorage, costLimitCache, ce)
rec := recorder.NewRecorder(costStorage, costLimitCache, ce, store)
rlm := manager.NewRateLimitManager(rateLimitCache)

ps, err := web.NewProxyServer(log, *modePtr, *privacyPtr, m, store, memStore, ce, v, rec, cfg.OpenAiKey, e, rlm)
Expand Down
14 changes: 12 additions & 2 deletions cmd/tool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func main() {
log.Sugar().Fatalf("error creating keys table: %v", err)
}

err = store.CreateEventsTable()
if err != nil {
log.Sugar().Fatalf("error creating events table: %v", err)
}

memStore, err := memdb.NewMemDb(store, log, cfg.InMemoryDbUpdateInterval)
if err != nil {
log.Sugar().Fatalf("cannot initialize memdb: %v", err)
Expand Down Expand Up @@ -101,7 +106,7 @@ func main() {

e := encrypter.NewEncrypter()
m := manager.NewManager(store, e)
krm := manager.NewReportingManager(costStorage, store)
krm := manager.NewReportingManager(costStorage, store, store)
as, err := web.NewAdminServer(log, *modePtr, m, krm)
if err != nil {
log.Sugar().Fatalf("error creating admin http server: %v", err)
Expand All @@ -116,7 +121,7 @@ func main() {

ce := openai.NewCostEstimator(openai.OpenAiPerThousandTokenCost, tc)
v := validator.NewValidator(costLimitCache, rateLimitCache, costStorage)
rec := recorder.NewRecorder(costStorage, costLimitCache, ce)
rec := recorder.NewRecorder(costStorage, costLimitCache, ce, store)
rlm := manager.NewRateLimitManager(rateLimitCache)

ps, err := web.NewProxyServer(log, *modePtr, *privacyPtr, m, store, memStore, ce, v, rec, cfg.OpenAiKey, e, rlm)
Expand Down Expand Up @@ -156,5 +161,10 @@ func main() {
log.Sugar().Fatalf("error dropping keys table: %v", err)
}

err = store.DropEventsTable()
if err != nil {
log.Sugar().Fatalf("error dropping events table: %v", err)
}

log.Sugar().Infof("server exited")
}
17 changes: 17 additions & 0 deletions internal/event/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package event

import "github.com/bricks-cloud/bricksllm/internal/provider"

type Event struct {
Id string
CreatedAt int64
Tags []string
KeyId string
CostInUsd float64
Provider provider.Provider
Model string
Status int
PromptTokenCount int
CompletionTokenCount int
LatencyInMs int
}
25 changes: 25 additions & 0 deletions internal/event/reporting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package event

type DataPoint struct {
TimeStamp int64 `json:"timeStamp"`
NumberOfRequests int64 `json:"numberOfRequests"`
CostInUsd float64 `json:"costInUsd"`
LatencyInMs int `json:"latencyInMs"`
PromptTokenCount int `json:"promptTokenCount"`
CompletionTokenCount int `json:"completionTokenCount"`
SuccessCouunt int `json:"successCount"`
}

type ReportingResponse struct {
DataPoints []*DataPoint `json:"dataPoints"`
LatencyInMsMedian float64 `json:"latencyInMsMedian"`
LatencyInMs99th float64 `json:"latencyInMs99th"`
}

type ReportingRequest struct {
KeyIds []string `json:"keyIds"`
Tags []string `json:"tags"`
Start int64 `json:"start"`
End int64 `json:"end"`
Increment int64 `json:"increment"`
}
32 changes: 31 additions & 1 deletion internal/manager/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package manager

import (
"github.com/bricks-cloud/bricksllm/internal/errors"
"github.com/bricks-cloud/bricksllm/internal/event"
"github.com/bricks-cloud/bricksllm/internal/key"
)

Expand All @@ -13,16 +14,45 @@ type keyStorage interface {
GetKey(keyId string) (*key.ResponseKey, error)
}

type eventStorage interface {
GetEventDataPoints(start, end, increment int64, tags, keyIds []string) ([]*event.DataPoint, error)
GetLatencyPercentiles(start, end int64, tags, keyIds []string) ([]float64, error)
}

type ReportingManager struct {
es eventStorage
cs costStorage
ks keyStorage
}

func NewReportingManager(cs costStorage, ks keyStorage) *ReportingManager {
func NewReportingManager(cs costStorage, ks keyStorage, es eventStorage) *ReportingManager {
return &ReportingManager{
cs: cs,
ks: ks,
es: es,
}
}

func (rm *ReportingManager) GetEventReporting(e *event.ReportingRequest) (*event.ReportingResponse, error) {
dataPoints, err := rm.es.GetEventDataPoints(e.Start, e.End, e.Increment, e.Tags, e.KeyIds)
if err != nil {
return nil, err
}

percentiles, err := rm.es.GetLatencyPercentiles(e.Start, e.End, e.Tags, e.KeyIds)
if err != nil {
return nil, err
}

if len(percentiles) == 0 {
return nil, errors.NewNotFoundError("latency percentiles are not found")
}

return &event.ReportingResponse{
DataPoints: dataPoints,
LatencyInMsMedian: percentiles[0],
LatencyInMs99th: percentiles[1],
}, nil
}

func (rm *ReportingManager) GetKeyReporting(keyId string) (*key.KeyReporting, error) {
Expand Down
14 changes: 14 additions & 0 deletions internal/provider/openai/cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,20 @@ func NewCostEstimator(m map[string]map[string]float64, tc tokenCounter) *CostEst
}
}

func (ce *CostEstimator) EstimateTotalCost(model string, promptTks, completionTks int) (float64, error) {
promptCost, err := ce.EstimatePromptCost(model, promptTks)
if err != nil {
return 0, err
}

completionCost, err := ce.EstimateCompletionCost(model, completionTks)
if err != nil {
return 0, err
}

return promptCost + completionCost, nil
}

func (ce *CostEstimator) EstimatePromptCost(model string, tks int) (float64, error) {
costMap, ok := ce.tokenCostMap["prompt"]
if !ok {
Expand Down
7 changes: 7 additions & 0 deletions internal/provider/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package provider

type Provider string

const (
OpenAiProvider Provider = "openai"
)
42 changes: 14 additions & 28 deletions internal/recorder/recorder.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
package recorder

import (
"errors"
"strings"

"github.com/bricks-cloud/bricksllm/internal/event"
"github.com/bricks-cloud/bricksllm/internal/key"
)

type Recorder struct {
s Store
c Cache
ce CostEstimator
es EventsStore
}

type EventsStore interface {
InsertEvent(e *event.Event) error
}

type Store interface {
Expand All @@ -26,38 +29,17 @@ type CostEstimator interface {
EstimateCompletionCost(model string, tks int) (float64, error)
}

func NewRecorder(s Store, c Cache, ce CostEstimator) *Recorder {
func NewRecorder(s Store, c Cache, ce CostEstimator, es EventsStore) *Recorder {
return &Recorder{
s: s,
c: c,
ce: ce,
es: es,
}
}

func (r *Recorder) RecordKeySpend(keyId string, model string, promptTks int, completionTks int, costLimitUnit key.TimeUnit) error {
used := model
if strings.HasPrefix(model, "ft") {
split := strings.Split(model, ":")
if len(split) < 2 || len(split[1]) == 0 {
return errors.New("model can not be empty")
}

used = split[1]
}

promptCost, err := r.ce.EstimatePromptCost(used, promptTks)
if err != nil {
return err
}

completionCost, err := r.ce.EstimateCompletionCost(used, completionTks)
if err != nil {
return err
}

micros := (promptCost + completionCost) * 1000000

err = r.s.IncrementCounter(keyId, int64(micros))
func (r *Recorder) RecordKeySpend(keyId string, model string, micros int64, costLimitUnit key.TimeUnit) error {
err := r.s.IncrementCounter(keyId, micros)
if err != nil {
return err
}
Expand All @@ -71,3 +53,7 @@ func (r *Recorder) RecordKeySpend(keyId string, model string, promptTks int, com

return nil
}

func (r *Recorder) RecordEvent(e *event.Event) error {
return r.es.InsertEvent(e)
}
88 changes: 88 additions & 0 deletions internal/server/web/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package web
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/bricks-cloud/bricksllm/internal/event"
"github.com/bricks-cloud/bricksllm/internal/key"
"github.com/bricks-cloud/bricksllm/internal/util"
"github.com/gin-gonic/gin"
Expand All @@ -22,6 +24,7 @@ type KeyManager interface {

type KeyReportingManager interface {
GetKeyReporting(keyId string) (*key.KeyReporting, error)
GetEventReporting(e *event.ReportingRequest) (*event.ReportingResponse, error)
}

type ErrorResponse struct {
Expand Down Expand Up @@ -50,6 +53,7 @@ func NewAdminServer(log *zap.Logger, mode string, m KeyManager, krm KeyReporting
router.DELETE("/api/key-management/keys/:id", getDeleteKeyHandler(m, log, prod))

router.GET("/api/reporting/keys/:id", getGetKeyReportingHandler(krm, log, prod))
router.GET("/api/reporting/events", getGetEventMetrics(krm, log, prod))

srv := &http.Server{
Addr: ":8001",
Expand Down Expand Up @@ -349,6 +353,90 @@ type notFoundError interface {
NotFound()
}

func validateEventReportingRequest(r *event.ReportingRequest) bool {
if r.Start == 0 || r.End == 0 || r.Increment <= 0 {
return false
}

if r.Start >= r.End {
return false
}

return true
}

func getGetEventMetrics(m KeyReportingManager, log *zap.Logger, prod bool) gin.HandlerFunc {
return func(c *gin.Context) {
path := "/api/reporting/events"
if c == nil || c.Request == nil {
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/empty-context",
Title: "context is empty error",
Status: http.StatusInternalServerError,
Detail: "gin context is empty",
Instance: path,
})
return
}

cid := c.Param(correlationId)
data, err := io.ReadAll(c.Request.Body)
if err != nil {
logError(log, "error when reading event reporting request body", prod, cid, err)
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/request-body-read",
Title: "request body reader error",
Status: http.StatusInternalServerError,
Detail: err.Error(),
Instance: path,
})
return
}

request := &event.ReportingRequest{}
err = json.Unmarshal(data, request)
if err != nil {
logError(log, "error when unmarshalling event reporting request body", prod, cid, err)
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/json-unmarshal",
Title: "json unmarshaller error",
Status: http.StatusInternalServerError,
Detail: err.Error(),
Instance: path,
})
return
}

if !validateEventReportingRequest(request) {
err = fmt.Errorf("event reporting request %+v is not valid", request)
logError(log, "invalid reporting request", prod, cid, err)
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/invalid-reporting-request",
Title: "invalid reporting request",
Status: http.StatusBadRequest,
Detail: err.Error(),
Instance: path,
})
return
}

reportingResponse, err := m.GetEventReporting(request)
if err != nil {
logError(log, "error when getting event reporting", prod, cid, err)
c.JSON(http.StatusInternalServerError, &ErrorResponse{
Type: "/errors/event-reporting-manager",
Title: "event reporting error",
Status: http.StatusInternalServerError,
Detail: err.Error(),
Instance: path,
})
return
}

c.JSON(http.StatusOK, reportingResponse)
}
}

func getGetKeyReportingHandler(m KeyReportingManager, log *zap.Logger, prod bool) gin.HandlerFunc {
return func(c *gin.Context) {
path := "/api/reporting/keys/:id"
Expand Down
Loading

0 comments on commit 9776e30

Please sign in to comment.