Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

step 1: reuse the read for now #111

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -19203,7 +19203,7 @@
},
"limits": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.MonoVertexLimits",
"description": "Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings"
"description": "Limits define the limitations such as read batch size for the mono vertex."
},
"metadata": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Metadata",
Expand Down
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -19198,7 +19198,7 @@
}
},
"limits": {
"description": "Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings",
"description": "Limits define the limitations such as read batch size for the mono vertex.",
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.MonoVertexLimits"
},
"metadata": {
Expand Down
8 changes: 4 additions & 4 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -5720,8 +5720,8 @@ MonoVertexLimits </a> </em>
<em>(Optional)</em>
<p>

Limits define the limitations such as buffer read batch size for all the
vertices of a pipeline, will override pipeline level settings
Limits define the limitations such as read batch size for the mono
vertex.
</p>

</td>
Expand Down Expand Up @@ -6109,8 +6109,8 @@ MonoVertexLimits </a> </em>
<em>(Optional)</em>
<p>

Limits define the limitations such as buffer read batch size for all the
vertices of a pipeline, will override pipeline level settings
Limits define the limitations such as read batch size for the mono
vertex.
</p>

</td>
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ type MonoVertexSpec struct {
// +patchStrategy=merge
// +patchMergeKey=name
Volumes []corev1.Volume `json:"volumes,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,6,rep,name=volumes"`
// Limits define the limitations such as buffer read batch size for all the vertices of a pipeline, will override pipeline level settings
// Limits define the limitations such as read batch size for the mono vertex.
// +optional
Limits *MonoVertexLimits `json:"limits,omitempty" protobuf:"bytes,7,opt,name=limits"`
// Settings for autoscaling
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions pkg/sdkclient/batchmapper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ import (
"errors"
"io"

batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"
"github.com/numaproj/numaflow-go/pkg/info"

"github.com/numaproj/numaflow/pkg/sdkclient"
sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error"
grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -39,7 +38,7 @@ type client struct {
}

// New creates a new client object.
func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
var opts = sdkclient.DefaultOptions(sdkclient.BatchMapAddr)

for _, inputOption := range inputOptions {
Expand Down
7 changes: 3 additions & 4 deletions pkg/sdkclient/grpc/grpc_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,21 @@ import (
"log"
"strconv"

"github.com/numaproj/numaflow-go/pkg/info"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

resolver "github.com/numaproj/numaflow/pkg/sdkclient/grpc_resolver"
sdkserverinfo "github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// ConnectToServer connects to the server with the given socket address based on the server info protocol.
func ConnectToServer(udsSockAddr string, serverInfo *info.ServerInfo, maxMessageSize int) (*grpc.ClientConn, error) {
func ConnectToServer(udsSockAddr string, serverInfo *serverinfo.ServerInfo, maxMessageSize int) (*grpc.ClientConn, error) {
var conn *grpc.ClientConn
var err error
var sockAddr string

// Check if Multiproc server mode is enabled
if multiProcServer, ok := serverInfo.Metadata[sdkserverinfo.MultiProcMetadata]; ok {
if multiProcServer, ok := serverInfo.Metadata[serverinfo.MultiProcKey]; ok {
// Extract the server ports from the server info file
numServers, _ := strconv.Atoi(multiProcServer)
// In Multiprocessing server mode we have multiple servers forks
Expand Down
5 changes: 3 additions & 2 deletions pkg/sdkclient/mapper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
"google.golang.org/protobuf/types/known/emptypb"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
"github.com/numaproj/numaflow-go/pkg/info"

"github.com/numaproj/numaflow/pkg/sdkclient"
sdkerror "github.com/numaproj/numaflow/pkg/sdkclient/error"
grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -36,7 +37,7 @@ type client struct {
}

// New creates a new client object.
func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
var opts = sdkclient.DefaultOptions(sdkclient.MapAddr)

for _, inputOption := range inputOptions {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sdkclient/mapstreamer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"fmt"
"io"

mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow/pkg/sdkclient"
sdkerror "github.com/numaproj/numaflow/pkg/sdkclient/error"
grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -38,7 +38,7 @@ type client struct {
}

// New creates a new client object.
func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
var opts = sdkclient.DefaultOptions(sdkclient.MapStreamAddr)

for _, inputOption := range inputOptions {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sdkclient/reducer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
"errors"
"io"

reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"

reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow/pkg/sdkclient"
sdkerr "github.com/numaproj/numaflow/pkg/sdkclient/error"
grpcutil "github.com/numaproj/numaflow/pkg/sdkclient/grpc"
"github.com/numaproj/numaflow/pkg/sdkclient/serverinfo"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -38,7 +38,7 @@ type client struct {
}

// New creates a new client object.
func New(serverInfo *info.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
func New(serverInfo *serverinfo.ServerInfo, inputOptions ...sdkclient.Option) (Client, error) {
var opts = sdkclient.DefaultOptions(sdkclient.ReduceAddr)

for _, inputOption := range inputOptions {
Expand Down
103 changes: 67 additions & 36 deletions pkg/sdkclient/serverinfo/serverinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,23 @@ package serverinfo

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strings"
"time"

"github.com/Masterminds/semver/v3"
pep440 "github.com/aquasecurity/go-pep440-version"

"github.com/numaproj/numaflow-go/pkg/info"

"github.com/numaproj/numaflow"
)

// Metadata keys used in the server info file
const (
// MultiProcMetadata is the field used to indicate that MultiProc map mode is enabled
// The value contains the number of servers spawned.
MultiProcMetadata = "MULTIPROC"
// MapModeMetadata field is used to indicate which map mode is enabled
// If none is set, we consider unary map as default
MapModeMetadata = "MAP_MODE"
)

type MapMode string

const (
UnaryMap MapMode = "unary-map"
StreamMap MapMode = "stream-map"
BatchMap MapMode = "batch-map"
)
var END = fmt.Sprintf("%U__END__", '\\')

// SDKServerInfo wait for the server to start and return the server info.
func SDKServerInfo(inputOptions ...Option) (*info.ServerInfo, error) {
func SDKServerInfo(inputOptions ...Option) (*ServerInfo, error) {
var opts = DefaultOptions()

for _, inputOption := range inputOptions {
Expand All @@ -68,33 +52,32 @@ func SDKServerInfo(inputOptions ...Option) (*info.ServerInfo, error) {
}

// waitForServerInfo waits until the server info is ready. It returns an error if the server info is not ready within the given timeout
func waitForServerInfo(timeout time.Duration, filePath string) (*info.ServerInfo, error) {
func waitForServerInfo(timeout time.Duration, filePath string) (*ServerInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

if err := info.WaitUntilReady(ctx, info.WithServerInfoFilePath(filePath)); err != nil {
if err := waitUntilReady(ctx, WithServerInfoFilePath(filePath)); err != nil {
return nil, fmt.Errorf("failed to wait until server info is ready: %w", err)
}

serverInfo, err := info.Read(info.WithServerInfoFilePath(filePath))
serverInfo, err := read(WithServerInfoFilePath(filePath))
if err != nil {
return nil, fmt.Errorf("failed to read server info: %w", err)
}

sdkVersion := serverInfo.Version
minNumaflowVersion := serverInfo.MinimumNumaflowVersion
sdkLanguage := serverInfo.Language
numaflowVersion := numaflow.GetVersion().Version

// If MinimumNumaflowVersion is empty, skip the numaflow compatibility check as there was an
// error writing server info on the SDK side
// error writing server info file on the SDK side
if minNumaflowVersion == "" {
log.Printf("warning: failed to get the minimum numaflow version, skipping numaflow version compatibility check")
// If we are testing locally or in CI, we can skip checking for numaflow compatibility issues
// because both return us a version string that the version check libraries can't properly parse (local: "*latest*" CI: commit SHA)
// because both return us a version string that the version-check libraries can't properly parse,
// local: "*latest*", CI: commit SHA
} else if !strings.Contains(numaflowVersion, "latest") && !strings.Contains(numaflowVersion, numaflow.GetVersion().GitCommit) {
if err := checkNumaflowCompatibility(numaflowVersion, minNumaflowVersion); err != nil {
return nil, fmt.Errorf("numaflow %s does not satisfy the minimum required by SDK %s: %w",
return nil, fmt.Errorf("numaflow version %s does not satisfy the minimum required by SDK version %s: %w",
numaflowVersion, sdkVersion, err)
}
}
Expand All @@ -105,14 +88,66 @@ func waitForServerInfo(timeout time.Duration, filePath string) (*info.ServerInfo
log.Printf("warning: failed to get the SDK version/language, skipping SDK version compatibility check")
} else {
if err := checkSDKCompatibility(sdkVersion, sdkLanguage, minimumSupportedSDKVersions); err != nil {
return nil, fmt.Errorf("SDK %s does not satisfy the minimum required by numaflow %s: %w",
return nil, fmt.Errorf("SDK version %s does not satisfy the minimum required by numaflow version %s: %w",
sdkVersion, numaflowVersion, err)
}
}

return serverInfo, nil
}

// waitUntilReady waits until the server info is ready
func waitUntilReady(ctx context.Context, opts ...Option) error {
options := DefaultOptions()
for _, opt := range opts {
opt(options)
}
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if fileInfo, err := os.Stat(options.serverInfoFilePath); err != nil {
log.Printf("Server info file %s is not ready...", options.serverInfoFilePath)
time.Sleep(1 * time.Second)
continue
} else {
if fileInfo.Size() > 0 {
return nil
}
}
}
}
}

// read reads the server info from a file
func read(opts ...Option) (*ServerInfo, error) {
options := DefaultOptions()
for _, opt := range opts {
opt(options)
}
// It takes some time for the server to write the server info file
// TODO: use a better way to wait for the file to be ready
retry := 0
b, err := os.ReadFile(options.serverInfoFilePath)
for !strings.HasSuffix(string(b), END) && err == nil && retry < 10 {
time.Sleep(100 * time.Millisecond)
b, err = os.ReadFile(options.serverInfoFilePath)
retry++
}
if err != nil {
return nil, err
}
if !strings.HasSuffix(string(b), END) {
return nil, fmt.Errorf("server info file is not ready")
}
b = b[:len(b)-len([]byte(END))]
info := &ServerInfo{}
if err := json.Unmarshal(b, info); err != nil {
return nil, fmt.Errorf("failed to unmarshal server info: %w", err)
}
return info, nil
}

func checkConstraint(version *semver.Version, constraint string) error {
if c, err := semver.NewConstraint(constraint); err != nil {
return fmt.Errorf("error parsing constraint: %w, constraint string: %s", err, constraint)
Expand All @@ -128,26 +163,23 @@ func checkNumaflowCompatibility(numaflowVersion string, minNumaflowVersion strin
if minNumaflowVersion == "" {
return fmt.Errorf("server info does not contain minimum numaflow version. Upgrade to newer SDK version")
}

numaflowVersionSemVer, err := semver.NewVersion(numaflowVersion)
if err != nil {
return fmt.Errorf("error parsing numaflow version: %w", err)
}

numaflowConstraint := fmt.Sprintf(">= %s", minNumaflowVersion)
if err = checkConstraint(numaflowVersionSemVer, numaflowConstraint); err != nil {
return fmt.Errorf("numaflow version %s must be upgraded to at least %s, in order to work with current SDK version: %w",
numaflowVersionSemVer.String(), minNumaflowVersion, err)
}

return nil
}

// checkSDKCompatibility checks if the current SDK version is compatible with the numaflow version
func checkSDKCompatibility(sdkVersion string, sdkLanguage info.Language, minSupportedSDKVersions sdkConstraints) error {
func checkSDKCompatibility(sdkVersion string, sdkLanguage Language, minSupportedSDKVersions sdkConstraints) error {
if sdkRequiredVersion, ok := minSupportedSDKVersions[sdkLanguage]; ok {
sdkConstraint := fmt.Sprintf(">= %s", sdkRequiredVersion)
if sdkLanguage == info.Python {
if sdkLanguage == Python {
// Python pre-releases/releases follow PEP440 specification which requires a different library for parsing
sdkVersionPEP440, err := pep440.Parse(sdkVersion)
if err != nil {
Expand Down Expand Up @@ -175,6 +207,5 @@ func checkSDKCompatibility(sdkVersion string, sdkLanguage info.Language, minSupp
}
}
}

return nil
}
Loading
Loading