-
Notifications
You must be signed in to change notification settings - Fork 189
/
Copy pathtinit.go
82 lines (70 loc) · 1.84 KB
/
tinit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Package transport provides long-lived http/tcp connections for
// intra-cluster communications (see README for details and usage example).
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION. All rights reserved.
*/
package transport
import (
"container/heap"
"os"
"strconv"
"time"
"github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
"github.com/NVIDIA/aistore/cmn/debug"
"github.com/NVIDIA/aistore/cmn/nlog"
"github.com/NVIDIA/aistore/memsys"
)
// transport defaults
const (
	// dfltTick is the base tick interval: one second.
	dfltTick = time.Second
	// dfltTickIdle is the (much longer) tick interval used when there are no
	// streams to _collect_: 256 base ticks.
	dfltTickIdle = 256 * dfltTick
	// dfltIdleTeardown is the default idle-teardown interval
	// (see config.Transport.IdleTeardown).
	dfltIdleTeardown = time.Second * 4
)
const (
	// dfltCollectLog is a ten-minute interval.
	// NOTE(review): presumably the collector's periodic logging interval -
	// confirm at the use site.
	dfltCollectLog = time.Minute * 10
	// dfltCollectChan is the capacity of the collector's control channel
	// (see Init).
	dfltCollectChan = 256
)
// global bundles the package-wide dependencies that Init stores for use by
// the rest of the transport package.
type global struct {
// stats updater passed to Init by the caller
tstats cos.StatsUpdater // strict subset of stats.Tracker interface (the minimum required)
// memory manager; Init assigns the result of memsys.PageMM() to it
mm *memsys.MMSA
}
// g is the package-level instance of global; its fields are populated by Init.
var g global
// Init initializes the package-level transport state and returns the
// StreamCollector handle.
//
// It stores tstats and the memsys.PageMM() memory manager in g, sets the
// session ID counter to 100, allocates every entry of hmaps, and constructs
// the internal collector (gc) - control channel, stream registry, and
// min-heap - before creating the StreamCollector (sc) that is returned.
func Init(tstats cos.StatsUpdater) *StreamCollector {
	// package-wide dependencies
	g.tstats = tstats
	g.mm = memsys.PageMM()

	nextSessionID.Store(100)

	for idx := range numHmaps {
		hmaps[idx] = make(hmap, 4)
	}

	// real stream collector
	gc = &collector{
		heap:    make([]*streamBase, 0, 64), // min-heap sorted by stream.time.ticks
		streams: make(map[string]*streamBase, 64),
		ctrlCh:  make(chan ctrl, dfltCollectChan),
	}
	gc.stopCh.Init()
	heap.Init(gc)

	// the handle returned to the caller
	sc = &StreamCollector{}
	return sc
}
// burst returns the burst size to use for the given stream options.
//
// Resolution order:
//  1. extra.ChanBurst, when positive, capped at cmn.MaxTransportBurst
//     (the environment override below is not consulted in this case);
//  2. otherwise extra.Config.Transport.Burst, or cmn.DfltTransportBurst when
//     the configured value is zero;
//  3. the AIS_STREAM_BURST_NUM environment variable, when set, overrides (2).
//
// An override that does not parse as an integer, or that is negative, is
// logged and ignored; an override above cmn.MaxTransportBurst is capped, the
// same way an explicit extra.ChanBurst is.
func burst(extra *Extra) int {
	if extra.ChanBurst > 0 {
		debug.Assert(extra.ChanBurst <= cmn.MaxTransportBurst, extra.ChanBurst)
		return min(extra.ChanBurst, cmn.MaxTransportBurst)
	}

	n := extra.Config.Transport.Burst
	if n == 0 {
		n = cmn.DfltTransportBurst
	}

	// (feat)
	a := os.Getenv("AIS_STREAM_BURST_NUM")
	if a == "" {
		return n
	}
	v, err := strconv.ParseInt(a, 10, 0)
	if err == nil && v < 0 {
		// a negative burst can never be a valid size; report it the same way
		// strconv reports any other out-of-range input
		err = &strconv.NumError{Func: "ParseInt", Num: a, Err: strconv.ErrRange}
	}
	if err != nil {
		// keep the configured/default value
		nlog.Errorln(err)
		return n
	}
	return min(int(v), cmn.MaxTransportBurst)
}