Skip to content

Commit

Permalink
Ported some gRPC API calls to opaque API
Browse files Browse the repository at this point in the history
  • Loading branch information
cmaglie committed Dec 19, 2024
1 parent 4a06151 commit 64c52ce
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 174 deletions.
9 changes: 5 additions & 4 deletions commands/cmderrors/cmderrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/arduino/go-paths-helper"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

func composeErrorMsg(msg string, cause error) string {
Expand Down Expand Up @@ -322,10 +323,10 @@ func (ife *InitFailedError) Error() string {
func (ife *InitFailedError) GRPCStatus() *status.Status {
st, _ := status.
New(ife.Code, ife.Cause.Error()).
WithDetails(&rpc.FailedInstanceInitError{
Reason: ife.Reason,
Message: ife.Cause.Error(),
})
WithDetails(rpc.FailedInstanceInitError_builder{
Reason: &ife.Reason,
Message: proto.String(ife.Cause.Error()),
}.Build())
return st
}

Expand Down
122 changes: 55 additions & 67 deletions commands/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

func installTool(ctx context.Context, pm *packagemanager.PackageManager, tool *cores.ToolRelease, downloadCB rpc.DownloadProgressCB, taskCB rpc.TaskProgressCB) error {
pme, release := pm.NewExplorer()
defer release()

taskCB(&rpc.TaskProgress{Name: i18n.Tr("Downloading missing tool %s", tool)})
taskCB(rpc.TaskProgress_builder{Name: proto.String(i18n.Tr("Downloading missing tool %s", tool))}.Build())
if err := pme.DownloadToolRelease(ctx, tool, downloadCB); err != nil {
return errors.New(i18n.Tr("downloading %[1]s tool: %[2]s", tool, err))
}
taskCB(&rpc.TaskProgress{Completed: true})
taskCB(rpc.TaskProgress_builder{Completed: proto.Bool(true)}.Build())
if err := pme.InstallTool(tool, taskCB, true); err != nil {
return errors.New(i18n.Tr("installing %[1]s tool: %[2]s", tool, err))
}
Expand Down Expand Up @@ -97,7 +98,7 @@ func (s *arduinoCoreServerImpl) Create(ctx context.Context, req *rpc.CreateReque
if err != nil {
return nil, err
}
return &rpc.CreateResponse{Instance: inst}, nil
return rpc.CreateResponse_builder{Instance: inst}.Build(), nil
}

// InitStreamResponseToCallbackFunction returns a gRPC stream to be used in Init that sends
Expand Down Expand Up @@ -128,29 +129,23 @@ func (s *arduinoCoreServerImpl) Init(req *rpc.InitRequest, stream rpc.ArduinoCor
responseCallback = func(*rpc.InitResponse) error { return nil }
}
responseError := func(st *status.Status) {
responseCallback(&rpc.InitResponse{
Message: &rpc.InitResponse_Error{
Error: st.Proto(),
},
})
responseCallback(rpc.InitResponse_builder{
Error: st.Proto(),
}.Build())
}
taskCallback := func(msg *rpc.TaskProgress) {
responseCallback(&rpc.InitResponse{
Message: &rpc.InitResponse_InitProgress{
InitProgress: &rpc.InitResponse_Progress{
TaskProgress: msg,
},
},
})
responseCallback(rpc.InitResponse_builder{
InitProgress: rpc.InitResponse_Progress_builder{
TaskProgress: msg,
}.Build(),
}.Build())
}
downloadCallback := func(msg *rpc.DownloadProgress) {
responseCallback(&rpc.InitResponse{
Message: &rpc.InitResponse_InitProgress{
InitProgress: &rpc.InitResponse_Progress{
DownloadProgress: msg,
},
},
})
responseCallback(rpc.InitResponse_builder{
InitProgress: rpc.InitResponse_Progress_builder{
DownloadProgress: msg,
}.Build(),
}.Build())
}

// Try to extract profile if specified
Expand All @@ -165,11 +160,9 @@ func (s *arduinoCoreServerImpl) Init(req *rpc.InitRequest, stream rpc.ArduinoCor
return err
}
profile = p
responseCallback(&rpc.InitResponse{
Message: &rpc.InitResponse_Profile{
Profile: profile.ToRpc(),
},
})
responseCallback(rpc.InitResponse_builder{
Profile: profile.ToRpc(),
}.Build())
}

// Perform first-update of indexes if needed
Expand Down Expand Up @@ -369,38 +362,38 @@ func (s *arduinoCoreServerImpl) Init(req *rpc.InitRequest, stream rpc.ArduinoCor

if !libDir.IsDir() {
// Download library
taskCallback(&rpc.TaskProgress{Name: i18n.Tr("Downloading library %s", libraryRef)})
taskCallback(rpc.TaskProgress_builder{Name: proto.String(i18n.Tr("Downloading library %s", libraryRef))}.Build())
libRelease, err := li.FindRelease(libraryRef.Library, libraryRef.Version)
if err != nil {
taskCallback(&rpc.TaskProgress{Name: i18n.Tr("Library %s not found", libraryRef)})
taskCallback(rpc.TaskProgress_builder{Name: proto.String(i18n.Tr("Library %s not found", libraryRef))}.Build())
err := &cmderrors.LibraryNotFoundError{Library: libraryRef.Library}
responseError(err.GRPCStatus())
continue
}
config, err := s.settings.DownloaderConfig()
if err != nil {
taskCallback(&rpc.TaskProgress{Name: i18n.Tr("Error downloading library %s", libraryRef)})
taskCallback(rpc.TaskProgress_builder{Name: proto.String(i18n.Tr("Error downloading library %s", libraryRef))}.Build())
e := &cmderrors.FailedLibraryInstallError{Cause: err}
responseError(e.GRPCStatus())
continue
}
if err := libRelease.Resource.Download(ctx, pme.DownloadDir, config, libRelease.String(), downloadCallback, ""); err != nil {
taskCallback(&rpc.TaskProgress{Name: i18n.Tr("Error downloading library %s", libraryRef)})
taskCallback(rpc.TaskProgress_builder{Name: proto.String(i18n.Tr("Error downloading library %s", libraryRef))}.Build())
e := &cmderrors.FailedLibraryInstallError{Cause: err}
responseError(e.GRPCStatus())
continue
}
taskCallback(&rpc.TaskProgress{Completed: true})
taskCallback(rpc.TaskProgress_builder{Completed: proto.Bool(true)}.Build())

// Install library
taskCallback(&rpc.TaskProgress{Name: i18n.Tr("Installing library %s", libraryRef)})
taskCallback(rpc.TaskProgress_builder{Name: proto.String(i18n.Tr("Installing library %s", libraryRef))}.Build())
if err := libRelease.Resource.Install(pme.DownloadDir, libRoot, libDir); err != nil {
taskCallback(&rpc.TaskProgress{Name: i18n.Tr("Error installing library %s", libraryRef)})
taskCallback(rpc.TaskProgress_builder{Name: proto.String(i18n.Tr("Error installing library %s", libraryRef))}.Build())
e := &cmderrors.FailedLibraryInstallError{Cause: err}
responseError(e.GRPCStatus())
continue
}
taskCallback(&rpc.TaskProgress{Completed: true})
taskCallback(rpc.TaskProgress_builder{Completed: proto.Bool(true)}.Build())
}

lmb.AddLibrariesDir(librariesmanager.LibrariesDir{
Expand Down Expand Up @@ -456,8 +449,7 @@ func UpdateLibrariesIndexStreamResponseToCallbackFunction(ctx context.Context, d
func (s *arduinoCoreServerImpl) UpdateLibrariesIndex(req *rpc.UpdateLibrariesIndexRequest, stream rpc.ArduinoCoreService_UpdateLibrariesIndexServer) error {
syncSend := NewSynchronizedSend(stream.Send)
downloadCB := func(p *rpc.DownloadProgress) {
syncSend.Send(&rpc.UpdateLibrariesIndexResponse{
Message: &rpc.UpdateLibrariesIndexResponse_DownloadProgress{DownloadProgress: p}})
syncSend.Send(rpc.UpdateLibrariesIndexResponse_builder{DownloadProgress: p}.Build())
}

pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
Expand All @@ -469,16 +461,14 @@ func (s *arduinoCoreServerImpl) UpdateLibrariesIndex(req *rpc.UpdateLibrariesInd
index := globals.LibrariesIndexResource

resultCB := func(status rpc.IndexUpdateReport_Status) {
syncSend.Send(&rpc.UpdateLibrariesIndexResponse{
Message: &rpc.UpdateLibrariesIndexResponse_Result_{
Result: &rpc.UpdateLibrariesIndexResponse_Result{
LibrariesIndex: &rpc.IndexUpdateReport{
IndexUrl: index.URL.String(),
Status: status,
},
},
},
})
syncSend.Send(rpc.UpdateLibrariesIndexResponse_builder{
Result: rpc.UpdateLibrariesIndexResponse_Result_builder{
LibrariesIndex: rpc.IndexUpdateReport_builder{
IndexUrl: proto.String(index.URL.String()),
Status: &status,
}.Build(),
}.Build(),
}.Build())
}

// Create the index directory if it doesn't exist
Expand Down Expand Up @@ -535,17 +525,15 @@ func (s *arduinoCoreServerImpl) UpdateIndex(req *rpc.UpdateIndexRequest, stream
}

report := func(indexURL *url.URL, status rpc.IndexUpdateReport_Status) *rpc.IndexUpdateReport {
return &rpc.IndexUpdateReport{
IndexUrl: indexURL.String(),
Status: status,
}
return rpc.IndexUpdateReport_builder{
IndexUrl: proto.String(indexURL.String()),
Status: &status,
}.Build()
}

syncSend := NewSynchronizedSend(stream.Send)
var downloadCB rpc.DownloadProgressCB = func(p *rpc.DownloadProgress) {
syncSend.Send(&rpc.UpdateIndexResponse{
Message: &rpc.UpdateIndexResponse_DownloadProgress{DownloadProgress: p},
})
syncSend.Send(rpc.UpdateIndexResponse_builder{DownloadProgress: p}.Build())
}
indexpath := s.settings.DataDir()

Expand All @@ -555,7 +543,7 @@ func (s *arduinoCoreServerImpl) UpdateIndex(req *rpc.UpdateIndexRequest, stream
}

failed := false
result := &rpc.UpdateIndexResponse_Result{}
result := rpc.UpdateIndexResponse_Result_builder{}
for _, u := range urls {
URL, err := url.Parse(u)
if err != nil {
Expand All @@ -564,7 +552,7 @@ func (s *arduinoCoreServerImpl) UpdateIndex(req *rpc.UpdateIndexRequest, stream
downloadCB.Start(u, i18n.Tr("Downloading index: %s", u))
downloadCB.End(false, msg)
failed = true
result.UpdatedIndexes = append(result.GetUpdatedIndexes(), report(URL, rpc.IndexUpdateReport_STATUS_FAILED))
result.UpdatedIndexes = append(result.UpdatedIndexes, report(URL, rpc.IndexUpdateReport_STATUS_FAILED))
continue
}

Expand All @@ -582,9 +570,9 @@ func (s *arduinoCoreServerImpl) UpdateIndex(req *rpc.UpdateIndexRequest, stream
downloadCB.Start(u, i18n.Tr("Downloading index: %s", filepath.Base(URL.Path)))
downloadCB.End(false, msg)
failed = true
result.UpdatedIndexes = append(result.GetUpdatedIndexes(), report(URL, rpc.IndexUpdateReport_STATUS_FAILED))
result.UpdatedIndexes = append(result.UpdatedIndexes, report(URL, rpc.IndexUpdateReport_STATUS_FAILED))
} else {
result.UpdatedIndexes = append(result.GetUpdatedIndexes(), report(URL, rpc.IndexUpdateReport_STATUS_SKIPPED))
result.UpdatedIndexes = append(result.UpdatedIndexes, report(URL, rpc.IndexUpdateReport_STATUS_SKIPPED))
}
continue
}
Expand All @@ -596,14 +584,14 @@ func (s *arduinoCoreServerImpl) UpdateIndex(req *rpc.UpdateIndexRequest, stream
downloadCB.Start(u, i18n.Tr("Downloading index: %s", filepath.Base(URL.Path)))
downloadCB.End(false, i18n.Tr("Invalid index URL: %s", err))
failed = true
result.UpdatedIndexes = append(result.GetUpdatedIndexes(), report(URL, rpc.IndexUpdateReport_STATUS_FAILED))
result.UpdatedIndexes = append(result.UpdatedIndexes, report(URL, rpc.IndexUpdateReport_STATUS_FAILED))
continue
}
indexFile := indexpath.Join(indexFileName)
if info, err := indexFile.Stat(); err == nil {
ageSecs := int64(time.Since(info.ModTime()).Seconds())
if ageSecs < req.GetUpdateIfOlderThanSecs() {
result.UpdatedIndexes = append(result.GetUpdatedIndexes(), report(URL, rpc.IndexUpdateReport_STATUS_ALREADY_UP_TO_DATE))
result.UpdatedIndexes = append(result.UpdatedIndexes, report(URL, rpc.IndexUpdateReport_STATUS_ALREADY_UP_TO_DATE))
continue
}
}
Expand All @@ -622,14 +610,14 @@ func (s *arduinoCoreServerImpl) UpdateIndex(req *rpc.UpdateIndexRequest, stream
}
if err := indexResource.Download(stream.Context(), indexpath, downloadCB, config); err != nil {
failed = true
result.UpdatedIndexes = append(result.GetUpdatedIndexes(), report(URL, rpc.IndexUpdateReport_STATUS_FAILED))
result.UpdatedIndexes = append(result.UpdatedIndexes, report(URL, rpc.IndexUpdateReport_STATUS_FAILED))
} else {
result.UpdatedIndexes = append(result.GetUpdatedIndexes(), report(URL, rpc.IndexUpdateReport_STATUS_UPDATED))
result.UpdatedIndexes = append(result.UpdatedIndexes, report(URL, rpc.IndexUpdateReport_STATUS_UPDATED))
}
}
syncSend.Send(&rpc.UpdateIndexResponse{
Message: &rpc.UpdateIndexResponse_Result_{Result: result},
})
syncSend.Send(rpc.UpdateIndexResponse_builder{
Result: result.Build(),
}.Build())
if failed {
return &cmderrors.FailedDownloadError{Message: i18n.Tr("Some indexes could not be updated.")}
}
Expand All @@ -644,7 +632,7 @@ func firstUpdate(ctx context.Context, srv rpc.ArduinoCoreServiceServer, instance
if libraryIndex.NotExist() {
// The library_index.json file doesn't exists, that means the CLI is run for the first time
// so we proceed with the first update that downloads the file
req := &rpc.UpdateLibrariesIndexRequest{Instance: instance}
req := rpc.UpdateLibrariesIndexRequest_builder{Instance: instance}.Build()
stream, _ := UpdateLibrariesIndexStreamResponseToCallbackFunction(ctx, downloadCb)
if err := srv.UpdateLibrariesIndex(req, stream); err != nil {
return err
Expand All @@ -667,7 +655,7 @@ func firstUpdate(ctx context.Context, srv rpc.ArduinoCoreServiceServer, instance
// or the 3rd party package index URL has just been added. Similarly to the
// library update we download that file and all the other package indexes from
// additional_urls
req := &rpc.UpdateIndexRequest{Instance: instance}
req := rpc.UpdateIndexRequest_builder{Instance: instance}.Build()
stream, _ := UpdateIndexStreamResponseToCallbackFunction(ctx, downloadCb)
if err := srv.UpdateIndex(req, stream); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion commands/internal/instances/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func Create(dataDir, packagesDir, userPackagesDir, downloadsDir *paths.Path, ext
instancesCount++
instancesMux.Unlock()

return &rpc.Instance{Id: id}, nil
return rpc.Instance_builder{Id: &id}.Build(), nil
}

// IsValid returns true if the given instance is valid.
Expand Down
17 changes: 9 additions & 8 deletions commands/service_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/djherbis/nio/v3"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)

type monitorPipeServer struct {
Expand All @@ -55,14 +56,14 @@ func (s *monitorPipeServer) Send(resp *rpc.MonitorResponse) error {

func (s *monitorPipeServer) Recv() (r *rpc.MonitorRequest, e error) {
if conf := s.req.Swap(nil); conf != nil {
return &rpc.MonitorRequest{Message: &rpc.MonitorRequest_OpenRequest{OpenRequest: conf}}, nil
return rpc.MonitorRequest_builder{OpenRequest: conf}.Build(), nil
}
buff := make([]byte, 4096)
n, err := s.in.Read(buff)
if err != nil {
return nil, err
}
return &rpc.MonitorRequest{Message: &rpc.MonitorRequest_TxData{TxData: buff[:n]}}, nil
return rpc.MonitorRequest_builder{TxData: buff[:n]}.Build(), nil
}

func (s *monitorPipeServer) Context() context.Context {
Expand Down Expand Up @@ -162,7 +163,7 @@ func (s *arduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer

// Send a message with Success set to true to notify the caller of the port being now active
syncSend := NewSynchronizedSend(stream.Send)
_ = syncSend.Send(&rpc.MonitorResponse{Message: &rpc.MonitorResponse_Success{Success: true}})
_ = syncSend.Send(rpc.MonitorResponse_builder{Success: proto.Bool(true)}.Build())

ctx, cancel := context.WithCancel(stream.Context())
gracefulCloseInitiated := &atomic.Bool{}
Expand All @@ -177,13 +178,13 @@ func (s *arduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
return
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Message: &rpc.MonitorResponse_Error{Error: err.Error()}})
syncSend.Send(rpc.MonitorResponse_builder{Error: proto.String(err.Error())}.Build())
return
}
if conf := msg.GetUpdatedConfiguration(); conf != nil {
for _, c := range conf.GetSettings() {
if err := monitor.Configure(c.GetSettingId(), c.GetValue()); err != nil {
syncSend.Send(&rpc.MonitorResponse{Message: &rpc.MonitorResponse_Error{Error: err.Error()}})
syncSend.Send(rpc.MonitorResponse_builder{Error: proto.String(err.Error())}.Build())
}
}
}
Expand All @@ -201,7 +202,7 @@ func (s *arduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
return
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Message: &rpc.MonitorResponse_Error{Error: err.Error()}})
syncSend.Send(rpc.MonitorResponse_builder{Error: proto.String(err.Error())}.Build())
return
}
tx = tx[n:]
Expand All @@ -219,10 +220,10 @@ func (s *arduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
break
}
if err != nil {
syncSend.Send(&rpc.MonitorResponse{Message: &rpc.MonitorResponse_Error{Error: err.Error()}})
syncSend.Send(rpc.MonitorResponse_builder{Error: proto.String(err.Error())}.Build())
break
}
if err := syncSend.Send(&rpc.MonitorResponse{Message: &rpc.MonitorResponse_RxData{RxData: buff[:n]}}); err != nil {
if err := syncSend.Send(rpc.MonitorResponse_builder{RxData: buff[:n]}.Build()); err != nil {
break
}
}
Expand Down
Loading

0 comments on commit 64c52ce

Please sign in to comment.