Messaging integration (GCP PubSub, AWS SQS, Kafka, etc) #88
Conversation
I was thinking, to keep it simple initially, it might be easier to listen to the topic and do a curl to localhost. That way we can reuse all the existing logic instead of implementing it twice. We can optimize later.
We need a way of determining how many concurrent messages to process and when to stop pulling new messages off of the subscription. The easiest way is to invoke the functions directly because that's where we have the info. The alternative would be to always process a given number of requests concurrently and wait for the proxy handler to return. I see that as being harder to debug, and it also adds another layer of concurrency settings.
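To make that trade-off concrete, here is a minimal sketch of the "proxy to localhost" option from the first comment. The port, the idea of taking the path from the message, and the function name are assumptions for illustration, not the PR's code.

    import (
        "bytes"
        "context"
        "fmt"
        "io"
        "net/http"

        "gocloud.dev/pubsub"
    )

    // forwardToLocalProxy reuses the existing HTTP proxy logic by POSTing the
    // message body to the locally running server. Port 8080 and the path
    // handling are illustrative assumptions.
    func forwardToLocalProxy(ctx context.Context, msg *pubsub.Message, path string) ([]byte, error) {
        url := fmt.Sprintf("http://localhost:8080%s", path)
        req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(msg.Body))
        if err != nil {
            return nil, err
        }
        req.Header.Set("Content-Type", "application/json")
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            return nil, err
        }
        defer resp.Body.Close()
        return io.ReadAll(resp.Body)
    }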
NOTE: Currently the code is expecting a request message that looks like the following, which is slightly different from the issue's description.

    {
      # Standard OpenAI fields
      "model": "...",
      "prompt": "What is the ...",

      # Lingo-specific subscriber fields
      "path": "/v1/completions",
      "metadata": {
        "optional-key": "optional-val"
      }
    }

I am back and forth on whether we should nest all of the OpenAI fields under a "body" field. I am planning on making the update to nest under "body".
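For reference, a sketch of the nested shape after that change; the OpenAI field values are placeholders, and the real request used in the GCP test further down follows this layout.

    {
      "path": "/v1/completions",
      "metadata": {
        "optional-key": "optional-val"
      },
      "body": {
        # Standard OpenAI fields, now nested under "body"
        "model": "...",
        "prompt": "What is the ..."
      }
    }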
A few changes are still needed:
I do think it's the best longer-term approach so we can have more control over queueing. It's important that we only ack messages that have had their response sent back to a PubSub topic, so we would still have to wait. I think there might be a timeout from PubSub by which it needs an ack.
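A minimal sketch of the ack-only-after-response ordering described here, using the gocloud pubsub types the project is built on; the helper name and the process callback are illustrative assumptions.

    import (
        "context"

        "gocloud.dev/pubsub"
    )

    // handleAndAck only acks the request message once its response has been
    // published. If publishing fails, the message is left unacked so PubSub
    // redelivers it after the ack deadline expires.
    func handleAndAck(ctx context.Context, msg *pubsub.Message, responses *pubsub.Topic, process func([]byte) []byte) error {
        out := process(msg.Body)
        if err := responses.Send(ctx, &pubsub.Message{Body: out}); err != nil {
            return err // not acked; will be redelivered
        }
        msg.Ack()
        return nil
    }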
Appears to be a race somewhere in the integration tests (only fails sometimes):
The most recent commit appears to have fixed the race condition in the integration tests; running a lot of back-to-back tests now to make sure.
Ready for testing on GCP. A very rudimentary test shows a request and response.

Running the controller:

    MESSENGER_URLS='gcppubsub://projects/my-project/subscriptions/lingo-requests-sub|gcppubsub://projects/my-project/topics/lingo-responses' go run ./cmd/lingo/main.go

Sending a request:

    $ gcloud pubsub topics publish lingo-requests \
        --message='{"path":"/v1/completions", "metadata":{"a":"b"}, "body": {"model": "mdl-1"}}'
    messageIds:
    - '10824071783903012'

I get a response:

    $ gcloud pubsub subscriptions pull lingo-responses-sub --auto-ack
    DATA:        {"metadata":{"a":"b"},"status_code":404,"body":{"error":{"message":"backend not found for model: mdl-1"}}}
    MESSAGE_ID:  10824059966496759
    ATTRIBUTES:  request_message_id=10824071783903012
    ACK_STATUS:  SUCCESS
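The same request can also be published from Go instead of the gcloud CLI. A sketch using gocloud.dev/pubsub; the project/topic URL mirrors the commands above, and the function name is made up.

    import (
        "context"

        "gocloud.dev/pubsub"
        _ "gocloud.dev/pubsub/gcppubsub" // register the gcppubsub:// scheme
    )

    // publishTestRequest sends the same test message as the gcloud command above.
    func publishTestRequest(ctx context.Context) error {
        topic, err := pubsub.OpenTopic(ctx, "gcppubsub://projects/my-project/topics/lingo-requests")
        if err != nil {
            return err
        }
        defer topic.Shutdown(ctx)
        return topic.Send(ctx, &pubsub.Message{
            Body: []byte(`{"path":"/v1/completions", "metadata":{"a":"b"}, "body": {"model": "mdl-1"}}`),
        })
    }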
    resp.Ack()

    require.JSONEq(t, fmt.Sprintf(`
    {
Note: the error format (.body) should match OpenAI's errors.
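For comparison, OpenAI's error responses wrap the details in an error object roughly like the following; the type and code values here are placeholders.

    {
      "error": {
        "message": "backend not found for model: mdl-1",
        "type": "invalid_request_error",
        "param": null,
        "code": null
      }
    }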
    // Slow down a bit to avoid churning through messages and running
    // up cloud costs when no meaningful work is being done.
    if consecutiveErrors := m.getConsecutiveErrors(); consecutiveErrors > 0 {
I think there is also a risk that an occasional short spike of errors would slow things down.
True, we will probably need to tune this over time. I think it's a good thing to slow things down when errors start building up. Right now the wait time will go back to zero once a single message is processed successfully.
I added this delay to account for a few cases (see the sketch after this list):
- Spontaneous failures that might creep up overnight.
- Some job sending a million malformed requests into a topic and Lingo churning through them, racking up GPU and PubSub costs.
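A minimal sketch of that delay logic; the linear growth and the 30-second cap are illustrative values, not necessarily what the PR uses.

    import (
        "log"
        "time"
    )

    // backoffForErrors sleeps before the next receive when errors are piling up,
    // so a stream of bad messages doesn't rack up GPU and PubSub costs.
    func backoffForErrors(consecutiveErrors int) {
        if consecutiveErrors == 0 {
            return
        }
        delay := time.Duration(consecutiveErrors) * time.Second
        const maxDelay = 30 * time.Second
        if delay > maxDelay {
            delay = maxDelay
        }
        log.Printf("waiting %v after %d consecutive errors", delay, consecutiveErrors)
        time.Sleep(delay)
    }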
Added a comment containing these thoughts.
cmd/lingo/main.go (outdated)
    //
    // URL Examples:
    //
    // Google PubSub:
I would like to see an example in this format:
"gcppubsub://projects/my-project/subscriptions/my-subscription|gcppubsub://projects/myproject/topics/mytopic"
Agreed
Done
Very nice! I think we should get this merged in as an experimental MVP and iterate on it. I fixed the Docker build and e2e tests by upgrading our Docker image to Go 1.22. I've also verified this works in a GCP environment that has Mistral and Mixtral deployed.
One thing I don't get is how we limit the maximum number of concurrent open requests. It seems there is currently no way to set such a limit?
    log.Printf("Entering queue: %s", msg.LoggableID)

    complete := m.Queues.EnqueueAndWait(ctx, backendDeployment, msg.LoggableID)
This should block the entire receive loop. The call to EnqueueAndWait ...
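One way to put an explicit cap on in-flight requests without blocking the receive loop on every message is a semaphore around the handler. This is a sketch of that alternative, not what the PR implements; maxConcurrent and handle are made-up names, and acking is left to the handler so the ack-after-response ordering discussed above still holds.

    import (
        "context"

        "gocloud.dev/pubsub"
    )

    // receiveLoop stops pulling from the subscription once maxConcurrent
    // messages are being handled; each completed handler frees a slot.
    func receiveLoop(ctx context.Context, sub *pubsub.Subscription, maxConcurrent int,
        handle func(context.Context, *pubsub.Message)) error {
        sem := make(chan struct{}, maxConcurrent)
        for {
            msg, err := sub.Receive(ctx)
            if err != nil {
                return err
            }
            sem <- struct{}{} // blocks while maxConcurrent handlers are busy
            go func(m *pubsub.Message) {
                defer func() { <-sem }()
                handle(ctx, m)
            }(msg)
        }
    }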
Added an issue to track retry functionality: #89. I am good to merge as-is.
Add messaging integration (consume requests and produce responses via a messaging system).
Implemented via the gocloud package to allow for future cross-cloud support.
Also refactors configuration to use environment variables exclusively.
Fixes #86
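Since configuration is now environment-variable driven, a hedged sketch of parsing the MESSENGER_URLS value shown in the GCP test above into its subscription and topic halves; the helper name is made up, and support for multiple pairs is not assumed.

    import (
        "fmt"
        "os"
        "strings"
    )

    // parseMessengerURLs splits a "<requests-subscription-url>|<responses-topic-url>"
    // pair, e.g. the value used when running the controller in the GCP test above.
    func parseMessengerURLs() (subscriptionURL, topicURL string, err error) {
        v := os.Getenv("MESSENGER_URLS")
        parts := strings.SplitN(v, "|", 2)
        if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
            return "", "", fmt.Errorf("MESSENGER_URLS must look like <subscription-url>|<topic-url>, got %q", v)
        }
        return parts[0], parts[1], nil
    }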