Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add upload timings #1719

Open
wants to merge 3 commits into
base: sprint-1.19
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 46 additions & 17 deletions zboxcore/sdk/chunked_upload_process_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
)

var (
hasherMap map[string]workerProcess
hasherMap map[string]workerProcess
hasherMapMu sync.Mutex
)

type workerProcess struct {
Expand Down Expand Up @@ -382,6 +383,16 @@ func (su *ChunkedUpload) processWebWorkerUpload(data *safejs.Value, blobber *Chu
su.updateChunkProgress(chunkEndIndex, pos)
finalRequestObject, _ := data.Get("isFinal")
finalRequest, _ := finalRequestObject.Bool()
totalSizeObject, _ := data.Get("totalSize")
totalSize, _ := totalSizeObject.Int()
timingObject, _ := data.Get("timing")
timing, _ := timingObject.Int()
logEvent := logEntry{
OpType: "upload",
DataSize: totalSize,
TimeTaken: int64(timing),
}
writeLogEntry(blobber.blobber.Baseurl, logEvent)
if finalRequest {
finalResult, err := data.Get("finalResult")
if err != nil {
Expand Down Expand Up @@ -442,9 +453,10 @@ func ProcessEventData(data safejs.Value) {
remotePath = fileMeta.RemotePath
}
if err != nil {
selfPostMessage(false, false, err.Error(), remotePath, 0, nil)
selfPostMessage(false, false, err.Error(), remotePath, 0, 0, 0, nil)
return
}
hasherMapMu.Lock()
wp, ok := hasherMap[fileMeta.RemotePath]
if !ok {
wp = workerProcess{
Expand All @@ -456,15 +468,20 @@ func ProcessEventData(data safejs.Value) {
wp.hasher = CreateHasher(formInfo.ShardSize)
hasherMap[fileMeta.RemotePath] = wp
}
hasherMapMu.Unlock()
if formInfo.IsFinal {
defer delete(hasherMap, fileMeta.RemotePath)
defer func() {
hasherMapMu.Lock()
delete(hasherMap, fileMeta.RemotePath)
hasherMapMu.Unlock()
}()
}
blobberID := os.Getenv("BLOBBER_ID")
formBuilder := CreateChunkedUploadFormBuilder(formInfo.StorageVersion, formInfo.EncryptionVersion, formInfo.PrivateSigningKey)
uploadData, err := formBuilder.Build(fileMeta, wp.hasher, formInfo.ConnectionID, blobberID, formInfo.ChunkSize, formInfo.ChunkStartIndex, formInfo.ChunkEndIndex, formInfo.IsFinal, formInfo.EncryptedKey, formInfo.EncryptedKeyPoint,
fileShards, thumbnailChunkData, formInfo.ShardSize)
if err != nil {
selfPostMessage(false, false, err.Error(), remotePath, formInfo.ChunkEndIndex, nil)
selfPostMessage(false, false, err.Error(), remotePath, formInfo.ChunkEndIndex, 0, 0, nil)
return
}
if formInfo.OnlyHash {
Expand All @@ -474,27 +491,35 @@ func ProcessEventData(data safejs.Value) {
ValidationRoot: uploadData.formData.ValidationRoot,
ThumbnailContentHash: uploadData.formData.ThumbnailContentHash,
}
selfPostMessage(true, true, "", remotePath, formInfo.ChunkEndIndex, finalResult)
selfPostMessage(true, true, "", remotePath, formInfo.ChunkEndIndex, 0, 0, finalResult)
} else {
selfPostMessage(true, false, "", remotePath, formInfo.ChunkEndIndex, nil)
selfPostMessage(true, false, "", remotePath, formInfo.ChunkEndIndex, 0, 0, nil)
}
return
}
blobberURL := os.Getenv("BLOBBER_URL")
if !formInfo.IsFinal {
wp.wg.Add(1)
}

go func(blobberData blobberData, remotePath string, wg *sync.WaitGroup) {
var (
totalSize int
timing int64
)
for _, dataBuffer := range blobberData.dataBuffers {
totalSize += dataBuffer.Len()
}
if formInfo.IsFinal && len(blobberData.dataBuffers) > 1 {
err = sendUploadRequest(blobberData.dataBuffers[:len(blobberData.dataBuffers)-1], blobberData.contentSlice[:len(blobberData.contentSlice)-1], blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod, formInfo.ClientId)
timing, err = sendUploadRequest(blobberData.dataBuffers[:len(blobberData.dataBuffers)-1], blobberData.contentSlice[:len(blobberData.contentSlice)-1], blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod, formInfo.ClientId)
if err != nil {
selfPostMessage(false, true, err.Error(), remotePath, formInfo.ChunkEndIndex, nil)
selfPostMessage(false, true, err.Error(), remotePath, formInfo.ChunkEndIndex, totalSize, timing, nil)
return
}
wg.Wait()
err = sendUploadRequest(blobberData.dataBuffers[len(blobberData.dataBuffers)-1:], blobberData.contentSlice[len(blobberData.contentSlice)-1:], blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod, formInfo.ClientId)
timing, err = sendUploadRequest(blobberData.dataBuffers[len(blobberData.dataBuffers)-1:], blobberData.contentSlice[len(blobberData.contentSlice)-1:], blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod, formInfo.ClientId)
if err != nil {
selfPostMessage(false, true, err.Error(), remotePath, formInfo.ChunkEndIndex, nil)
selfPostMessage(false, true, err.Error(), remotePath, formInfo.ChunkEndIndex, totalSize, timing, nil)
return
}
} else {
Expand All @@ -503,9 +528,9 @@ func ProcessEventData(data safejs.Value) {
} else {
defer wg.Done()
}
err = sendUploadRequest(blobberData.dataBuffers, blobberData.contentSlice, blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod, formInfo.ClientId)
timing, err = sendUploadRequest(blobberData.dataBuffers, blobberData.contentSlice, blobberURL, formInfo.AllocationID, formInfo.AllocationTx, formInfo.HttpMethod, formInfo.ClientId)
if err != nil {
selfPostMessage(false, formInfo.IsFinal, err.Error(), remotePath, formInfo.ChunkEndIndex, nil)
selfPostMessage(false, formInfo.IsFinal, err.Error(), remotePath, formInfo.ChunkEndIndex, totalSize, timing, nil)
return
}
}
Expand All @@ -515,9 +540,9 @@ func ProcessEventData(data safejs.Value) {
ValidationRoot: blobberData.formData.ValidationRoot,
ThumbnailContentHash: blobberData.formData.ThumbnailContentHash,
}
selfPostMessage(true, true, "", remotePath, formInfo.ChunkEndIndex, finalResult)
selfPostMessage(true, true, "", remotePath, formInfo.ChunkEndIndex, totalSize, timing, finalResult)
} else {
selfPostMessage(true, false, "", remotePath, formInfo.ChunkEndIndex, nil)
selfPostMessage(true, false, "", remotePath, formInfo.ChunkEndIndex, totalSize, timing, nil)
}
}(uploadData, remotePath, wp.wg)

Expand All @@ -527,13 +552,15 @@ func InitHasherMap() {
hasherMap = make(map[string]workerProcess)
}

func selfPostMessage(success, isFinal bool, errMsg, remotePath string, chunkEndIndex int, finalResult *FinalWorkerResult) {
func selfPostMessage(success, isFinal bool, errMsg, remotePath string, chunkEndIndex, totalSize int, timing int64, finalResult *FinalWorkerResult) {
obj := js.Global().Get("Object").New()
obj.Set("success", success)
obj.Set("error", errMsg)
obj.Set("isFinal", isFinal)
obj.Set("chunkEndIndex", chunkEndIndex)
obj.Set("remotePath", remotePath)
obj.Set("totalSize", totalSize)
obj.Set("timing", timing)
if finalResult != nil {
finalResultJSON, err := json.Marshal(finalResult)
if err != nil {
Expand Down Expand Up @@ -619,7 +646,7 @@ func parseEventData(data safejs.Value) (*FileMeta, *ChunkedUploadFormInfo, [][]b
return fileMeta, formInfo, fileShards, thumbnailChunkData, nil
}

func sendUploadRequest(dataBuffers []*bytes.Buffer, contentSlice []string, blobberURL, allocationID, allocationTx, httpMethod string, clientId ...string) (err error) {
func sendUploadRequest(dataBuffers []*bytes.Buffer, contentSlice []string, blobberURL, allocationID, allocationTx, httpMethod string, clientId ...string) (timing int64, err error) {
eg, _ := errgroup.WithContext(context.TODO())
for dataInd := 0; dataInd < len(dataBuffers); dataInd++ {
ind := dataInd
Expand All @@ -639,8 +666,10 @@ func sendUploadRequest(dataBuffers []*bytes.Buffer, contentSlice []string, blobb
err, shouldContinue = func() (err error, shouldContinue bool) {
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
now := time.Now()
err = zboxutil.FastHttpClient.DoTimeout(req, resp, DefaultUploadTimeOut)
fasthttp.ReleaseRequest(req)
timing = time.Since(now).Milliseconds()
if err != nil {
logger.Logger.Error("Upload : ", err, " baseURL ", blobberURL)
if errors.Is(err, fasthttp.ErrConnectionClosed) || errors.Is(err, syscall.EPIPE) || errors.Is(err, fasthttp.ErrDialTimeout) {
Expand Down Expand Up @@ -710,7 +739,7 @@ func sendUploadRequest(dataBuffers []*bytes.Buffer, contentSlice []string, blobb
return err
})
}
return eg.Wait()
return timing, eg.Wait()
}

type eventChanWorker struct {
Expand Down
Loading